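I'm sorry that I can't reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in the IPython shell instead of regular Python, everything works fine.
I looked up some earlier notes on this problem. They were all caused by using a pool to call a function defined inside a class function. That is not the case for me.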
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
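I would appreciate any help.
Update: The function I pickle is defined at the top level of the module, although it calls a function that contains a nested function. That is, f() calls g(), which calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g(), and h() are all defined at the top level. I tried a simpler example with this pattern and it works, though.
Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top level of a module.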
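To make that rule concrete, here is a minimal sketch that checks picklability directly with the standard pickle module. The names top_level, make_nested, and can_pickle are made up for illustration, and it is written for Python 3, where the exact exception type and message differ from the Python 2.7 traceback in the question.

```python
import pickle


def top_level(x):
    """An ordinary function defined at module top level."""
    return x + 1


def make_nested():
    """Return a function that is defined inside another function."""
    def nested(x):
        return x + 1
    return nested


def can_pickle(obj):
    """Hypothetical helper: True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python 3 version, local functions raise either
        # AttributeError or PicklingError; lambdas raise PicklingError.
        return False


if __name__ == "__main__":
    candidates = [
        ("top_level", top_level),
        ("nested", make_nested()),
        ("lambda", lambda x: x + 1),
    ]
    for name, obj in candidates:
        print(name, can_pickle(obj))
```

Running the same kind of check on whatever you hand to the pool (the callable and each argument) is a quick way to find the object that actually fails.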
This piece of code:
import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()
yields an error almost identical to the one you posted:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
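The problem is that the pool methods all use an mp.SimpleQueue to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue must be picklable, and foo.work is not picklable because it is not defined at the top level of the module.
It can be fixed by defining a function at the top level that calls foo.work():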
def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))
Notice that foo is picklable, since Foo is defined at the top level and foo.__dict__ is picklable.
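As a rough illustration of both points, the sketch below pickles the same kind of (function, args) pair that a pool would send, without starting any processes. It targets Python 3; the value attribute, the callback attribute, and the picklable helper are assumptions added for the example and are not part of the code above.

```python
import pickle


class Foo:
    """Defined at module top level, so pickle can find the class by name."""

    def __init__(self, value):
        self.value = value

    def work(self):
        return self.value + 1


def work(foo):
    """Top-level wrapper: this is what would be handed to the pool."""
    return foo.work()


def picklable(obj):
    """Hypothetical helper: True if pickle.dumps(obj) succeeds."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Round-trip the callable together with its arguments, then call it,
# which mimics what happens on the worker side.
task = (work, (Foo(1),))
func, args = pickle.loads(pickle.dumps(task))
result = func(*args)

# An instance stops being picklable once its __dict__ holds something
# unpicklable, such as a lambda.
broken = Foo(1)
broken.callback = lambda x: x

if __name__ == "__main__":
    print(result, picklable(Foo(1)), picklable(broken))
```

So if the wrapper approach still fails, it is worth checking what the instance carries in its __dict__.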
Building on @rocksportrocker's solution, it would make sense to use dill when sending and receiving the results.
import dill
import itertools

# Python 2 code: on Python 3, itertools.izip and itertools.imap
# are replaced by the built-in zip and map.

def run_dill_encoded(payload):
    # Runs in the worker: decode the (function, args) pair,
    # call it, and dill-encode the result for the trip back.
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    # Pair the function with each argument tuple and dill-encode each pair.
    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                         [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
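I'd use pathos.multiprocessing instead of multiprocessing. pathos.multiprocessing is a fork of multiprocessing that uses dill. dill can serialize almost anything in Python, so you are able to send a lot more around in parallel. The pathos fork can also work directly with functions that take multiple arguments, which is what you need for class methods.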
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> # x and y are not defined in this snippet; for example,
>>> # x = [0, 1, 2, 3] and y = [4, 5, 6, 7] would give the output below
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
Get pathos (and, if you like, dill) here:
https://github.com/uqfoundation
When this problem comes up with multiprocessing, a simple solution is to switch from Pool to ThreadPool. This can be done with no code changes other than the import:
from multiprocessing.pool import ThreadPool as Pool
This works because ThreadPool shares memory with the main thread rather than creating a new process, which means that pickling is not required.
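A small sketch of that, assuming Python 3 (where the pool can be used as a context manager): a lambda cannot be pickled, yet ThreadPool maps it without complaint because nothing is ever sent to another process. The squares name is only for illustration.

```python
from multiprocessing.pool import ThreadPool

with ThreadPool(4) as pool:
    # The lambda is called directly by the worker threads, so it is
    # never pickled.
    squares = pool.map(lambda x: x * x, range(4))

print(squares)  # [0, 1, 4, 9]
```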
The downside to this method is that Python isn't the greatest language at handling threads: it uses the Global Interpreter Lock (GIL) to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems), then your code is likely not CPU-bound and won't take much of a hit. In fact, I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and fewer delays, as the overhead of creating new processes is much higher than the overhead of creating new threads, and the program was otherwise just waiting for HTTP responses.
So if you're processing a ton of stuff in Python userspace, this might not be the best method.
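To see the I/O-bound case for yourself, here is a rough sketch that uses time.sleep as a stand-in for waiting on a network response (sleeping releases the GIL, much like blocking I/O does). The fake_request function and the 0.2 second delay are invented for the example, and timings will vary from machine to machine.

```python
import time
from multiprocessing.pool import ThreadPool


def fake_request(delay):
    """Stand-in for an HTTP call: just wait, then return the delay."""
    time.sleep(delay)
    return delay


start = time.perf_counter()
with ThreadPool(4) as pool:
    # Four waits of 0.2 s each, spread over four threads.
    results = pool.map(fake_request, [0.2] * 4)
elapsed = time.perf_counter() - start

# Run one after another, the waits would add up to about 0.8 s;
# with threads they overlap.
print("elapsed: %.2f s" % elapsed)
```

Swapping fake_request for a CPU-heavy pure-Python loop should show much less benefit from the threads, which is the GIL cost described above.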