在下面的示例代码中,我想获取函数worker的返回值。我该怎么做呢?这个值存储在哪里?

示例代码:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

输出:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

我似乎无法在存储在作业中的对象中找到相关属性。


当前回答

你可以使用ProcessPoolExecutor从函数中获取一个返回值,如下所示:

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    feature = executor.submit(test, 2, 3)
    print(feature.result()) # 5

其他回答

你可以使用ProcessPoolExecutor从函数中获取一个返回值,如下所示:

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    feature = executor.submit(test, 2, 3)
    print(feature.result()) # 5

使用共享变量进行通信。比如这样:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

似乎应该使用多处理。使用.apply() .apply_async(), map()方法

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

出于某种原因,我找不到一个通用的例子,如何做到这一点与队列在任何地方(甚至Python的文档示例不会产生多个进程),所以这是我得到的工作后,10次尝试:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue是一个阻塞的、线程安全的队列,您可以使用它来存储来自子进程的返回值。因此,您必须将队列传递给每个进程。这里不太明显的一点是,您必须在加入进程之前从队列中获取(),否则队列将被填满并阻塞所有内容。

面向对象的更新(在Python 3.4中测试):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

我认为@sega_sai建议的方法更好。但它确实需要一个代码示例,所以如下:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

它将打印返回值:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

如果你熟悉map (Python 2内置的),这应该不是太有挑战性。否则,请查看sega_Sai的链接。

注意,只需要很少的代码。(还要注意如何重用流程)。