很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。

我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我很感激你的帮助。

更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。


当前回答

当多处理出现这个问题时,一个简单的解决方案是从Pool切换到ThreadPool。除了import-,无需更改其他代码即可完成此操作

from multiprocessing.pool import ThreadPool as Pool

这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要pickle。

The downside to this method is that python isn't the greatest language with handling threads- it uses something called the Global Interpreter Lock to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not bound by CPU and won't take much of a hit. In fact I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delays, as the overhead from creating new processes is much higher than the overhead for creating new threads and the program was otherwise just waiting for HTTP responses.

所以如果你在python用户空间中处理大量的东西,这可能不是最好的方法。

其他回答

这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生一个错误几乎相同的一个你张贴:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。

可以通过在顶层定义一个调用foo.work()的函数来修复:

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。

我发现,通过尝试对代码段使用分析器,我还可以在一个完美工作的代码段上生成完全相同的错误输出。

注意,这是在Windows上(其中的分叉有点不优雅)。

我在跑:

python -m profile -o output.pstats <script> 

发现删除剖析可以消除错误,而放置剖析可以恢复错误。也快把我逼疯了因为我知道密码以前有用。我正在检查是否有什么东西更新了pool.py…然后有了一种沉沦的感觉,消除了侧写,就是这样。

把它贴在这里存档,以防别人发现。

当多处理出现这个问题时,一个简单的解决方案是从Pool切换到ThreadPool。除了import-,无需更改其他代码即可完成此操作

from multiprocessing.pool import ThreadPool as Pool

这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要pickle。

The downside to this method is that python isn't the greatest language with handling threads- it uses something called the Global Interpreter Lock to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not bound by CPU and won't take much of a hit. In fact I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delays, as the overhead from creating new processes is much higher than the overhead for creating new threads and the program was otherwise just waiting for HTTP responses.

所以如果你在python用户空间中处理大量的东西,这可能不是最好的方法。

不能pickle <type 'function'>:属性查找__builtin__。函数失败

如果传递给异步作业的模型对象中有任何内置函数,也会出现此错误。

因此,确保检查传递的模型对象没有内置函数。(在我们的例子中,我们在模型中使用django-model-utils的FieldTracker()函数来跟踪某个字段)。这里是相关GitHub问题的链接。

正如@penky Suresh在这个回答中建议的那样,不要使用内置关键字。

显然,args是处理多处理时的内置关键字


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



PS:制表符/空格可能有点不对。