我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。

如何清楚地显示为多线程划分的任务?


当前回答

作为第二个anwser的python3版本:

import queue as Queue
import threading
import urllib.request

# Called by each thread
def get_url(q, url):
    q.put(urllib.request.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com", "http://www.python.org","https://wiki.python.org/moin/"]

q = Queue.Queue()
def thread_func():
    for u in theurls:
        t = threading.Thread(target=get_url, args = (q,u))
        t.daemon = True
        t.start()

    s = q.get()
    
def non_thread_func():
    for u in theurls:
        get_url(q,u)
        

    s = q.get()
   

您可以测试它:

start = time.time()
thread_func()
end = time.time()
print(end - start)

start = time.time()
non_thread_func()
end = time.time()
print(end - start)

non_thread_func()花费的时间应该是thread_func()的4倍

其他回答

我想提供一个简单的例子,以及我在自己解决这个问题时发现有用的解释。

在这个答案中,您将找到一些关于Python的GIL(全局解释器锁)的信息,以及一个使用multiprocessing.dummy编写的简单日常示例,以及一些简单的基准测试。

全局解释器锁(GIL)

Python不允许真正意义上的多线程。它有一个多线程包,但是如果你想多线程来加快你的代码,那么使用它通常不是一个好主意。

Python有一个称为全局解释器锁(GIL)的构造。GIL确保在任何时候只能执行一个“线程”。一个线程获取GIL,做一些工作,然后将GIL传递给下一个线程。

这种情况发生得很快,因此在人眼看来,您的线程似乎是并行执行的,但它们实际上只是轮流使用相同的CPU内核。

所有这些GIL传递都增加了执行开销。这意味着如果你想让你的代码运行得更快,那么使用线程打包通常不是个好主意。

使用Python的线程包是有原因的。如果你想同时运行一些事情,而效率不是一个问题,那就很好,也很方便。或者,如果您运行的代码需要等待一些东西(比如一些I/O),那么这可能很有意义。但是线程库不允许您使用额外的CPU内核。

多线程可以外包给操作系统(通过执行多线程处理),以及一些调用Python代码的外部应用程序(例如,Spark或Hadoop),或者Python代码调用的一些代码(例如:您可以让Python代码调用一个C函数来完成昂贵的多线程任务)。

为什么这很重要

因为很多人在了解GIL是什么之前,会花很多时间在他们的Python多线程代码中寻找瓶颈。

一旦这些信息清楚,下面是我的代码:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

大多数文档和教程都使用Python的“线程和队列”模块,对于初学者来说,它们可能会让人不知所措。

也许可以考虑Python 3的concurrent.futures.ThreadPoolExecutor模块。

结合子句和列表理解,这可能是一个真正的魅力。

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

与其他提到的一样,由于GIL,CPython只能在I/O等待时使用线程。

如果您想从多个内核中获得CPU绑定任务的好处,请使用多处理:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

这很容易理解。这里有两种简单的线程处理方法。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def a(a=1, b=2):
    print(a)
    time.sleep(5)
    print(b)
    return a+b

def b(**kwargs):
    if "a" in kwargs:
        print("am b")
    else:
        print("nothing")
        
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)

for future in as_completed(to_do):
    print("Future {} and Future Return is {}\n".format(future, future.result()))

print("threading")

to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))

for threads in to_do:
    threads.start()
    
for threads in to_do:
    threads.join()

这里是使用线程导入CSV的一个非常简单的示例。(图书馆的收录可能因不同的目的而有所不同。)

助手函数:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query

驾驶员功能:

import_handler(csv_file_name)