我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。

我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

其他回答

与其他提到的一样,由于GIL,CPython只能在I/O等待时使用线程。

如果您想从多个内核中获得CPU绑定任务的好处,请使用多处理:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

自2010年提出这个问题以来,如何使用带有映射和池的Python进行简单的多线程处理已经得到了真正的简化。

下面的代码来自一篇文章/博客文章,您应该明确查看(没有从属关系)-一行中的并行性:一个更好的日常线程任务模型。我将在下面总结一下——它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

以下是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很酷的小函数,是将并行性轻松注入Python代码的关键。对于那些不熟悉的人来说,map是从Lisp这样的函数语言中提取出来的。它是一个将另一个函数映射到序列上的函数。Map为我们处理序列上的迭代,应用函数,并在最后将所有结果存储在一个方便的列表中。


实施

map函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样神奇的stepchild:multiprocessing.dummy。

multiprocessing.dummy与多处理模块完全相同,但使用线程(一个重要的区别-对CPU密集型任务使用多个进程;对I/O(和在I/O期间)使用线程):

multiprocessing.dummy复制了多处理的API,但它不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数(仅在Python 3.3及更高版本中如此):

要传递多个数组,请执行以下操作:

results = pool.starmap(function, zip(list_a, list_b))

或者传递常量和数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是早期版本的Python,可以通过此解决方法传递多个参数)。

(感谢user136036提供的有用评论。)

这里有一个简单的示例:您需要尝试一些替代URL,并返回第一个URL的内容以进行响应。

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

在这种情况下,线程被用作一种简单的优化:每个子线程都在等待URL解析和响应,以将其内容放入队列;每个线程都是一个守护进程(如果主线程结束,则不会保持进程运行——这比不结束更常见);主线程启动所有子线程,在队列中执行get以等待其中一个线程完成put,然后发出结果并终止(这将删除所有可能仍在运行的子线程,因为它们是守护进程线程)。

Python中线程的正确使用总是与I/O操作相关(因为CPython无论如何都不使用多个内核来运行CPU绑定的任务,线程的唯一原因是在等待一些I/O时不会阻塞进程)。顺便说一句,队列几乎总是将工作分配给线程和/或收集工作结果的最佳方式,而且它们本质上是线程安全的,因此它们使您不用担心锁、条件、事件、信号量和其他线程间协调/通信概念。

借用本文,我们了解了如何在多线程、多处理和异步/异步之间进行选择及其用法。

Python 3有一个新的内置库,以实现并发和并行-concurrent.futures

因此,我将通过一个实验演示如何通过线程池运行四个任务(即.sleep()方法):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[注]:

正如您在上面的结果中看到的,最好的情况是这四项任务有3名员工。如果有进程任务而不是I/O绑定或阻塞(多处理而不是线程),则可以将ThreadPoolExecutor更改为ProcessPoolExecutoor。

对我来说,线程的最佳示例是监视异步事件。看看这个代码。

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython会话并执行以下操作来使用此代码:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等几分钟

>>> a[0] = 2
Mon = 2