很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。
我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我很感激你的帮助。
更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。
这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生一个错误几乎相同的一个你张贴:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。
可以通过在顶层定义一个调用foo.work()的函数来修复:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。
我会用悲怆。多处理,而不是多处理。感伤。Multiprocessing是Multiprocessing的一个分支,使用莳萝。Dill几乎可以序列化python中的任何东西,因此您可以并行地发送更多内容。正如类方法所需要的那样,pathos fork还能够直接使用多个参数函数。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
在这里获得感伤(如果你喜欢,莳萝):
https://github.com/uqfoundation