在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

在J.F.Sebastian的回答中了解了itertools之后,我决定更进一步,编写一个关注并行化的parmap包,在Python 2.7和Python 3.2(以及更高版本)中提供可以接受任意数量位置参数的map和starmap函数。

安装

pip install parmap

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

我已经将parmap上传到PyPI和GitHub存储库。

例如,问题的答案如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

其他回答

在官方文档中,它只支持一个可迭代的参数。在这种情况下,我喜欢使用apply_async。如果是你,我会:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

对我来说,以下是一个简单明了的解决方案:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)

输出:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']

在J.F.Sebastian的回答中了解了itertools之后,我决定更进一步,编写一个关注并行化的parmap包,在Python 2.7和Python 3.2(以及更高版本)中提供可以接受任意数量位置参数的map和starmap函数。

安装

pip install parmap

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

我已经将parmap上传到PyPI和GitHub存储库。

例如,问题的答案如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

这里有很多答案,但似乎没有一个能提供适用于任何版本的Python 2/3兼容代码。如果您希望代码能够正常工作,这将适用于以下任一Python版本:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

之后,您可以使用常规的Python3方式进行多处理。例如:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

将在Python 2或Python 3中工作。

这里有另一种方法,IMHO比提供的任何其他答案都更简单和优雅。

该程序有一个函数,它获取两个参数,打印它们并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

输出为:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

有关更多信息,请参阅python文档:

https://docs.python.org/3/library/multiprocessing.html#module-多处理工具

特别是要检查星图功能。

我使用的是Python 3.6,我不确定这是否适用于较旧的Python版本

为什么在文档中没有这样一个非常直接的例子,我不确定。