这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。

多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。

并行化这段代码最简单的方法是什么?


当前回答

Dask期货;我很惊讶至今还没有人提起这件事……

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

其他回答

使用Ray有很多优点:

除了多个核(使用相同的代码)之外,还可以在多台机器上并行。 通过共享内存(和零拷贝序列化)有效地处理数值数据。 具有分布式调度的高任务吞吐量。 容错。

在本例中,您可以启动Ray并定义一个远程函数

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行地调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

要在集群上运行相同的示例,唯一需要更改的行是对ray.init()的调用。相关文档可以在这里找到。

请注意,我正在帮助开发雷。

Dask期货;我很惊讶至今还没有人提起这件事……

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

这在用Python实现多处理和并行/分布式计算时非常有用。

YouTube教程使用techila包

Techila是一种分布式计算中间件,它使用Techila包直接与Python集成。包中的peach函数在并行化循环结构时非常有用。(以下代码片段来自Techila社区论坛)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

由于@iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'

假设我们有一个async函数

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

这需要在一个大数组上运行。有些属性被传递给程序,有些则来自数组中字典元素的属性。

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))