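I'm sorry that I can't reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in the IPython shell instead of regular Python, things work out fine.

I looked up some earlier notes on this problem. They were all caused by using pool to call a function defined within a class function. But that is not the case for me.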

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

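I would appreciate any help.

Update: The function I pickle is defined at the top level of the module. It does, however, call a function that contains a nested function. That is, f() calls g() calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g() and h() are all defined at the top level. I tried a simpler example with this pattern and it works, though.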


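Current answer

When this problem comes up with multiprocessing, a simple solution is to switch from Pool to ThreadPool. This can be done with no code changes other than the import: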

from multiprocessing.pool import ThreadPool as Pool

This works because ThreadPool shares memory with the main thread rather than creating a new process, which means that pickling is not required.

The downside of this method is that Python isn't the greatest language for handling threads: it uses the Global Interpreter Lock (GIL) to stay thread-safe, which can slow down CPU-bound use cases. However, if you're primarily interacting with other systems (running HTTP commands, talking to a database, writing to filesystems), your code is likely not bound by CPU and won't take much of a hit. In fact, when writing HTTP/HTTPS benchmarks I've found that the threaded model used here has less overhead and fewer delays, since creating new processes costs much more than creating new threads and the program was otherwise just waiting for HTTP responses.

So if you're processing a lot of data in Python userland, this might not be the best method.
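To make the "no pickling" point concrete, here is a minimal sketch (the `square_all` helper is a made-up name for illustration, not part of the original answer). It passes a lambda, which the standard pickle module cannot serialize, to a ThreadPool:

```python
from multiprocessing.pool import ThreadPool


def square_all(values):
    # A lambda cannot be pickled by the standard pickle module, so a
    # process-based Pool would reject it. ThreadPool runs the callable
    # in threads of the same process, so nothing is serialized.
    with ThreadPool(processes=4) as pool:
        return pool.map(lambda x: x * x, values)


if __name__ == "__main__":
    print(square_all(range(5)))
```

Swapping ThreadPool for multiprocessing.Pool in this sketch should bring back a pickling error, which is a quick way to confirm that serialization is the culprit.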

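Other answers

I found that I can also generate exactly this error output on a perfectly working piece of code by trying to run the profiler on it.

Note that this was on Windows (where the forking is a bit less elegant).

I was running: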

python -m profile -o output.pstats <script> 

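I found that removing the profiling removed the error and putting the profiling back restored it. It was driving me crazy too, because I knew the code used to work. I was checking to see whether something had updated pool.py... then I had a sinking feeling, eliminated the profiling, and that was it.

Posting here for the archives in case anybody else runs into it.

A quick fix is to make the function global: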

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
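    @staticmethod
    def test(x):
        return x**2

    def test_apply(self, list_):
        # Declaring r global binds it at module level, so pickle can
        # look it up by name even though it is defined inside a method.
        # Note: this relies on the workers inheriting that module state
        # (fork start method); with spawn, r would not exist in the worker.
        global r

        def r(x):
            return Test.test(x + self.x)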

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))
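If the `global` trick feels fragile, a variant is to define the worker at module level and hand it the per-instance state as an argument. The sketch below is an illustration under that assumption, not the answerer's code; `shift_and_square` and `apply_to_all` are hypothetical names, and `offset` plays the role of `self.x` above:

```python
import functools
from multiprocessing import Pool


def shift_and_square(offset, x):
    # Defined at module level, so pickle can find it by name in the worker.
    return (x + offset) ** 2


def apply_to_all(offset, values):
    # functools.partial objects pickle as long as the wrapped function and
    # the bound arguments do, so the state travels along with each task.
    worker = functools.partial(shift_and_square, offset)
    with Pool(processes=2) as pool:
        return pool.map(worker, values)


if __name__ == "__main__":
    print(apply_to_all(2, range(10)))
```

Because nothing here depends on a name created at call time, this version should not care which start method the platform uses.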

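As others have said, multiprocessing can only transfer Python objects to worker processes if they can be pickled. If you cannot reorganize your code as described by unutbu, you can use dill's extended pickling/unpickling capabilities to transfer data (especially code data), as shown below.

This solution requires only the installation of dill and no other libraries such as pathos: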

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # async execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print(job.get())
    print("")

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print(job.get())

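Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This error will also occur if there is any built-in function inside the model object that is passed to the async job.

So make sure to check that the model objects being passed don't contain built-in functions. (In our case, we were using the FieldTracker() function of django-model-utils inside the model to track a certain field.) Here is the link to the relevant GitHub issue.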
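One way to do that check is to try pickling each attribute separately. The following is a rough sketch, assuming the object keeps its state in `__dict__`; `unpicklable_attributes` and `Record` are hypothetical names used only for illustration, with `Record` standing in for a model object:

```python
import pickle


def unpicklable_attributes(obj):
    """Return the names of instance attributes that pickle rejects."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:
            # The exception type varies between Python versions, so any
            # failure is treated as "cannot be pickled".
            bad.append(name)
    return bad


class Record(object):  # hypothetical stand-in for a model object
    def __init__(self):
        self.title = "example"
        self.tracker = lambda: None  # a function stored on the instance


if __name__ == "__main__":
    print(unpicklable_attributes(Record()))
```

Running this on the object before handing it to the pool narrows the error down to a specific attribute instead of the generic traceback.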