很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。
我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我很感激你的帮助。
更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。
基于@rocksportrocker解决方案,
在发送和RECVing结果时使用dill是有意义的。
import dill
import itertools
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
res = fun(*args)
res = dill.dumps(res)
return res
def dill_map_async(pool, fun, args_list,
as_tuple=True,
**kw):
if as_tuple:
args_list = ((x,) for x in args_list)
it = itertools.izip(
itertools.cycle([fun]),
args_list)
it = itertools.imap(dill.dumps, it)
return pool.map_async(run_dill_encoded, it, **kw)
if __name__ == '__main__':
import multiprocessing as mp
import sys,os
p = mp.Pool(4)
res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
[lambda x:x+1]*10,)
res = res.get(timeout=100)
res = map(dill.loads,res)
print(res)
正如其他人所说,多处理只能将Python对象转移到可以pickle的工作进程。如果您不能像unutbu所描述的那样重新组织代码,您可以使用dills扩展的pickle /unpickling功能来传输数据(特别是代码数据),如下所示。
这个解决方案只需要安装dill,而不需要像pathos这样的其他库:
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()