在下面的示例代码中,我想获取函数worker的返回值。我该怎么做呢?这个值存储在哪里?
示例代码:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
我似乎无法在存储在作业中的对象中找到相关属性。
对于正在寻找如何使用Queue从进程中获取值的任何人:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
注意,在Windows或Jupyter Notebook中,使用多线程,您必须将其保存为文件并执行该文件。如果你在命令提示符中这样做,你会看到这样的错误:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
出于某种原因,我找不到一个通用的例子,如何做到这一点与队列在任何地方(甚至Python的文档示例不会产生多个进程),所以这是我得到的工作后,10次尝试:
from multiprocessing import Process, Queue
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue是一个阻塞的、线程安全的队列,您可以使用它来存储来自子进程的返回值。因此,您必须将队列传递给每个进程。这里不太明显的一点是,您必须在加入进程之前从队列中获取(),否则队列将被填满并阻塞所有内容。
面向对象的更新(在Python 3.4中测试):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
对于正在寻找如何使用Queue从进程中获取值的任何人:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
注意,在Windows或Jupyter Notebook中,使用多线程,您必须将其保存为文件并执行该文件。如果你在命令提示符中这样做,你会看到这样的错误:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
如果你正在使用Python 3,你可以使用concurrent.futures.ProcessPoolExecutor作为一个方便的抽象:
from concurrent.futures import ProcessPoolExecutor
def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]