我正在学习如何使用Python的多处理包,但我不明白map_async和imap之间的区别。 我注意到map_async和imap都是异步执行的。那么什么时候我应该用一种而不是另一种呢?我应该如何检索map_async返回的结果?

我应该用这样的东西吗?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i

当前回答

The accepted answer states that for imap_unordered "results will be yielded as soon as they're ready" where one might possibly infer that results will be returned in the order of completion. But I just want to make it clear that this is not true in general. The documentation states that the results are returned in arbitrary order. Consider the following program that uses a pool size of 4, an iterable size of 20 and a chunksize value of 5. The worker function sleeps a variable amount of time depending on its passed argument, which also ensures that no one process in the pool grabs all the submitted tasks. Thus I expect each process in the pool to have 20 / 4 = 5 tasks to process:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

打印:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到,这些结果不是按完成顺序产生的。例如,我已经返回了1621512519.7743165 9,然后是1621512515.268784 0,这是由工作者函数返回的,比之前返回的结果早4秒多。然而,如果我将chunksize值改为1,打印输出就变成:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

这是完成顺序。但是,如果指定的chunksize值为1,我不愿意说imap_unordered总是会在结果可用时返回结果,尽管根据这个实验似乎就是这种情况,因为文档没有这样的声明。

讨论

当指定chunksize为5时,将把20个任务放在单个输入队列上,以便池中的4个进程以大小为5的chunk进行处理。因此,空闲的进程将从队列中取出5个任务的下一个块,并在再次空闲之前依次处理每个任务。因此,第一个进程将处理x个参数0到4,第二个进程处理x个参数5到9,等等。这就是为什么你看到初始x值打印为0、5、10和15。

但是,当x参数0的结果在x参数9的结果之前完成时,结果将被作为块一起写入,因此x参数0的结果将不会返回,直到在同一块中排队的x参数的结果(即1、2、3和4)也可用。

其他回答

imap/imap_unordered和map/map_async之间有两个关键区别:

它们消耗你传递给它们的可迭代对象的方式。 他们把结果返回给你的方式。

map consumes your iterable by converting the iterable to a list (assuming it isn't a list already), breaking it into chunks, and sending those chunks to the worker processes in the Pool. Breaking the iterable into chunks performs better than passing each item in the iterable between processes one item at a time - particularly if the iterable is large. However, turning the iterable into a list in order to chunk it can have a very high memory cost, since the entire list will need to be kept in memory.

Imap不会把你给它的可迭代对象转换成一个列表,也不会把它分解成块(默认情况下)。它将每次迭代可迭代的一个元素,并将它们每个发送到工作进程。这意味着将整个可迭代对象转换为列表不会占用内存,但这也意味着大型可迭代对象的性能较慢,因为缺少分块。但是,可以通过传递一个大于默认值1的chunksize参数来缓解这一问题。

The other major difference between imap/imap_unordered and map/map_async, is that with imap/imap_unordered, you can start receiving results from workers as soon as they're ready, rather than having to wait for all of them to be finished. With map_async, an AsyncResult is returned right away, but you can't actually retrieve results from that object until all of them have been processed, at which points it returns the same list that map does (map is actually implemented internally as map_async(...).get()). There's no way to get partial results; you either have the entire result, or nothing.

Imap和imap_unordered都立即返回可迭代对象。使用imap,只要迭代对象准备好,结果就会立即从迭代对象中产生,同时仍然保留输入迭代对象的顺序。使用imap_unordered,只要结果准备好就会产生,而不管输入可迭代对象的顺序如何。假设你有这个:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

这将输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

如果你使用p.imap_unordered而不是p.imap,你会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

如果你使用p.map或p.map_async().get(),你会看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

因此,使用imap/imap_unordered而不是map_async的主要原因是:

你的iterable足够大,以至于将它转换成一个列表会导致你耗尽/使用太多的内存。 您希望能够在完成所有结果之前开始处理结果。

The accepted answer states that for imap_unordered "results will be yielded as soon as they're ready" where one might possibly infer that results will be returned in the order of completion. But I just want to make it clear that this is not true in general. The documentation states that the results are returned in arbitrary order. Consider the following program that uses a pool size of 4, an iterable size of 20 and a chunksize value of 5. The worker function sleeps a variable amount of time depending on its passed argument, which also ensures that no one process in the pool grabs all the submitted tasks. Thus I expect each process in the pool to have 20 / 4 = 5 tasks to process:

from multiprocessing import Pool
import time

def worker(x):
    print(f'x = {x}', flush=True)
    time.sleep(.1 * (20 - x))
    # return approximate completion time with passed argument:
    return time.time(), x

if __name__ == '__main__':
    pool = Pool(4)
    results = pool.imap_unordered(worker, range(20), chunksize=5)
    for t, x in results:
        print('result:', t, x)

打印:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到,这些结果不是按完成顺序产生的。例如,我已经返回了1621512519.7743165 9,然后是1621512515.268784 0,这是由工作者函数返回的,比之前返回的结果早4秒多。然而,如果我将chunksize值改为1,打印输出就变成:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

这是完成顺序。但是,如果指定的chunksize值为1,我不愿意说imap_unordered总是会在结果可用时返回结果,尽管根据这个实验似乎就是这种情况,因为文档没有这样的声明。

讨论

当指定chunksize为5时,将把20个任务放在单个输入队列上,以便池中的4个进程以大小为5的chunk进行处理。因此,空闲的进程将从队列中取出5个任务的下一个块,并在再次空闲之前依次处理每个任务。因此,第一个进程将处理x个参数0到4,第二个进程处理x个参数5到9,等等。这就是为什么你看到初始x值打印为0、5、10和15。

但是,当x参数0的结果在x参数9的结果之前完成时,结果将被作为块一起写入,因此x参数0的结果将不会返回,直到在同一块中排队的x参数的结果(即1、2、3和4)也可用。