我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。

我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

其他回答

给定函数f,如下所示:

import threading
threading.Thread(target=f).start()

向f传递参数

threading.Thread(target=f, args=(a,b,c)).start()

注意:对于Python中的实际并行化,您应该使用多处理模块来分叉并行执行的多个进程(由于全局解释器锁,Python线程提供了交织,但实际上它们是串行执行的,而不是并行执行的,并且仅在交织I/O操作时有用)。

然而,如果您只是在寻找交错(或者正在执行可以并行化的I/O操作,尽管存在全局解释器锁),那么线程模块就是开始的地方。作为一个非常简单的例子,让我们考虑通过并行对子范围求和来对大范围求和的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,以上是一个非常愚蠢的示例,因为它绝对没有I/O,并且由于全局解释器锁,虽然在CPython中交错执行(增加了上下文切换的开销),但仍将串行执行。

使用全新的concurrent.futures模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

执行器方法对于所有以前接触过Java的人来说似乎都很熟悉。

还有一个附带说明:为了保持宇宙的正常,如果你不使用上下文,不要忘记关闭你的池/执行器(这是如此棒,它为你做了)

以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。

我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

借用本文,我们了解了如何在多线程、多处理和异步/异步之间进行选择及其用法。

Python 3有一个新的内置库,以实现并发和并行-concurrent.futures

因此,我将通过一个实验演示如何通过线程池运行四个任务(即.sleep()方法):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[注]:

正如您在上面的结果中看到的,最好的情况是这四项任务有3名员工。如果有进程任务而不是I/O绑定或阻塞(多处理而不是线程),则可以将ThreadPoolExecutor更改为ProcessPoolExecutoor。