我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

自2010年提出这个问题以来,如何使用带有映射和池的Python进行简单的多线程处理已经得到了真正的简化。

下面的代码来自一篇文章/博客文章,您应该明确查看(没有从属关系)-一行中的并行性:一个更好的日常线程任务模型。我将在下面总结一下——它最终只是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

以下是多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很酷的小函数,是将并行性轻松注入Python代码的关键。对于那些不熟悉的人来说,map是从Lisp这样的函数语言中提取出来的。它是一个将另一个函数映射到序列上的函数。Map为我们处理序列上的迭代,应用函数,并在最后将所有结果存储在一个方便的列表中。


实施

map函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样神奇的stepchild:multiprocessing.dummy。

multiprocessing.dummy与多处理模块完全相同,但使用线程(一个重要的区别-对CPU密集型任务使用多个进程;对I/O(和在I/O期间)使用线程):

multiprocessing.dummy复制了多处理的API,但它不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数(仅在Python 3.3及更高版本中如此):

要传递多个数组,请执行以下操作:

results = pool.starmap(function, zip(list_a, list_b))

或者传递常量和数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是早期版本的Python,可以通过此解决方法传递多个参数)。

(感谢user136036提供的有用评论。)

其他回答

我想提供一个简单的例子,以及我在自己解决这个问题时发现有用的解释。

在这个答案中,您将找到一些关于Python的GIL(全局解释器锁)的信息,以及一个使用multiprocessing.dummy编写的简单日常示例,以及一些简单的基准测试。

全局解释器锁(GIL)

Python不允许真正意义上的多线程。它有一个多线程包,但是如果你想多线程来加快你的代码,那么使用它通常不是一个好主意。

Python有一个称为全局解释器锁(GIL)的构造。GIL确保在任何时候只能执行一个“线程”。一个线程获取GIL,做一些工作,然后将GIL传递给下一个线程。

这种情况发生得很快,因此在人眼看来,您的线程似乎是并行执行的,但它们实际上只是轮流使用相同的CPU内核。

所有这些GIL传递都增加了执行开销。这意味着如果你想让你的代码运行得更快,那么使用线程打包通常不是个好主意。

使用Python的线程包是有原因的。如果你想同时运行一些事情,而效率不是一个问题,那就很好,也很方便。或者,如果您运行的代码需要等待一些东西(比如一些I/O),那么这可能很有意义。但是线程库不允许您使用额外的CPU内核。

多线程可以外包给操作系统(通过执行多线程处理),以及一些调用Python代码的外部应用程序(例如,Spark或Hadoop),或者Python代码调用的一些代码(例如:您可以让Python代码调用一个C函数来完成昂贵的多线程任务)。

为什么这很重要

因为很多人在了解GIL是什么之前,会花很多时间在他们的Python多线程代码中寻找瓶颈。

一旦这些信息清楚,下面是我的代码:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

这里是多线程,有一个简单的例子会很有帮助。您可以运行它并轻松了解多线程在Python中的工作方式。我使用了一个锁来防止访问其他线程,直到前面的线程完成它们的工作。通过使用这行代码,

t锁定=线程。有界信号量(值=4)

您可以一次允许多个进程,并保留将在稍后或完成之前的进程后运行的其余线程。

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

这很容易理解。这里有两种简单的线程处理方法。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def a(a=1, b=2):
    print(a)
    time.sleep(5)
    print(b)
    return a+b

def b(**kwargs):
    if "a" in kwargs:
        print("am b")
    else:
        print("nothing")
        
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)

for future in as_completed(to_do):
    print("Future {} and Future Return is {}\n".format(future, future.result()))

print("threading")

to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))

for threads in to_do:
    threads.start()
    
for threads in to_do:
    threads.join()

以前的解决方案都没有在我的GNU/Linux服务器上使用多个内核(我没有管理员权限)。他们只是在一个核心上跑步。

我使用较低级别的os.fork接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

使用线程/多处理的最简单方法是使用更多高级库,如autothread。

import autothread
from time import sleep as heavyworkload

@autothread.multithreaded() # <-- This is all you need to add
def example(x: int, y: int):
    heavyworkload(1)
    return x*y

现在,您可以为函数提供int列表。Autothread将为您处理所有事务,并只提供并行计算的结果。

result = example([1, 2, 3, 4, 5], 10)