在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

另一个简单的选择是将函数参数包装在元组中,然后包装应该在元组中传递的参数。在处理大量数据时,这可能并不理想。我相信它会为每个元组创建副本。

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

其他回答

在J.F.Sebastian的回答中了解了itertools之后,我决定更进一步,编写一个关注并行化的parmap包,在Python 2.7和Python 3.2(以及更高版本)中提供可以接受任意数量位置参数的map和starmap函数。

安装

pip install parmap

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

我已经将parmap上传到PyPI和GitHub存储库。

例如,问题的答案如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

这可能是另一种选择。技巧在于包装器函数,它返回传递给pool.map的另一个函数。下面的代码读取一个输入数组,对于其中的每个(唯一)元素,返回该元素在数组中出现的次数(即计数)。例如,如果输入是

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

然后零出现6次,一出现3次

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
           
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

你应该得到:

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========

另一个简单的选择是将函数参数包装在元组中,然后包装应该在元组中传递的参数。在处理大量数据时,这可能并不理想。我相信它会为每个元组创建副本。

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

Python 2的更好解决方案:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

输出

2 3 4

1 2 3

0 1 2

out[]:

[3, 5, 7]

这里有另一种方法,IMHO比提供的任何其他答案都更简单和优雅。

该程序有一个函数,它获取两个参数,打印它们并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

输出为:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

有关更多信息,请参阅python文档:

https://docs.python.org/3/library/multiprocessing.html#module-多处理工具

特别是要检查星图功能。

我使用的是Python 3.6,我不确定这是否适用于较旧的Python版本

为什么在文档中没有这样一个非常直接的例子,我不确定。