Python Concurrency

Table of Contents

1 Python Multithreading

1.1 Creating Threads

There are two ways to create a thread in Python. The first is to use threading.Thread() directly, e.g.:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

def fun1(n):
    print(n)

if __name__ == "__main__":
    t1 = threading.Thread(target=fun1, args=("thread 1",))
    t1.start()
    t1.join()

The second is to subclass threading.Thread and override its run method, e.g.:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        threading.Thread.__init__(self)
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    t1 = MyThread("thread 1")
    t1.start()
    t1.join()

1.2 Global Interpreter Lock

For CPU-bound tasks, multithreading in Python is not very effective. Because of the Global Interpreter Lock (GIL), within a single process only one thread (the one currently holding the GIL) can execute Python bytecode at any given time.

Why does Python have a GIL? It is a decision made early in Python's design to keep the interpreter's internal state safe: before a thread can run, it must first acquire the GIL, and each Python process has exactly one GIL.

Note: Python has many implementations (see: Python Interpreter). The GIL exists in CPython (the reference implementation) and also in PyPy; implementations such as Jython and IronPython do not have one.
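The effect of the GIL can be observed with a small, illustrative benchmark (not from the original; the countdown function and the value of N are arbitrary, and exact timings vary by machine): a CPU-bound loop run in two threads takes about as long under CPython as running it twice serially.

```python
import threading
import time

def count_down(n):
    # Pure-Python CPU-bound loop; holds the GIL while it runs.
    while n > 0:
        n -= 1

N = 2_000_000

start = time.perf_counter()
count_down(N)
count_down(N)
serial = time.perf_counter() - start

start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

# Under CPython the threaded version is typically no faster than the
# serial one, because only one thread can execute bytecode at a time.
print('serial:   %.2fs' % serial)
print('threaded: %.2fs' % threaded)
```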

2 Python Multiprocessing

The multiprocessing package and the threading package share many of the same APIs.

2.1 Creating Processes

Creating a process in Python is similar to creating a thread. The first way is to use multiprocessing.Process() directly, e.g.:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing

def fun1(n):
    print(n)

if __name__ == '__main__':
    p = multiprocessing.Process(target=fun1, args=('process 1',))
    p.start()
    p.join()

The second is to subclass multiprocessing.Process and override its run method, e.g.:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self, n):
        multiprocessing.Process.__init__(self)
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    p = MyProcess("process 1")
    p.start()
    p.join()

2.2 Exchanging Objects Between Processes

Objects can be exchanged between processes using pipes and queues. See: https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
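As a minimal sketch (the worker function and its payload here are illustrative, not from the original), a Queue can carry any picklable object from a child process back to the parent:

```python
from multiprocessing import Process, Queue

def worker(q):
    # Any picklable object can be sent through the queue.
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # prints [42, None, 'hello']
    p.join()
```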

2.3 Shared Memory (Shared ctypes Objects)

Shared memory provides an easy way to exchange data between processes in Python.
The function multiprocessing.Value() returns a ctypes object allocated from shared memory.
The function multiprocessing.Array() returns an array of ctypes objects allocated from shared memory.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = 100 + a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

The program above prints:

3.1415927
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

Reference: https://docs.python.org/3/library/multiprocessing.html#shared-ctypes-objects

2.4 Synchronization Between Processes

In Python, both the threading and multiprocessing packages provide synchronization primitives such as Lock, Condition, Semaphore, Event, and Barrier. See: https://docs.python.org/3/library/multiprocessing.html#synchronization-primitives
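A minimal sketch (not from the original; the counter sizes are arbitrary) of why a Lock matters: several processes increment a shared counter, and the lock serializes the read-modify-write so that no increment is lost.

```python
from multiprocessing import Process, Value, Lock

def add_many(counter, lock, times):
    for _ in range(times):
        with lock:  # serialize the read-modify-write on counter.value
            counter.value += 1

if __name__ == '__main__':
    lock = Lock()
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, lock, 1000))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # prints 4000: no increment was lost
```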

2.5 Process Pools

The Pool class makes it easy to create a pool of worker processes; its common methods are listed in Table 1.

Table 1: Common Pool methods
Method         Meaning
apply()        Submit a task and block until it finishes (tasks run serially)
apply_async()  Submit a task without blocking (tasks run in parallel)
terminate()    Stop the worker processes immediately
close()        Stop accepting new tasks; workers exit once pending tasks finish
join()         Wait for the worker processes to exit; must be called after close() or terminate()

Here is a process-pool example:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
from multiprocessing import Pool

def show(num):
    print('subproc pid=' + str(os.getpid()))
    time.sleep(1)
    print('num=' + str(num))

if __name__ == "__main__":
    print('main pid=' + str(os.getpid()))
    pool = Pool(processes=2)         # pool with at most two worker processes
    for i in range(6):
        pool.apply_async(show, args=(i, ))
    pool.close()
    pool.join()                      # wait for all child processes to exit
    print('end')

Running the program above may produce output like the following:

main pid=5503
subproc pid=5504
subproc pid=5505
num=0
num=1
subproc pid=5504
subproc pid=5505
num=2
num=3
subproc pid=5504
subproc pid=5505
num=4
num=5
end
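Besides apply_async(), Pool also provides map(), a parallel counterpart of the builtin map() that splits an iterable across the workers and returns the results in input order. A minimal sketch (the square function is illustrative):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # map() blocks until all results are ready and preserves input order.
        print(pool.map(square, range(6)))  # prints [0, 1, 4, 9, 16, 25]
```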

3 concurrent.futures

The concurrent.futures package was introduced in Python 3.2. It makes it easy to create thread pools (ThreadPoolExecutor) and process pools (ProcessPoolExecutor).
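Both executors share the same Executor interface: submit() schedules a callable and immediately returns a Future, whose result() blocks until the call completes. A minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns immediately with a Future; result() blocks
    # until the callable has finished and returns its value.
    future = executor.submit(pow, 2, 10)
    print(future.result())  # prints 1024
```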

Reference: https://docs.python.org/3/library/concurrent.futures.html

3.1 ProcessPoolExecutor

Here is an example using ProcessPoolExecutor:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Running the program above produces the following output:

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False

3.2 ThreadPoolExecutor

Here is an example that uses a thread pool to copy four files concurrently:

import shutil
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Here is another ThreadPoolExecutor example:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Running the program above may produce output like the following:

'http://www.foxnews.com/' page is 222634 bytes
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.cnn.com/' page is 166686 bytes
'http://europe.wsj.com/' generated an exception: <urlopen error [Errno 61] Connection refused>
'http://www.bbc.co.uk/' page is 314815 bytes

4 References

Python Essential Reference, 4th Edition
Concurrent Execution: https://docs.python.org/3/library/concurrency.html


Author: cig01

Created: <2018-04-01 Sun 00:00>

Last updated: <2018-04-15 Sun 10:46>

Creator: Emacs 25.3.1 (Org mode 9.1.4)