我试图使用multiprocessing的Pool.map()函数来同时划分工作。当我使用以下代码时,它工作得很好:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
然而,当我在更面向对象的方法中使用它时,它就不起作用了。它给出的错误信息是:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
这发生时,以下是我的主程序:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
下面是我的someClass类:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
有人知道问题是什么吗,或者有什么简单的解决方法吗?
您可以使用另一种快捷方式,尽管它可能效率很低,这取决于类实例中的内容。
正如每个人都说过的,问题是多处理代码必须pickle它发送给它已经启动的子进程的东西,而pickle器不做实例方法。
但是,您可以不发送实例方法,而是将实际的类实例加上要调用的函数名发送给普通函数,然后该函数使用getattr调用实例方法,从而在Pool子进程中创建绑定方法。这类似于定义__call__方法,不同的是你可以调用多个成员函数。
偷@EricH。的代码,并做了一些注释(我重新输入了它,因此所有的名字都发生了变化,出于某种原因,这似乎比剪切和粘贴更容易:-)),以说明所有的魔力:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
输出显示,构造函数确实被调用了一次(在原始pid中),而析构函数被调用了9次(每个副本调用一次=根据需要,每个pool-worker-process调用2或3次,加上在原始进程中调用一次)。这通常是可以的,就像在这种情况下,因为默认pickler会复制整个实例并(半)秘密地重新填充它-在这种情况下,这样做:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
这就是为什么即使析构函数在三个工作进程中被调用了八次,它每次都从1倒数到0——当然,这样你仍然会遇到麻烦。如果有必要,你可以提供自己的__setstate__:
def __setstate__(self, adict):
self.count = adict['count']
在这个例子中。