我试图使用multiprocessing的Pool.map()函数来同时划分工作。当我使用以下代码时,它工作得很好:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

然而,当我在更面向对象的方法中使用它时,它就不起作用了。它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

这发生时,以下是我的主程序:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

下面是我的someClass类:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

有人知道问题是什么吗,或者有什么简单的解决方法吗?


问题是,多进程必须pickle对象,以便在进程之间悬挂它们,而绑定的方法是不可pickle的。解决方法(不管您是否认为它“简单”;-)是将基础设施添加到程序中,以允许对此类方法进行pickle,将其注册到copy_reg标准库方法中。

例如,Steven Bethard对这个线程的贡献(接近线程的末尾)展示了一个通过copy_reg允许方法pickle / unpickle的完美可行的方法。


你也可以在someClass()中定义__call__()方法,该方法调用someClass.go(),然后将someClass()的一个实例传递给池。这个对象是可pickle的,它工作得很好(为我)…


但Steven Bethard的解决方案存在一些局限性:

When you register your class method as a function, the destructor of your class is surprisingly called every time your method processing is finished. So if you have 1 instance of your class that calls n times its method, members may disappear between 2 runs and you may get a message malloc: *** error for object 0x...: pointer being freed was not allocated (e.g. open member file) or pure virtual method called, terminate called without an active exception (which means than the lifetime of a member object I used was shorter than what I thought). I got this when dealing with n greater than the pool size. Here is a short example :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

输出:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__方法不是那么等价的,因为[None,…]]的结果:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

所以这两种方法都不令人满意……


您可以使用另一种快捷方式,尽管它可能效率很低,这取决于类实例中的内容。

正如每个人都说过的,问题是多处理代码必须pickle它发送给它已经启动的子进程的东西,而pickle器不做实例方法。

但是,您可以不发送实例方法,而是将实际的类实例加上要调用的函数名发送给普通函数,然后该函数使用getattr调用实例方法,从而在Pool子进程中创建绑定方法。这类似于定义__call__方法,不同的是你可以调用多个成员函数。

偷@EricH。的代码,并做了一些注释(我重新输入了它,因此所有的名字都发生了变化,出于某种原因,这似乎比剪切和粘贴更容易:-)),以说明所有的魔力:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

输出显示,构造函数确实被调用了一次(在原始pid中),而析构函数被调用了9次(每个副本调用一次=根据需要,每个pool-worker-process调用2或3次,加上在原始进程中调用一次)。这通常是可以的,就像在这种情况下,因为默认pickler会复制整个实例并(半)秘密地重新填充它-在这种情况下,这样做:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

这就是为什么即使析构函数在三个工作进程中被调用了八次,它每次都从1倒数到0——当然,这样你仍然会遇到麻烦。如果有必要,你可以提供自己的__setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

在这个例子中。


所有这些解决方案都是丑陋的,因为多处理和酸洗是坏的和有限的,除非跳出标准库。

如果你使用一个叫做pathos的多处理分支。在multiprocessing的map函数中,可以直接使用类和类方法。这是因为使用了dill而不是pickle或cPickle,并且dill可以序列化python中的几乎所有内容。

感伤。Multiprocessing还提供了一个异步映射函数…它可以映射具有多个参数的函数(例如map(math. map))。Pow, [1,2,3], [4,5,6])))

看到的: 多处理和莳萝一起可以做什么?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

明确地说,你可以做你一开始想做的,如果你想的话,你可以通过解释器来做。

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

在这里获取代码: https://github.com/uqfoundation/pathos


对此,一个潜在的简单解决方案是切换到使用multiprocessing.dummy。这是一个基于线程的多处理接口实现,在Python 2.7中似乎没有这个问题。我在这方面没有太多经验,但是这个快速的导入更改允许我在类方法上调用apply_async。

一些关于multi - processing.dummy的好资源:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/


你也可以在someClass()中定义__call__()方法,该方法调用someClass.go(),然后将someClass()的一个实例传递给池。这个对象是可pickle的,它工作得很好(为我)…

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

在这个简单的例子中,someClass。F没有从类中继承任何数据,也没有向类中附加任何数据,一个可能的解决方案是将F分离出来,这样它就可以被pickle:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

为什么不使用单独的函数?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

更新:在撰写本文的当天,namedTuples是可选的(从python 2.7开始)

这里的问题是子进程不能导入对象的类-在这种情况下,类P-,在多模型项目的情况下,类P应该可以在使用子进程的任何地方导入

一个快速的解决方法是通过将其影响为globals()使其可导入

globals()["P"] = P

上面parisjohn的解决方案对我很有效。此外,代码看起来很干净,易于理解。在我的例子中,有几个函数要使用Pool调用,所以我在下面修改了parisjohn的代码。我使__call__能够调用几个函数,函数名在go()的参数dict中传递:

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass
    
    def f(self, x):
        return x*x
    
    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

我遇到了同样的问题,但发现有一个JSON编码器可以用来在进程之间移动这些对象。

from pyVmomi.VmomiSupport import VmomiJSONEncoder

用这个来创建你的列表:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

然后在mapped函数中,使用this来恢复对象:

pfVmomiObj = json.loads(jsonSerialized)

感伤。多重处理对我很有用。

与多处理不同,它有一个池方法,可以序列化所有东西

import pathos.multiprocessing as mp
pool = mp.Pool(processes=2) 

甚至不需要安装完整的悲情包。

实际上,唯一需要的包是dill (pip install dill),然后用dill覆盖多处理Pickler:

dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump

这个答案来自https://stackoverflow.com/a/69253561/10686785