很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。
我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我很感激你的帮助。
更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。
这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生一个错误几乎相同的一个你张贴:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。
可以通过在顶层定义一个调用foo.work()的函数来修复:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。
正如其他人所说,多处理只能将Python对象转移到可以pickle的工作进程。如果您不能像unutbu所描述的那样重新组织代码,您可以使用dills扩展的pickle /unpickling功能来传输数据(特别是代码数据),如下所示。
这个解决方案只需要安装dill,而不需要像pathos这样的其他库:
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()
正如@penky Suresh在这个回答中建议的那样,不要使用内置关键字。
显然,args是处理多处理时的内置关键字
class TTS:
def __init__(self):
pass
def process_and_render_items(self):
multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]
with ProcessPoolExecutor(max_workers=10) as executor:
# Using args here is fine.
future_processes = {
executor.submit(TTS.process_and_render_item, args)
for args in multiprocessing_args
}
for future in as_completed(future_processes):
try:
data = future.result()
except Exception as exc:
print(f"Generated an exception: {exc}")
else:
print(f"Generated data for comment process: {future}")
# Dont use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
print(arg)
# This will print {"a": "b", "c": "d"} for the first process
# and {"e": "f", "g": "h"} for the second process.
PS:制表符/空格可能有点不对。