在Python多处理库中,是否有支持多个参数的pool.map变体?
import multiprocessing
text = "test"
def harvester(text, case):
X = case[0]
text + str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text, case), case, 1)
pool.close()
pool.join()
这里有很多答案,但似乎没有一个能提供适用于任何版本的Python 2/3兼容代码。如果您希望代码能够正常工作,这将适用于以下任一Python版本:
# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
from contextlib import contextmanager
@contextmanager
def multiprocessing_context(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
else:
multiprocessing_context = multiprocessing.Pool
之后,您可以使用常规的Python3方式进行多处理。例如:
def _function_to_run_for_each(x):
return x.lower()
with multiprocessing_context(processes=3) as pool:
results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim']) print(results)
将在Python 2或Python 3中工作。
在官方文档中,它只支持一个可迭代的参数。在这种情况下,我喜欢使用apply_async。如果是你,我会:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
将Python 3.3+与pool.starmap()一起使用:
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
结果:
1 --- 4
2 --- 5
3 --- 6
如果您喜欢,还可以zip()更多参数:zip(a,b,c,d,e)
如果希望将常量值作为参数传递:
import itertools
zip(itertools.repeat(constant), a)
如果您的函数应该返回以下内容:
results = pool.starmap(write, zip(a,b))
这将提供一个包含返回值的列表。
Python 2的更好解决方案:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
输出
2 3 4
1 2 3
0 1 2
out[]:
[3, 5, 7]
更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。
以下是示例:
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
然后你可以用压缩的参数来映射它:
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。