在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

这是我用来将多个参数传递给pool.imap fork中使用的单参数函数的例程的示例:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()

其他回答

将Python 3.3+与pool.starmap()一起使用:

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

结果:

1 --- 4
2 --- 5
3 --- 6

如果您喜欢,还可以zip()更多参数:zip(a,b,c,d,e)

如果希望将常量值作为参数传递:

import itertools

zip(itertools.repeat(constant), a)

如果您的函数应该返回以下内容:

results = pool.starmap(write, zip(a,b))

这将提供一个包含返回值的列表。

答案取决于版本和情况。最近版本的Python(从3.3开始)的最一般的答案首先由J.F.Sebastian在下面描述。1它使用Pool.starmap方法,接受一系列参数元组。然后,它会自动将每个元组中的参数解包,并将它们传递给给定的函数:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

对于早期版本的Python,您需要编写一个助手函数来显式地解包参数。如果要与一起使用,还需要编写一个包装器,将Pool转换为上下文管理器。(感谢穆恩指出了这一点。)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

在更简单的情况下,使用固定的第二个参数,也可以使用partial,但仅在Python 2.7+中使用。

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1.这大部分都是由他的答案激发的,而他的答案很可能应该被接受。但由于这本书一直停留在顶端,似乎最好为未来读者改进它。

另一个简单的选择是将函数参数包装在元组中,然后包装应该在元组中传递的参数。在处理大量数据时,这可能并不理想。我相信它会为每个元组创建副本。

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

更好的方法是使用修饰符,而不是手工编写包装函数。特别是当您有很多函数要映射时,装饰器将通过避免为每个函数编写包装器来节省时间。通常,修饰函数是不可选择的,但是我们可以使用functools来解决它。更多讨论可以在这里找到。

以下是示例:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

然后你可以用压缩的参数来映射它:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

当然,您可能总是在Python3中使用Pool.starmap(>=3.3),正如其他答案中提到的那样。

这可能是另一种选择。技巧在于包装器函数,它返回传递给pool.map的另一个函数。下面的代码读取一个输入数组,对于其中的每个(唯一)元素,返回该元素在数组中出现的次数(即计数)。例如,如果输入是

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

然后零出现6次,一出现3次

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
           
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

你应该得到:

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========