这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。
多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。
并行化这段代码最简单的方法是什么?
使用Ray有很多优点:
除了多个核(使用相同的代码)之外,还可以在多台机器上并行。
通过共享内存(和零拷贝序列化)有效地处理数值数据。
具有分布式调度的高任务吞吐量。
容错。
在本例中,您可以启动Ray并定义一个远程函数
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
然后并行地调用它
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
要在集群上运行相同的示例,唯一需要更改的行是对ray.init()的调用。相关文档可以在这里找到。
请注意,我正在帮助开发雷。
这在用Python实现多处理和并行/分布式计算时非常有用。
YouTube教程使用techila包
Techila是一种分布式计算中间件,它使用Techila包直接与Python集成。包中的peach函数在并行化循环结构时非常有用。(以下代码片段来自Techila社区论坛)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
假设我们有一个async函数
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
这需要在一个大数组上运行。有些属性被传递给程序,有些则来自数组中字典元素的属性。
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))