multiprocessing

Python 的 multiprocessing 模块的使用和 threading 类似。

multiprocessing 模块包含了很多有用的子模块:

1
2
3
4
5
6
7
8
__all__ = [
'Process', 'current_process', 'active_children', 'freeze_support',
'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
]

multiprocessing.Process

用于创建子进程,Process 对象提供了和threading.Thread类似的使用API。

1
2
def __init__(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
pass

  • group:
  • target: 进程的执行方法,在 run() 方法内被调用。
  • name: 进程名称
  • args: 传给进程执行方法 (target) 的参数,默认 ()。位置参数和关键字参数都可以传,不过一定要注意顺序,传一个参数的时候要注意 (a,)
  • kwargs: 传给进程执行方法 (target) 的参数,只能够传关键字参数,默认 {}
  • daemon: Python3.3 新增参数。可以通过 daemon 参数来定义创建的进程是否为 daemon 进程。
    如果 daemonTrue,则创建的进程为 daemon 进程,否则,继承创建进程的 daemon 属性。
  • start()
    启动进程
  • run()
    进程执行的主要方法,方法里面实际上是调用了 target 参数的函数。所以进程的执行函数可以通过 target 参数传进来给 run(),也可以重写 run() 方法。

  • join(timeout=None)
    阻塞当前程序,知道所有进程退出,一个进程可以被 join() 多次。timeout

    • None,会一直阻塞,直到全部被 join() 的进程执行完,才继续执行主体下面的代码。
    • >=0 的 float 时间(seconds):阻塞一定时间。0,即不阻塞。注意,经测试总阻塞时间为所有进程 timeout 的时间总和。
      1
      2
      3
      4
      process_list = [multiprocessing.Process(target=xxx) for _ in range(5)]  # 创建 5 个进程
      map(lambda p: p.start(), process_list) # 启动进程
      map(lambda p: p.join(timeout=1), process_list) # 设置阻塞模式
      print('END'.center(100, '-'))

    创建进程,并且设置了每个进程阻塞 1s。所以 print('END'.center(100, '-')) 语句会在全部进程执行完或者 5s 之后才去执行。

  • name
    获取或设置进程名称。

  • ident
    进程标识符(pid),非零整数。如果进程还没有开始,则返回None
  • pid
    ident
  • exitcode
    进程的退出状态码。如果子进程还没有退出,则返回None
  • authkey
    获取或设置进程的认证码。
  • daemon
    获取或设置进程 daemon 属性,设置 daemon 属性要在 start() 之前才会生效。如果不设置,会继承创建进程的程序的 daemon 属性值。如果进程为 daemon 进程,则主程序结束退出的时候,daemon 进程也会结束。

    1
    2
    3
    4
    5
    process_list = [threading.Thread(target=xxx) for _ in range(5)]  # 穿件 5 个进程
    map(lambda p: setattr(t, 'daemon', True), process_list)
    map(lambda p: p.start(), thread_list) # 启动进程
    time.sleep(5)
    exit()

    程序创建了 5 个进程,并且都是 daemon 进程。所以,无论进程有没有执行完,主程序执行完 (5s后),所有进程也会退出。

注意:因为 daemon 进程是会随着主程序的退出而强制结束的,所以,当 daemon 进程的一些操作可能会得不到安全的结束 (如:操作文件的时候)。

start(), join(), is_alive(), terminate()exitcode只能够在主进程调用。

  • is_alive()
    返回当前进程是否存活。
  • terminate()
    结束进程。
    1
    2
    current_process = multiprocessing.current_process()
    current_process.terminate()

因此,multiprocessing 模块创建子进程的方式也是和 threading.Thread 模块一样:

  • 方法一,通过 multiprocessing.Processtarget 参数

    1
    2
    3
    4
    5
    6
    def process_run():
    """The sub process mainly run function."""
    pass
    process = multiprocessing.Process(target=process_run)
    process.start()
    process.join()
  • 方法二,通过继承 multiprocessing.Process 对象,重写 run 方法

    1
    2
    3
    4
    5
    6
    class SubProcess(multiprocessing.Process):
    def run(self):
    pass
    process = SubProcess()
    process.start()
    process.join()



multiprocessing 模块其它常用函数

  • multiprocessing.current_thread()
    返回当前子进程的 Process 对象。

  • multiprocessing.active_children()
    返回还没有结束的子进程列表。

  • multiprocessing.cpu_count()
    返回当前系统的 CPU 数量。



进程间通信

  • 管道(pipe):也叫无名管道,管道是一种半双工通信,数据只能够单方面流动,而且只能够在具有亲缘关系(如:父子进程)的进程间使用。
  • 命名管道(FIFO):有名管道也是半双工通信,但是它允许在无亲缘关系的进程间通信。
  • 消息队列(message queue):一个消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了管道只能够承载无格式字节流以及缓冲区大小限制等问题。
  • 共享内存(share memory):共享内存就是映射一段能被其它进程访问的内存,这个内存由一个进程创建,但是可以被多个进程访问。共享内存是最快的 IPC 方式。
  • 信号(signal):通过想向进程发送信号来通知进程要做什么事情。
  • 信号量(semaphore):信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某个进程在访问共享资源时,其它进程也访问该资源。因此,主要作为进程间以及同一个进程的不同线程之间的同步手段。
  • 套接字(socket):socket 也是一种进程间的通信机制。

reference

multiprocessing 模块提供了以下进程间通信的方式:

multiprocessing.Queue

1
2
3
4
5
6
7
8
9
def process_run(queue):
process = multiprocessing.current_process()
queue.put(process.name)

q = multiprocessing.Queue()
process = multiprocessing.Process(target=processe_run, args=(q,), name='sub_process')
process.daemon = True
process.start()
print(q.get()) # sub_process

multiprocessing.Pipe

Pipe(duplex=True) 会创建一条管道,返回一对multiprocessing.Connection对象 (conn1, conn2) 分别表示管道连接的两端。

  • duplex=True: 表示管道是双向的,即 conn1conn2 都可以接收和发送信息。
  • duplex=False: 表示管道是单向的,conn1 只能够接收信息,conn2只能够发送信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def process_run(conn):
process = multiprocessing.current_process()
while True:
msg = conn.recv()
if msg == 'start':
conn.send('%s is starting' % process.name)
break

conn1, conn2 = multiprocessing.Pipe()
process = multiprocessing.Process(target=process_run, args=(conn2,))
process.daemon = True
process.start()
time.sleep(3)
conn1.send('start')
conn1.recv()

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回一个共享内存的ctypes对象。multiprocessing.Value()实际上是调用了multiprocessing.sharedctypes.Value()。可以通过value属性来访问共享内存的值。

  • typecode_or_type: 返回对象的类型。可以是ctypes的类型或特定的类型字符。

    1
    2
    3
    4
    5
    6
    7
    8
    typecode_to_type = {
    'c': ctypes.c_char,
    'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int, 'I': ctypes.c_uint,
    'l': ctypes.c_long, 'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }
  • *args: *args 是传给指定 typecode_or_type 类型的值,可以理解为初始化值。注意,typecode_or_type 的长度,如: c 为一个字节,即只能够传一个字节的字符串Value('c', 'a')

  • lock: keyword-only 参数。默认为True,会自动添加一个递归锁来保护共享内存的读写。如果False,在不添加锁,也可以传一个LockRlock来代替默认的锁。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Array 实际上调用的是 multiprocessing.sharedctypes.Array,用法类似 Value,只不过Value只能够保存一个相应的类型的值,而Array则可以保存一个数组相应类型的值。

  • typecode_or_type: 同Value
  • size_or_initializer: 初始化共享内存数组的值或指定该数组的大小。如果为整数时,则指定数组的大小,同时各个元素初始化的值为0;当为序列时,则代表初始化的值,并且该序列的长度为共享数组的长度。
  • lock: 同Value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def process_run(mv, ma):
print(mv.value)
mv.value = 1
for i in range(len(ma)):
print(ma[i])
ma[i] = i

mv = multiprocessing.Value('i', 0)
ma = multiprocessing.Array('i', 10)
process = multiprocessing.Process(target=process_run, args=(mv, ma))
process.start()
process.join()
print(mv.value)
print({i: ma[i] for in in range(len(ma))})



multiprocessing.managers

通过multiprocessing木块的managers对象创建的数据可以在进程间共享,甚至可以和不同网络的不同机器上面的进程共享。

multiprocessing.Manager

1
2
3
4
5
def Manager():
from multiprocessing.managers import SyncManager
m = SyncManager()
m.start()
return m

multiprocessing.Manager 返回一个 SyncManager 实例对象,用于创建已注册类型的共享数据(Queue, list, dict 等)。

默认注册的类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SyncManager.register('Queue', Queue.Queue)
SyncManager.register('JoinableQueue', Queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Pool', Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
def process_run(l, d):
p = multiprocessing.current_process()
l.append(p.name)
d[p.pid] = p.name

m = multiprocessing.Manager()
l, d = m.list(), m.dict()
process = multiprocessing.Process(target=Process_run, args=(l, d))
process.start()
process.join()
print(l)
print(d)

multiprocessing.managers.BaseManager([address[, authkey])

Manager 的基类。

  • address: 同参数address

  • start([initializer[, initargs])

    启动一个子进程来开启Manager。如果initializer不为空并且为可执行的对象创建子进程时会调用initializer(*initargs)

  • connect()

    连接Manager,一般用于客户端进程。

  • register(ypeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

    注册一个自定义类型或可执行对象。

  • get_server()

    返回Manager的Server对象。Sever对象支持serve_forever()

  • shutdown()

    关闭启动Manager的进程,start()之后才有意义,可以多次调用stop()

自定义Manager,注册共享的方法,并在网络端口上暴露:

1
2
3
4
5
6
7
from multiprocessing.managers import BaseManager
from Queue import Queue

class QueueManager(BaseManager): pass
QueueManager.register('get_queue', lambda:Queue())
qm = QueueManager(address=('', 50000), authkey='queue_manager')
qm.get_server().serve_forever()

首先定义自定义一个Manager,然后注册要使用的属性或方法。注意,register() 是类方法,并且第二个参数是可调用对象,所以传Queue()而不是Queue

其它进程访问,可以是本地也可以是不同网络的进程。client1:

1
2
3
4
5
6
7
8
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
qm = QueueManager(address=('xxxx', 50000), authkey='queue_manager')
qm.connect()
q = qm.get_queue()
q.put('hello')

client2:

1
2
3
4
5
6
7
8
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager): pass
QueueManager.register('get_queue')
qm = QueueManager(address=('xxxx', 50000), authkey='queue_manager')
qm.connect()
q = qm.get_queue()
q.get() # 'hello'



multiprocessing.Pool

进程池。Poll(processes=None, initializer=None, initargs=(), maxtasksperchild=None):

  • processes: 进程数,如果为None, 则去os.cpu_count()/multiprocessing.cpu_count()的值。
  • initializer/initargs: 如果initializer不为None, 则进程池的所有进程在启动的时候都会执行initializer(*initargs)
  • maxtasksperchild:

  • apply(func[, *args[, **kwargs]])
    进程池里面的一个进程会执行func(*args, **kwargs),并且apply会阻塞主进程直到执行完毕。用法同python2的apply(*args, **kwargs)*args必须为sequence,**kwargs必须为字典。如:

    1
    2
    3
    4
    5
    def process_run(x, y=None):
    return pow(x, y or x)

    pools = multiprocessing.Pool(5)
    result = pools.apply(process_run, (2,), {'y': 3})
  • apply_async(func[, *args[, **kwargs[, callback[, error_callback]]]])

    类似apply,不过apply_async是异步进行的,不会阻塞主进程,并且返回 multiprocessing.pool.AsyncResult 对象。

    • callback: 可传入一个参数的可调用对象,当func(*args, **kwargs)执行完毕有结果的时候,传入执行结果来调用。
    • error_callback: 可以传入一个参数的可调用对象,当func(*args, **kwargs)执行出错的时候,传入Exception实例来调用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    def process_run(x, y=None):
    time.sleep(5)
    return pow(x, y or x)

    def callback(x):
    print(x)

    pools = multiprocessing.Pool(5)
    # no block, but the result is a multiprocessing.pool.AsyncResult object
    result = pools.apply_async(process_run, (x,), {'y': 3}, callback=callback)
    # wait until the result is available or until timeout seconds pass.
    result.wait(10)
    # get the result value
    result.get()
  • map(func, iterable[, chunksize])

    相当于并行的内置函数map()iterable的每一个元素都会作为参数传给func,然后进程池分别调用进程去执行。

    1
    2
    3
    4
    5
    6
    def process_run(x, y):
    return pow(x, y)

    pools = multiprocessing.Pool(5)
    result = pools.map(process_run, zip(range(10), range(10)))
    print(result)
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    类似map,不过map_async是异步进行,不会阻塞主进程,并且返回 multiprocessing.pool.AsyncResult 对象。

callbackerror_callback的用同apply_async

Python2.7在类里面使用apply(),map()会出现bug:cPickle.PicklingError: Can’t pickle : attribute lookup builtin.function failed

multiprocess.pool.AsyncResult

进程池异步调用返回结果,即由apply_asyncmap_async调用的返回结果。

  • get([timeout]): 获取返回结果。

    • timeoutNone: 默认,会阻塞到有执行结果返回。
    • timeout不为None:设置阻塞的秒数,当超时还没有结果返回时,会引起 multiprocessing.TimeoutError

    当异步执行出错时,exception 也会由get引起。

  • wait([timeout]): 等待,直到有执行结果返回或超过超时时间。

  • ready(): True/False,返回调用是否执行完毕。
  • successful(): 当调用程序执行完毕时,返回执行有没有出错(True/False);当调用程序没有执行完毕时,会引起AssertionError