在Python多处理库中,是否有支持多个参数的pool.map变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

当前回答

将所有参数存储为元组数组。

该示例表示,通常调用函数为:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

而是传递一个元组并解压缩参数:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

预先使用循环构建元组:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

然后通过传递元组数组来执行所有using map:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

我知道Python有*和**用于开箱,但我还没有尝试过。

使用高级库并发期货也比使用低级多处理库更好。

其他回答

将Python 3.3+与pool.starmap()一起使用:

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

结果:

1 --- 4
2 --- 5
3 --- 6

如果您喜欢,还可以zip()更多参数:zip(a,b,c,d,e)

如果希望将常量值作为参数传递:

import itertools

zip(itertools.repeat(constant), a)

如果您的函数应该返回以下内容:

results = pool.starmap(write, zip(a,b))

这将提供一个包含返回值的列表。

这里有很多答案,但似乎没有一个能提供适用于任何版本的Python 2/3兼容代码。如果您希望代码能够正常工作,这将适用于以下任一Python版本:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

之后,您可以使用常规的Python3方式进行多处理。例如:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

将在Python 2或Python 3中工作。

对我来说,以下是一个简单明了的解决方案:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var,s):
    sleep(randint(1,5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething,s="2"), array)
    print(resp_)

输出:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']

另一种方法是将列表列表传递给单参数例程:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

然后可以用自己喜欢的方法构造一个参数列表。

将所有参数存储为元组数组。

该示例表示,通常调用函数为:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

而是传递一个元组并解压缩参数:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

预先使用循环构建元组:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

然后通过传递元组数组来执行所有using map:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

我知道Python有*和**用于开箱,但我还没有尝试过。

使用高级库并发期货也比使用低级多处理库更好。