很抱歉,我不能用一个更简单的例子重现这个错误,而且我的代码太复杂了,无法发布。如果我在IPython shell而不是常规Python中运行程序,事情会很顺利。

我查阅了以前关于这个问题的一些笔记。它们都是由使用pool调用类函数中定义的函数引起的。但对我来说不是这样。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我很感激你的帮助。

更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即,f()调用g()调用h(),其中有一个嵌套函数i(),我正在调用pool.apply_async(f)。F (), g(), h()都在顶层定义。我用这个模式尝试了一个更简单的例子,尽管它是有效的。


当前回答

我会用悲怆。多处理,而不是多处理。感伤。Multiprocessing是Multiprocessing的一个分支,使用莳萝。Dill几乎可以序列化python中的任何东西,因此您可以并行地发送更多内容。正如类方法所需要的那样,pathos fork还能够直接使用多个参数函数。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

在这里获得感伤(如果你喜欢,莳萝): https://github.com/uqfoundation

其他回答

不能pickle <type 'function'>:属性查找__builtin__。函数失败

如果传递给异步作业的模型对象中有任何内置函数,也会出现此错误。

因此,确保检查传递的模型对象没有内置函数。(在我们的例子中,我们在模型中使用django-model-utils的FieldTracker()函数来跟踪某个字段)。这里是相关GitHub问题的链接。

一个快速的解决方法是使函数全局化

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

基于@rocksportrocker解决方案, 在发送和RECVing结果时使用dill是有意义的。

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

正如@penky Suresh在这个回答中建议的那样,不要使用内置关键字。

显然,args是处理多处理时的内置关键字


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



PS:制表符/空格可能有点不对。

这里列出了一些可以腌制的东西。特别是,函数只有在模块的顶层定义时才可pickle。

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生一个错误几乎相同的一个你张贴:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题是池方法都使用mp。SimpleQueue将任务传递给工作进程。所有经过mp的东西。SimpleQueue必须是可选的,而foo。工作是不可pickle的,因为它没有在模块的顶层定义。

可以通过在顶层定义一个调用foo.work()的函数来修复:

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

注意,foo是可选的,因为foo是在顶层定义的。__dict__是可腌制的。