Python - multiprocessing
multiprocessing
Python 的 multiprocessing
模块的使用和 threading
类似。
multiprocessing
模块包含了很多有用的子模块:1
2
3
4
5
6
7
8__all__ = [
'Process', 'current_process', 'active_children', 'freeze_support',
'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]
multiprocessing.Process
用于创建子进程,Process
对象提供了和threading.Thread
类似的使用API。1
2def __init__(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
pass
- group:
- target: 进程的执行方法,在
run()
方法内被调用。- name: 进程名称
- args: 传给进程执行方法 (target) 的参数,默认
()
。位置参数和关键字参数都可以传,不过一定要注意顺序,传一个参数的时候要注意(a,)
- kwargs: 传给进程执行方法 (target) 的参数,只能够传关键字参数,默认
{}
。- daemon: Python3.3 新增参数。可以通过
daemon
参数来定义创建的进程是否为 daemon 进程。
如果daemon
为True
,则创建的进程为 daemon 进程,否则,继承创建进程的daemon
属性。
- start()
启动进程 run()
进程执行的主要方法,方法里面实际上是调用了target
参数的函数。所以进程的执行函数可以通过target
参数传进来给run()
,也可以重写run()
方法。join(timeout=None)
阻塞当前程序,知道所有进程退出,一个进程可以被join()
多次。timeout
:None
,会一直阻塞,直到全部被join()
的进程执行完,才继续执行主体下面的代码。>=0
的 float 时间(seconds):阻塞一定时间。0
,即不阻塞。注意,经测试总阻塞时间为所有进程timeout
的时间总和。1
2
3
4process_list = [multiprocessing.Process(target=xxx) for _ in range(5)] # 创建 5 个进程
map(lambda p: p.start(), process_list) # 启动进程
map(lambda p: p.join(timeout=1), process_list) # 设置阻塞模式
print('END'.center(100, '-'))
创建进程,并且设置了每个进程阻塞 1s。所以
print('END'.center(100, '-'))
语句会在全部进程执行完或者 5s 之后才去执行。name
获取或设置进程名称。- ident
进程标识符(pid),非零整数。如果进程还没有开始,则返回None
。 - pid
即ident
- exitcode
进程的退出状态码。如果子进程还没有退出,则返回None
。 - authkey
获取或设置进程的认证码。 daemon
获取或设置进程 daemon 属性,设置 daemon 属性要在start()
之前才会生效。如果不设置,会继承创建进程的程序的daemon
属性值。如果进程为 daemon 进程,则主程序结束退出的时候,daemon 进程也会结束。1
2
3
4
5process_list = [threading.Thread(target=xxx) for _ in range(5)] # 穿件 5 个进程
map(lambda p: setattr(t, 'daemon', True), process_list)
map(lambda p: p.start(), thread_list) # 启动进程
time.sleep(5)
exit()程序创建了 5 个进程,并且都是 daemon 进程。所以,无论进程有没有执行完,主程序执行完 (5s后),所有进程也会退出。
注意:因为 daemon 进程是会随着主程序的退出而强制结束的,所以,当 daemon 进程的一些操作可能会得不到安全的结束 (如:操作文件的时候)。
start()
, join()
, is_alive()
, terminate()
和exitcode
只能够在主进程调用。
- is_alive()
返回当前进程是否存活。 - terminate()
结束进程。1
2current_process = multiprocessing.current_process()
current_process.terminate()
因此,multiprocessing
模块创建子进程的方式也是和 threading.Thread
模块一样:
方法一,通过
multiprocessing.Process
传target
参数1
2
3
4
5
6def process_run():
"""The sub process mainly run function."""
pass
process = multiprocessing.Process(target=process_run)
process.start()
process.join()方法二,通过继承
multiprocessing.Process
对象,重写run
方法1
2
3
4
5
6class SubProcess(multiprocessing.Process):
def run(self):
pass
process = SubProcess()
process.start()
process.join()
multiprocessing 模块其它常用函数
multiprocessing.current_thread()
返回当前子进程的Process
对象。multiprocessing.active_children()
返回还没有结束的子进程列表。multiprocessing.cpu_count()
返回当前系统的 CPU 数量。
进程间通信
- 管道(pipe):也叫无名管道,管道是一种半双工通信,数据只能够单方面流动,而且只能够在具有亲缘关系(如:父子进程)的进程间使用。
- 命名管道(FIFO):有名管道也是半双工通信,但是它允许在无亲缘关系的进程间通信。
- 消息队列(message queue):一个消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了管道只能够承载无格式字节流以及缓冲区大小限制等问题。
- 共享内存(share memory):共享内存就是映射一段能被其它进程访问的内存,这个内存由一个进程创建,但是可以被多个进程访问。共享内存是最快的 IPC 方式。
- 信号(signal):通过想向进程发送信号来通知进程要做什么事情。
- 信号量(semaphore):信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某个进程在访问共享资源时,其它进程也访问该资源。因此,主要作为进程间以及同一个进程的不同线程之间的同步手段。
- 套接字(socket):socket 也是一种进程间的通信机制。
multiprocessing 模块提供了以下进程间通信的方式:
multiprocessing.Queue
1 | def process_run(queue): |
multiprocessing.Pipe
Pipe(duplex=True)
会创建一条管道,返回一对multiprocessing.Connection
对象 (conn1, conn2)
分别表示管道连接的两端。
duplex=True
: 表示管道是双向的,即conn1
和conn2
都可以接收和发送信息。duplex=False
: 表示管道是单向的,conn1
只能够接收信息,conn2
只能够发送信息。
1 | def process_run(conn): |
multiprocessing.Value(typecode_or_type, *args, lock=True)
返回一个共享内存的ctypes
对象。multiprocessing.Value()
实际上是调用了multiprocessing.sharedctypes.Value()
。可以通过value
属性来访问共享内存的值。
typecode_or_type
: 返回对象的类型。可以是ctypes的类型或特定的类型字符。1
2
3
4
5
6
7
8typecode_to_type = {
'c': ctypes.c_char,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
}*args
: *args 是传给指定typecode_or_type
类型的值,可以理解为初始化值。注意,typecode_or_type
的长度,如:c
为一个字节,即只能够传一个字节的字符串Value('c', 'a')
。lock
: keyword-only 参数。默认为True
,会自动添加一个递归锁来保护共享内存的读写。如果False
,在不添加锁,也可以传一个Lock
或Rlock
来代替默认的锁。
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
Array
实际上调用的是 multiprocessing.sharedctypes.Array
,用法类似 Value
,只不过Value
只能够保存一个相应的类型的值,而Array
则可以保存一个数组相应类型的值。
typecode_or_type
: 同Value
。size_or_initializer
: 初始化共享内存数组的值或指定该数组的大小。如果为整数时,则指定数组的大小,同时各个元素初始化的值为0;当为序列时,则代表初始化的值,并且该序列的长度为共享数组的长度。lock
: 同Value
1 | def process_run(mv, ma): |
multiprocessing.managers
通过multiprocessing木块的managers对象创建的数据可以在进程间共享,甚至可以和不同网络的不同机器上面的进程共享。
multiprocessing.Manager
1 | def Manager(): |
multiprocessing.Manager 返回一个 SyncManager
实例对象,用于创建已注册类型的共享数据(Queue
, list
, dict
等)。
默认注册的类型:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19SyncManager.register('Queue', Queue.Queue)
SyncManager.register('JoinableQueue', Queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)
# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
简单例子:1
2
3
4
5
6
7
8
9
10
11
12def process_run(l, d):
p = multiprocessing.current_process()
l.append(p.name)
d[p.pid] = p.name
m = multiprocessing.Manager()
l, d = m.list(), m.dict()
process = multiprocessing.Process(target=Process_run, args=(l, d))
process.start()
process.join()
print(l)
print(d)
multiprocessing.managers.BaseManager([address[, authkey])
Manager 的基类。
address
: 同参数address
start([initializer[, initargs])
启动一个子进程来开启Manager。如果initializer
不为空并且为可执行的对象创建子进程时会调用initializer(*initargs)
。connect()
连接Manager,一般用于客户端进程。register(ypeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
注册一个自定义类型或可执行对象。get_server()
返回Manager的Server对象。Sever对象支持serve_forever()
。shutdown()
关闭启动Manager的进程,start()
之后才有意义,可以多次调用stop()
。
自定义Manager,注册共享的方法,并在网络端口上暴露:1
2
3
4
5
6
7from multiprocessing.managers import BaseManager
from Queue import Queue
class QueueManager(BaseManager): pass
QueueManager.register('get_queue', lambda:Queue())
qm = QueueManager(address=('', 50000), authkey='queue_manager')
qm.get_server().serve_forever()
首先定义自定义一个Manager,然后注册要使用的属性或方法。注意,register()
是类方法,并且第二个参数是可调用对象,所以传Queue()
而不是Queue
其它进程访问,可以是本地也可以是不同网络的进程。client1:1
2
3
4
5
6
7
8from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
qm = QueueManager(address=('xxxx', 50000), authkey='queue_manager')
qm.connect()
q = qm.get_queue()
q.put('hello')
client2:1
2
3
4
5
6
7
8from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
qm = QueueManager(address=('xxxx', 50000), authkey='queue_manager')
qm.connect()
q = qm.get_queue()
q.get() # 'hello'
multiprocessing.Pool
进程池。Poll(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
:
processes
: 进程数,如果为None
, 则去os.cpu_count()/multiprocessing.cpu_count()
的值。initializer
/initargs
: 如果initializer
不为None
, 则进程池的所有进程在启动的时候都会执行initializer(*initargs)
。maxtasksperchild
:apply(func[, *args[, **kwargs]])
进程池里面的一个进程会执行func(*args, **kwargs)
,并且apply
会阻塞主进程直到执行完毕。用法同python2的apply(*args, **kwargs)
,*args
必须为sequence,**kwargs
必须为字典。如:1
2
3
4
5def process_run(x, y=None):
return pow(x, y or x)
pools = multiprocessing.Pool(5)
result = pools.apply(process_run, (2,), {'y': 3})apply_async(func[, *args[, **kwargs[, callback[, error_callback]]]])
类似apply
,不过apply_async
是异步进行的,不会阻塞主进程,并且返回multiprocessing.pool.AsyncResult
对象。callback
: 可传入一个参数的可调用对象,当func(*args, **kwargs)
执行完毕有结果的时候,传入执行结果来调用。error_callback
: 可以传入一个参数的可调用对象,当func(*args, **kwargs)
执行出错的时候,传入Exception实例来调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14def process_run(x, y=None):
time.sleep(5)
return pow(x, y or x)
def callback(x):
print(x)
pools = multiprocessing.Pool(5)
# no block, but the result is a multiprocessing.pool.AsyncResult object
result = pools.apply_async(process_run, (x,), {'y': 3}, callback=callback)
# wait until the result is available or until timeout seconds pass.
result.wait(10)
# get the result value
result.get()map(func, iterable[, chunksize])
相当于并行的内置函数map()
,iterable
的每一个元素都会作为参数传给func
,然后进程池分别调用进程去执行。1
2
3
4
5
6def process_run(x, y):
return pow(x, y)
pools = multiprocessing.Pool(5)
result = pools.map(process_run, zip(range(10), range(10)))
print(result)map_async(func, iterable[, chunksize[, callback[, error_callback]]])
类似map
,不过map_async
是异步进行,不会阻塞主进程,并且返回multiprocessing.pool.AsyncResult
对象。
callback
和error_callback
的用同apply_async
。
Python2.7在类里面使用
apply()
,map()
会出现bug:cPickle.PicklingError: Can’t pickle: attribute lookup builtin.function failed
multiprocess.pool.AsyncResult
进程池异步调用返回结果,即由apply_async
和map_async
调用的返回结果。
get([timeout])
: 获取返回结果。timeout
为None
: 默认,会阻塞到有执行结果返回。timeout
不为None
:设置阻塞的秒数,当超时还没有结果返回时,会引起multiprocessing.TimeoutError
。
当异步执行出错时,exception 也会由
get
引起。wait([timeout])
: 等待,直到有执行结果返回或超过超时时间。ready()
: True/False,返回调用是否执行完毕。successful()
: 当调用程序执行完毕时,返回执行有没有出错(True/False);当调用程序没有执行完毕时,会引起AssertionError
。