在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

pool.map是否有支持多个参数的变体?

Python 3.3包含pool.starmap()方法:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

对于旧版本:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

输出

1 1
2 1
3 1

注意这里是如何使用itertools.izip()和itertools.crepeat()的。

由于@unsubu提到的错误,您不能在Python 2.6上使用functools.partial()或类似功能,因此应该显式定义简单包装函数func_tar()。另请参阅uptimebox建议的解决方法。

其他回答

更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。

以下是示例:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

然后你可以用压缩的参数来映射它:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。

这里有另一种方法,IMHO比提供的任何其他答案都更简单和优雅。

该程序有一个函数,它获取两个参数,打印它们并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

输出为:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

有关更多信息,请参阅python文档:

https://docs.python.org/3/library/multiprocessing.html#module-多处理工具

特别是要检查星图功能。

我使用的是Python 3.6,我不确定这是否适用于较旧的Python版本

为什么在文档中没有这样一个非常直接的例子,我不确定。

在官方文档中,它只支持一个可迭代的参数。在这种情况下,我喜欢使用apply_async。如果是你,我会:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

将所有参数存储为元组数组。

该示例表示,通常调用函数为:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

而是传递一个元组并解压缩参数:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

预先使用循环构建元组:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

然后通过传递元组数组来执行所有using map:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

我知道Python有*和**用于开箱,但我还没有尝试过。

使用高级库并发期货也比使用低级多处理库更好。

对我来说,以下是一个简单明了的解决方案:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)

输出:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']