这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。

多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。

并行化这段代码最简单的方法是什么?


当前回答

并行处理的一个非常简单的例子是

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()

其他回答

由于全局解释器锁(GIL)的存在,在CPython上使用多线程并不能为纯python代码提供更好的性能。我建议使用multiprocessing模块:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

注意,这在交互式解释器中不起作用。

为了避免GIL周围常见的FUD:在本例中使用线程没有任何优势。这里要使用进程,而不是线程,因为它们避免了一大堆问题。

为什么不用线程和一个互斥来保护一个全局列表呢?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

请记住,您的速度将与最慢的线程一样快

我发现joblib对我很有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

N_jobs =-1:使用所有可用内核

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的方法,但我会这样做;

实际的代码;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

希望这能有所帮助。

为了并行化一个简单的for循环,joblib为多处理的原始使用带来了很多价值。不仅是简短的语法,还包括在迭代非常快的时候(以消除开销)进行透明的迭代聚集,或者捕获子进程的回溯,以获得更好的错误报告。

免责声明:我是joblib的原作者。