multiprocessing.Pool 详解¶
创建¶
from multiprocessing import Pool
pool = Pool(4)
创建 worker pool 一般只需指定 worker processes 的数目即可。
apply*¶
- apply_async(func[, args[, kwds]])¶
- apply(func[, args[, kwds]])¶
apply_async 将任务提交后立即返回一个 result 对象,不阻塞,后续程序调用 result.get()
来阻塞获取执行的结果。
r1 = pool.apply_async(func1, args1, kws1)
r2 = pool.apply_async(func2, args2, kws2)
print r1.get()
print r2.get()
apply 就是 apply_async(...).get()
添加任务并立即阻塞等待结果。一般不会直接使用这个函数。
map*¶
- map(func, iterable[, chunksize])¶
- map_async(func, iterable[, chunksize])¶
- starmap(func, iterable[, chunksize])¶
- starmap_async(func, iterable[, chunksize])¶
首先,map 系列函数基本都有一个可选的 chunksize 参数,map 系列函数并不是一个一个迭代 iterable 发给 worker pool 去处理,而是将 iterable 按照 chunksize 切成一个一个的 chunk 然后再发给 worker pool 去处理。这样可以减少和 worker process 的交互,加快任务的执行速度。
task_batches = Pool._get_tasks(func, iterable, chunksize)
@staticmethod
def _get_tasks(func, it, size):
it = iter(it)
while 1:
x = tuple(itertools.islice(it, size))
if not x:
return
yield (func, x)
map 就是 map_async(...).get()
, starmap 就是 starmap_async(...).get()
。
map 和 starmap 的区别就是 map 传给 func 的只能是一个参数,而 starmap 可以多个参数。当然 map 也可以将多个参数打包到 tuple 中再传给 func。
def func_a(x):
return x*x
def func_b(x, y):
return x*y
p.map(func_a, range(10))
p.starmap(func_b, ((i,i) for i in range(10)))
- imap(func, iterable[, chunksize])¶
- imap_unordered(func, iterable[, chunksize])¶
imap 和 map 不一样的地方在于,map 是先执行了 list(iterable)
然后再将任务分 chunk 提交给 worker pool,而 imap 一次只会 list()
一个 chunk,并且默认 chunksize 为 1。如果 list(iterable)
要消耗大量的内存,可以考虑使用 imap 函数。一般 imap 比 map 慢。
imap 和 map 另外一个不一样的地方是 imap 函数返回的结果是一个迭代器,这样每个 func 执行完后可以立即获得执行结果,而不是像 map 要到所有全部执行完毕才能获得结果。
imap 和 imap_unordered 不一样的地方在于一个是按输入参数 iterable 的顺序返回结果的,一个是按任务的完成先后返回结果的。
参考: