这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。

多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。

并行化这段代码最简单的方法是什么?


当前回答

我发现joblib对我很有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

N_jobs =-1:使用所有可用内核

其他回答

为了并行化一个简单的for循环,joblib为多处理的原始使用带来了很多价值。不仅是简短的语法,还包括在迭代非常快的时候(以消除开销)进行透明的迭代聚集,或者捕获子进程的回溯,以获得更好的错误报告。

免责声明:我是joblib的原作者。

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的方法,但我会这样做;

实际的代码;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

希望这能有所帮助。

Dask期货;我很惊讶至今还没有人提起这件事……

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

为什么不用线程和一个互斥来保护一个全局列表呢?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

请记住,您的速度将与最慢的线程一样快

使用Ray有很多优点:

除了多个核(使用相同的代码)之外,还可以在多台机器上并行。 通过共享内存(和零拷贝序列化)有效地处理数值数据。 具有分布式调度的高任务吞吐量。 容错。

在本例中,您可以启动Ray并定义一个远程函数

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行地调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

要在集群上运行相同的示例,唯一需要更改的行是对ray.init()的调用。相关文档可以在这里找到。

请注意,我正在帮助开发雷。