Python Concurrency

Table of Contents

1. Python 多线程

1.1. 创建线程

Python 中创建线程。方法一,直接使用 threading.Thread() ,如:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

def fun1(n):
    print(n)

if __name__ == "__main__":
    t1 = threading.Thread(target=fun1, args=("thread 1",))
    t1.start()
    t1.join()

方法二,继承 threading.Thread 类,重写其 run 方法,如:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        threading.Thread.__init__(self)
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    t1 = MyThread("thread 1")
    t1.start()
    t1.join()

1.2. Global Interpreter Lock

对于 CPU 密集型任务来说,Python 中多线程效率不高。这是由于 Global Interpreter Lock (GIL)存在,Python 里一个进程永远只能同时执行一个线程(即拿到 GIL 的线程)。

为什么 Python 中会有 GIL 呢? 这是在 Python 设计之初,为了数据安全所做的决定。某个线程想要执行,必须先拿到 GIL,且在每个 Python 进程中,只有一个 GIL。

注:Python 有很多实现(参考:Python Interpreter)。GIL 在 CPython(即官方实现)中才有,在 PyPy、Jython 等实现中并没有 GIL。

2. Python 多进程

多进程(multiprocessing 包)和多线程(threading 包)的很多 API 是相同的。

2.1. 创建进程

Python 中创建进程和创建线程类似。方法一:直接使用 multiprocessing.Process() ,如:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing

def fun1(n):
    print(n)

if __name__ == '__main__':
    p = multiprocessing.Process(target=fun1, args=('process 1',))
    p.start()
    p.join()

方法二, 继承 multiprocessing.Process 类,重写其 run 方法,如:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self, n):
        multiprocessing.Process.__init__(self)
        self.n = n

    def run(self):
        print(self.n)

if __name__ == "__main__":
    p = MyProcess("process 1")
    p.start()
    p.join()

2.2. 进程之间交换对象

可以使用 Pipes 和 Queues 在进程之间交换对象,可参考:https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues

2.3. 共享内存(共享 ctypes 对象)

Python 中容易通过共享内存交换信息。
函数 multiprocessing.Value() 返回一个从共享内存中分配的 ctypes 对象。
函数 multiprocessing.Array() 返回一个从共享内存中分配的 ctypes 对象数组。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = 100 + a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

上面程序会输出:

3.1415927
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]

参考:https://docs.python.org/3/library/multiprocessing.html#shared-ctypes-objects

2.4. 进程同步(Lock,Condition,Semaphore,Event,Barrier)

Python 中,线程(threading 包)和进程(multiprocessing 包)都包含有 Lock,Condition,Semaphore,Event,Barrier 等同步设施。

参考:
https://docs.python.org/3/library/multiprocessing.html#synchronization-primitives
https://docs.python.org/3/library/threading.html

2.5. 进程池

使用 Pool 可以方便地创建进程池,常用方法如表 1 所示。

Table 1: 进程池常用方法
方法 含义
apply() 同步执行(串行)
apply_async() 异步执行(并行)
terminate() 立刻关闭进程池
close() 等待所有进程结束后,关闭进程池
join() 等待所有子进程退出。必须在 close()或 terminate()之后使用

下面是进程池的实例:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
from multiprocessing import Pool

def show(num):
    print('subproc pid=' + str(os.getpid()))
    time.sleep(1)
    print('num=' + str(num))

if __name__=="__main__":
    print('main pid=' + str(os.getpid()))
    pool = Pool(processes = 2)       # 创建进程池,最多两个进程
    for i in range(6):
        pool.apply_async(show, args=(i, ))
    pool.close()
    pool.join()                      # 等待所有子进程结束
    print('end')

运行上面程序,可能得到下面输出:

main pid=5503
subproc pid=5504
subproc pid=5505
num=0
num=1
subproc pid=5504
subproc pid=5505
num=2
num=3
subproc pid=5504
subproc pid=5505
num=4
num=5
end

3. concurrent.futures

Python 3.2 中引入了 concurrent.futures 包。可以方便创建线程池(ThreadPoolExecutor)和进程池(ProcessPoolExecutor)。

参考:https://docs.python.org/3/library/concurrent.futures.html

3.1. ProcessPoolExecutor

下面是 ProcessPoolExecutor 的使用实例:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

运行上面程序,将得到下面输出:

112272535095293 is prime: True
112582705942171 is prime: True
112272535095293 is prime: True
115280095190773 is prime: True
115797848077099 is prime: True
1099726899285419 is prime: False

3.2. ThreadPoolExecutor

下面是使用线程池同时复制 4 个文件的实例:

import shutil

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=4) as executor:         # 当所有提交的任务完成后,才会退出 with 语句,不需要额外的同步操作
    executor.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    executor.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    executor.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    executor.submit(shutil.copy, 'src4.txt', 'dest4.txt')

下面是 ThreadPoolExecutor 的另一个使用实例:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

运行上面程序,可能得到下面输出:

'http://www.foxnews.com/' page is 222634 bytes
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 8] nodename nor servname provided, or not known>
'http://www.cnn.com/' page is 166686 bytes
'http://europe.wsj.com/' generated an exception: <urlopen error [Errno 61] Connection refused>
'http://www.bbc.co.uk/' page is 314815 bytes

4. asyncio

asyncio 是 Python 3.4 中引入的标准库,内置了对异步 I/O 的支持。在 Python 3.5 中,引入了 async/await 语法,进一步简化了异步 I/O 代码。

下面是 async/await 的简单例子:

import asyncio

async def main():             # 定义了一个 coroutine
    print("hello")
    await asyncio.sleep(1)
    print("world")

asyncio.run(main())           # asyncio.run 是在 Python 3.7 中引入的

# 注:在 Python 3.7 以前,可以这样执行上面的 async 函数
# loop = asyncio.get_event_loop()       # 获取 EventLoop
# loop.run_until_complete(main())       # 执行 Coroutine
# loop.close()

再看一个例子:

import asyncio
import time

async def say_after(delay, what):       # 定义了一个 coroutine
    await asyncio.sleep(delay)
    print(what)

async def main():                       # 定义了一个 coroutine
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')         # 执行并等待 coroutine
    await say_after(2, 'world')         # 执行并等待 coroutine

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

上面程序的一个输出实例:

started at 17:13:52
hello
world
finished at 17:13:55

程序执行后,sleep 1 秒后才会输出 hello,再 sleep 2 秒后输出 world。从开始到结束一共用了约 3 秒时间。

如果我们想并行地执行 coroutine say_after(1, 'hello')say_after(2, 'world') ,则可以使用 asyncio.create_task 来创建并执行它们:

async def main():
    print(f"started at {time.strftime('%X')}")

    task1 = asyncio.create_task(       # 开始执行 coroutine
        say_after(1, 'hello'))

    task2 = asyncio.create_task(       # 开始执行 coroutine
        say_after(2, 'world'))

    # Wait until both tasks are completed (should take around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

上面程序的一个输出:

started at 17:14:32
hello
world
finished at 17:14:34

这个程序只会花费约 2 秒时间,比前面的程序要少 1 秒。

5. 参考

Python 参考手册(第 4 版)
Concurrent Execution: https://docs.python.org/3/library/concurrency.html
asyncio — Asynchronous I/O: https://docs.python.org/3/library/asyncio.html

Author: cig01

Created: <2018-04-01 Sun>

Last updated: <2018-04-15 Sun>

Creator: Emacs 27.1 (Org mode 9.4)