我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
当前回答
使用线程/多处理的最简单方法是使用更多高级库,如autothread。
import autothread
from time import sleep as heavyworkload
@autothread.multithreaded() # <-- This is all you need to add
def example(x: int, y: int):
heavyworkload(1)
return x*y
现在,您可以为函数提供int列表。Autothread将为您处理所有事务,并只提供并行计算的结果。
result = example([1, 2, 3, 4, 5], 10)
其他回答
自2010年提出这个问题以来,如何使用带有映射和池的Python进行简单的多线程处理已经得到了真正的简化。
下面的代码来自一篇文章/博客文章,您应该明确查看(没有从属关系)-一行中的并行性:一个更好的日常线程任务模型。我将在下面总结一下——它最终只是几行代码:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
以下是多线程版本:
results = []
for item in my_array:
results.append(my_function(item))
描述
Map是一个很酷的小函数,是将并行性轻松注入Python代码的关键。对于那些不熟悉的人来说,map是从Lisp这样的函数语言中提取出来的。它是一个将另一个函数映射到序列上的函数。Map为我们处理序列上的迭代,应用函数,并在最后将所有结果存储在一个方便的列表中。
实施
map函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样神奇的stepchild:multiprocessing.dummy。
multiprocessing.dummy与多处理模块完全相同,但使用线程(一个重要的区别-对CPU密集型任务使用多个进程;对I/O(和在I/O期间)使用线程):
multiprocessing.dummy复制了多处理的API,但它不过是线程模块的包装器。
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# Close the pool and wait for the work to finish
pool.close()
pool.join()
计时结果:
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
传递多个参数(仅在Python 3.3及更高版本中如此):
要传递多个数组,请执行以下操作:
results = pool.starmap(function, zip(list_a, list_b))
或者传递常量和数组:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
如果您使用的是早期版本的Python,可以通过此解决方法传递多个参数)。
(感谢user136036提供的有用评论。)
以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。
我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:
from os import fork
values = ['different', 'values', 'for', 'threads']
for i in range(len(values)):
p = fork()
if p == 0:
my_function(values[i])
break
使用线程/多处理的最简单方法是使用更多高级库,如autothread。
import autothread
from time import sleep as heavyworkload
@autothread.multithreaded() # <-- This is all you need to add
def example(x: int, y: int):
heavyworkload(1)
return x*y
现在,您可以为函数提供int列表。Autothread将为您处理所有事务,并只提供并行计算的结果。
result = example([1, 2, 3, 4, 5], 10)
注意:对于Python中的实际并行化,您应该使用多处理模块来分叉并行执行的多个进程(由于全局解释器锁,Python线程提供了交织,但实际上它们是串行执行的,而不是并行执行的,并且仅在交织I/O操作时有用)。
然而,如果您只是在寻找交错(或者正在执行可以并行化的I/O操作,尽管存在全局解释器锁),那么线程模块就是开始的地方。作为一个非常简单的例子,让我们考虑通过并行对子范围求和来对大范围求和的问题:
import threading
class SummingThread(threading.Thread):
def __init__(self,low,high):
super(SummingThread, self).__init__()
self.low=low
self.high=high
self.total=0
def run(self):
for i in range(self.low,self.high):
self.total+=i
thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result
请注意,以上是一个非常愚蠢的示例,因为它绝对没有I/O,并且由于全局解释器锁,虽然在CPython中交错执行(增加了上下文切换的开销),但仍将串行执行。
大多数文档和教程都使用Python的“线程和队列”模块,对于初学者来说,它们可能会让人不知所措。
也许可以考虑Python 3的concurrent.futures.ThreadPoolExecutor模块。
结合子句和列表理解,这可能是一个真正的魅力。
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_url(url):
# Your actual program here. Using threading.Lock() if necessary
return ""
# List of URLs to fetch
urls = ["url1", "url2"]
with ThreadPoolExecutor(max_workers = 5) as executor:
# Create threads
futures = {executor.submit(get_url, url) for url in urls}
# as_completed() gives you the threads once finished
for f in as_completed(futures):
# Get the results
rs = f.result()