在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

另一个简单的选择是将函数参数包装在元组中,然后包装应该在元组中传递的参数。在处理大量数据时,这可能并不理想。我相信它会为每个元组创建副本。

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

其他回答

更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。

以下是示例:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

然后你可以用压缩的参数来映射它:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。

在官方文档中,它只支持一个可迭代的参数。在这种情况下,我喜欢使用apply_async。如果是你,我会:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

这里有另一种方法,IMHO比提供的任何其他答案都更简单和优雅。

该程序有一个函数,它获取两个参数,打印它们并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

输出为:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

有关更多信息,请参阅python文档:

https://docs.python.org/3/library/multiprocessing.html#module-多处理工具

特别是要检查星图功能。

我使用的是Python 3.6,我不确定这是否适用于较旧的Python版本

为什么在文档中没有这样一个非常直接的例子,我不确定。

如何获取多个参数:

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)