我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
当前回答
只需注意:线程不需要队列。
这是我可以想象的最简单的例子,它显示了10个并发运行的进程。
import threading
from random import randint
from time import sleep
def print_number(number):
# Sleeps a random 1 to 10 seconds
rand_int_var = randint(1, 10)
sleep(rand_int_var)
print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"
thread_list = []
for i in range(1, 10):
# Instantiates the thread
# (i) does not make a sequence, so (i,)
t = threading.Thread(target=print_number, args=(i,))
# Sticks the thread in a list so that it remains accessible
thread_list.append(t)
# Starts threads
for thread in thread_list:
thread.start()
# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
thread.join()
# Demonstrates that the main process waited for threads to complete
print "Done"
其他回答
自2010年提出这个问题以来,如何使用带有映射和池的Python进行简单的多线程处理已经得到了真正的简化。
下面的代码来自一篇文章/博客文章,您应该明确查看(没有从属关系)-一行中的并行性:一个更好的日常线程任务模型。我将在下面总结一下——它最终只是几行代码:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
以下是多线程版本:
results = []
for item in my_array:
results.append(my_function(item))
描述
Map是一个很酷的小函数,是将并行性轻松注入Python代码的关键。对于那些不熟悉的人来说,map是从Lisp这样的函数语言中提取出来的。它是一个将另一个函数映射到序列上的函数。Map为我们处理序列上的迭代,应用函数,并在最后将所有结果存储在一个方便的列表中。
实施
map函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样神奇的stepchild:multiprocessing.dummy。
multiprocessing.dummy与多处理模块完全相同,但使用线程(一个重要的区别-对CPU密集型任务使用多个进程;对I/O(和在I/O期间)使用线程):
multiprocessing.dummy复制了多处理的API,但它不过是线程模块的包装器。
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# Close the pool and wait for the work to finish
pool.close()
pool.join()
计时结果:
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
传递多个参数(仅在Python 3.3及更高版本中如此):
要传递多个数组,请执行以下操作:
results = pool.starmap(function, zip(list_a, list_b))
或者传递常量和数组:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
如果您使用的是早期版本的Python,可以通过此解决方法传递多个参数)。
(感谢user136036提供的有用评论。)
这里是多线程,有一个简单的例子会很有帮助。您可以运行它并轻松了解多线程在Python中的工作方式。我使用了一个锁来防止访问其他线程,直到前面的线程完成它们的工作。通过使用这行代码,
t锁定=线程。有界信号量(值=4)
您可以一次允许多个进程,并保留将在稍后或完成之前的进程后运行的其余线程。
import threading
import time
#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
print "\r\nTimer: ", name, " Started"
tLock.acquire()
print "\r\n", name, " has the acquired the lock"
while repeat > 0:
time.sleep(delay)
print "\r\n", name, ": ", str(time.ctime(time.time()))
repeat -= 1
print "\r\n", name, " is releaseing the lock"
tLock.release()
print "\r\nTimer: ", name, " Completed"
def Main():
t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
print "\r\nMain Complete"
if __name__ == "__main__":
Main()
借用本文,我们了解了如何在多线程、多处理和异步/异步之间进行选择及其用法。
Python 3有一个新的内置库,以实现并发和并行-concurrent.futures
因此,我将通过一个实验演示如何通过线程池运行四个任务(即.sleep()方法):
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time
def concurrent(max_worker):
futures = []
tic = time()
with ThreadPoolExecutor(max_workers=max_worker) as executor:
futures.append(executor.submit(sleep, 2)) # Two seconds sleep
futures.append(executor.submit(sleep, 1))
futures.append(executor.submit(sleep, 7))
futures.append(executor.submit(sleep, 3))
for future in as_completed(futures):
if future.result() is not None:
print(future.result())
print(f'Total elapsed time by {max_worker} workers:', time()-tic)
concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)
输出:
Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507
[注]:
正如您在上面的结果中看到的,最好的情况是这四项任务有3名员工。如果有进程任务而不是I/O绑定或阻塞(多处理而不是线程),则可以将ThreadPoolExecutor更改为ProcessPoolExecutoor。
给定函数f,如下所示:
import threading
threading.Thread(target=f).start()
向f传递参数
threading.Thread(target=f, args=(a,b,c)).start()
import threading
import requests
def send():
r = requests.get('https://www.stackoverlow.com')
thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()