很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。
我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我很感激你的帮助。
更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。
当多处理出现这个问题时,一个简单的解决方案是从Pool切换到ThreadPool。除了import-,无需更改其他代码即可完成此操作
from multiprocessing.pool import ThreadPool as Pool
这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要pickle。
The downside to this method is that python isn't the greatest language with handling threads- it uses something called the Global Interpreter Lock to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not bound by CPU and won't take much of a hit. In fact I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delays, as the overhead from creating new processes is much higher than the overhead for creating new threads and the program was otherwise just waiting for HTTP responses.
所以如果你在python用户空间中处理大量的东西,这可能不是最好的方法。
这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生一个错误几乎相同的一个你张贴:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。
可以通过在顶层定义一个调用foo.work()的函数来修复:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。
当多处理出现这个问题时,一个简单的解决方案是从Pool切换到ThreadPool。除了import-,无需更改其他代码即可完成此操作
from multiprocessing.pool import ThreadPool as Pool
这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要pickle。
The downside to this method is that python isn't the greatest language with handling threads- it uses something called the Global Interpreter Lock to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not bound by CPU and won't take much of a hit. In fact I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delays, as the overhead from creating new processes is much higher than the overhead for creating new threads and the program was otherwise just waiting for HTTP responses.
所以如果你在python用户空间中处理大量的东西,这可能不是最好的方法。
正如@penky Suresh在这个回答中建议的那样,不要使用内置关键字。
显然,args是处理多处理时的内置关键字
class TTS:
def __init__(self):
pass
def process_and_render_items(self):
multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]
with ProcessPoolExecutor(max_workers=10) as executor:
# Using args here is fine.
future_processes = {
executor.submit(TTS.process_and_render_item, args)
for args in multiprocessing_args
}
for future in as_completed(future_processes):
try:
data = future.result()
except Exception as exc:
print(f"Generated an exception: {exc}")
else:
print(f"Generated data for comment process: {future}")
# Dont use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
print(arg)
# This will print {"a": "b", "c": "d"} for the first process
# and {"e": "f", "g": "h"} for the second process.
PS:制表符/空格可能有点不对。