很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。

我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我很感激你的帮助。

更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。


当前回答

这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生一个错误几乎相同的一个你张贴:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。

可以通过在顶层定义一个调用foo.work()的函数来修复:

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。

其他回答

基于@rocksportrocker解决方案, 在发送和RECVing结果时使用dill是有意义的。

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

我会用悲怆。多处理,而不是多处理。感伤。Multiprocessing是Multiprocessing的一个分支,使用莳萝。Dill几乎可以序列化python中的任何东西,因此您可以并行地发送更多内容。正如类方法所需要的那样,pathos fork还能够直接使用多个参数函数。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

在这里获得感伤(如果你喜欢,莳萝): https://github.com/uqfoundation

一个快速的解决方法是使函数全局化

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

当多处理出现这个问题时,一个简单的解决方案是从Pool切换到ThreadPool。除了import-,无需更改其他代码即可完成此操作

from multiprocessing.pool import ThreadPool as Pool

这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要pickle。

The downside to this method is that python isn't the greatest language with handling threads- it uses something called the Global Interpreter Lock to stay thread safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not bound by CPU and won't take much of a hit. In fact I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delays, as the overhead from creating new processes is much higher than the overhead for creating new threads and the program was otherwise just waiting for HTTP responses.

所以如果你在python用户空间中处理大量的东西,这可能不是最好的方法。

我发现,通过尝试对代码段使用分析器,我还可以在一个完美工作的代码段上生成完全相同的错误输出。

注意,这是在Windows上(其中的分叉有点不优雅)。

我在跑:

python -m profile -o output.pstats <script> 

发现删除剖析可以消除错误,而放置剖析可以恢复错误。也快把我逼疯了因为我知道密码以前有用。我正在检查是否有什么东西更新了pool.py…然后有了一种沉沦的感觉,消除了侧写,就是这样。

把它贴在这里存档,以防别人发现。