我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

Alex Martelli的回答对我有所帮助。不过,这里有一个我认为更有用的修改版本(至少对我来说)。

更新:可在Python 2和Python 3中使用

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

其他回答

以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。

我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

借用本文,我们了解了如何在多线程、多处理和异步/异步之间进行选择及其用法。

Python 3有一个新的内置库,以实现并发和并行-concurrent.futures

因此,我将通过一个实验演示如何通过线程池运行四个任务(即.sleep()方法):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[注]:

正如您在上面的结果中看到的,最好的情况是这四项任务有3名员工。如果有进程任务而不是I/O绑定或阻塞(多处理而不是线程),则可以将ThreadPoolExecutor更改为ProcessPoolExecutoor。

下面的代码可以运行10个线程同时打印0到99之间的数字:

from threading import Thread

def test():
    for i in range(0, 100):
        print(i)

thread_list = []

for _ in range(0, 10):
    thread = Thread(target=test)
    thread_list.append(thread)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

下面的代码是上述代码循环版本的简写,运行10个线程,同时打印0到99之间的数字:

from threading import Thread

def test():
    [print(i) for i in range(0, 100)]

thread_list = [Thread(target=test) for _ in range(0, 10)]

[thread.start() for thread in thread_list]

[thread.join() for thread in thread_list]

结果如下:

...
99
83
97
84
98
99
85
86
87
88
...

与其他提到的一样,由于GIL,CPython只能在I/O等待时使用线程。

如果您想从多个内核中获得CPU绑定任务的好处,请使用多处理:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

大多数文档和教程都使用Python的“线程和队列”模块,对于初学者来说,它们可能会让人不知所措。

也许可以考虑Python 3的concurrent.futures.ThreadPoolExecutor模块。

结合子句和列表理解,这可能是一个真正的魅力。

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()