我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

借用本文,我们了解了如何在多线程、多处理和异步/异步之间进行选择及其用法。

Python 3有一个新的内置库,以实现并发和并行-concurrent.futures

因此,我将通过一个实验演示如何通过线程池运行四个任务(即.sleep()方法):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[注]:

正如您在上面的结果中看到的,最好的情况是这四项任务有3名员工。如果有进程任务而不是I/O绑定或阻塞(多处理而不是线程),则可以将ThreadPoolExecutor更改为ProcessPoolExecutoor。

其他回答

下面的代码可以运行10个线程同时打印0到99之间的数字:

from threading import Thread

def test():
    for i in range(0, 100):
        print(i)

thread_list = []

for _ in range(0, 10):
    thread = Thread(target=test)
    thread_list.append(thread)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

下面的代码是上述代码循环版本的简写,运行10个线程,同时打印0到99之间的数字:

from threading import Thread

def test():
    [print(i) for i in range(0, 100)]

thread_list = [Thread(target=test) for _ in range(0, 10)]

[thread.start() for thread in thread_list]

[thread.join() for thread in thread_list]

结果如下:

...
99
83
97
84
98
99
85
86
87
88
...

给定函数f,如下所示:

import threading
threading.Thread(target=f).start()

向f传递参数

threading.Thread(target=f, args=(a,b,c)).start()

自2010年提出这个问题以来,如何使用带有映射和池的Python进行简单的多线程处理已经得到了真正的简化。

下面的代码来自一篇文章/博客文章,您应该明确查看(没有从属关系)-一行中的并行性:一个更好的日常线程任务模型。我将在下面总结一下——它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

以下是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很酷的小函数,是将并行性轻松注入Python代码的关键。对于那些不熟悉的人来说,map是从Lisp这样的函数语言中提取出来的。它是一个将另一个函数映射到序列上的函数。Map为我们处理序列上的迭代,应用函数,并在最后将所有结果存储在一个方便的列表中。


实施

map函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样神奇的stepchild:multiprocessing.dummy。

multiprocessing.dummy与多处理模块完全相同,但使用线程(一个重要的区别-对CPU密集型任务使用多个进程;对I/O(和在I/O期间)使用线程):

multiprocessing.dummy复制了多处理的API,但它不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数(仅在Python 3.3及更高版本中如此):

要传递多个数组,请执行以下操作:

results = pool.starmap(function, zip(list_a, list_b))

或者传递常量和数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是早期版本的Python,可以通过此解决方法传递多个参数)。

(感谢user136036提供的有用评论。)

使用全新的concurrent.futures模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

执行器方法对于所有以前接触过Java的人来说似乎都很熟悉。

还有一个附带说明:为了保持宇宙的正常,如果你不使用上下文,不要忘记关闭你的池/执行器(这是如此棒,它为你做了)

作为第二个anwser的python3版本:

import queue as Queue
import threading
import urllib.request

# Called by each thread
def get_url(q, url):
    q.put(urllib.request.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com", "http://www.python.org","https://wiki.python.org/moin/"]

q = Queue.Queue()
def thread_func():
    for u in theurls:
        t = threading.Thread(target=get_url, args = (q,u))
        t.daemon = True
        t.start()

    s = q.get()
    
def non_thread_func():
    for u in theurls:
        get_url(q,u)
        

    s = q.get()
   

您可以测试它:

start = time.time()
thread_func()
end = time.time()
print(end - start)

start = time.time()
non_thread_func()
end = time.time()
print(end - start)

non_thread_func()花费的时间应该是thread_func()的4倍