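I am trying to understand threading in Python. I have read the documentation and looked at examples, but quite frankly, many of the examples are overly complicated and I have trouble following them.

How do you clearly show tasks being divided up for multithreading?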


Current answer

Using the new concurrent.futures module:

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

The executor approach will probably look familiar to anyone who has worked with Java before.

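Also, a side note: to keep the universe sane, don't forget to shut down your pools/executors if you don't use a with block (which is so nice precisely because it does that for you).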
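To illustrate the side note, here is a minimal sketch of what shutting an executor down by hand looks like when you skip the with block. It uses ThreadPoolExecutor rather than the ProcessPoolExecutor from the answer only to keep the snippet short, and the simplified sqr function is a stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor

def sqr(val):
    return val * val

# No `with` block here, so the executor must be shut down explicitly.
executor = ThreadPoolExecutor(max_workers=4)
try:
    futures = [executor.submit(sqr, n) for n in range(5)]
    # Collect results in submission order.
    results = [f.result() for f in futures]
finally:
    # wait=True blocks until all submitted work has finished.
    executor.shutdown(wait=True)

print(results)
```

The try/finally makes sure shutdown() runs even if one of the tasks raises, which is roughly what the with block does for you.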

Other answers

Here is a very simple example of a CSV import using threading. (Which libraries you import may differ depending on your purpose.)

Helper functions:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query goes here
                pass

Driver function:

import_handler(csv_file_name)
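The helper above depends on an application object from the poster's own project, so it cannot be run as-is. A self-contained sketch of the same idea might look like the following. The sample CSV data, the load_csv name, and the rows list (standing in for the database operation) are all assumptions made for illustration.

```python
import csv
import os
import tempfile
from threading import Thread

rows = []  # filled by the worker thread; stands in for the DB operation

def load_csv(csv_file_name):
    with open(csv_file_name, newline="") as f:
        for row in csv.DictReader(f):
            rows.append(dict(row))

# Write a small sample file (hypothetical data) to import.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("name,qty\napple,3\npear,5\n")
    path = tmp.name

thr = Thread(target=load_csv, args=(path,))
thr.start()
thr.join()  # wait here only so the result can be inspected below
os.remove(path)

print(rows)
```

In the original helper the thread is started and never joined, so the caller returns immediately; the join() here is only so the example can print what was read.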

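Since this question was asked in 2010, simple multithreading in Python has become much easier thanks to map and pool.

The code below comes from an article/blog post that you should definitely check out (no affiliation): Parallelism in One Line: A Better Model for Day to Day Threading Tasks. I'll summarize it below. It ends up being just a few lines of code: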

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Which is the multithreaded version of:

results = []
for item in my_array:
    results.append(my_function(item))

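Description

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence. Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.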
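As a quick illustration of that description, here is the built-in map next to the equivalent explicit loop. The square function and the numbers list are made up for the example. Note that in Python 3 the built-in map returns a lazy iterator, so it is wrapped in list() here to get the list described above.

```python
def square(x):
    return x * x

numbers = [1, 2, 3, 4]

# Explicit loop: iterate, apply the function, collect the results.
loop_results = []
for n in numbers:
    loop_results.append(square(n))

# map does the same iteration and collection for us.
map_results = list(map(square, numbers))

print(loop_results)
print(map_results)
```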


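Implementation

Parallel versions of the map function are provided by two libraries: multiprocessing, and its little-known but equally fantastic stepchild, multiprocessing.dummy.

multiprocessing.dummy is exactly the same as the multiprocessing module, but uses threads instead (an important distinction: use multiple processes for CPU-intensive tasks; threads for (and during) I/O):

multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.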

# Python 3: urllib2 was folded into urllib.request
import urllib.request
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib.request.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

And the timing results:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds
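If you want to check for yourself that multiprocessing.dummy really runs the work in threads of the current process rather than in separate processes, a small sketch like this one should do. The where_am_i helper is a made-up name for the example.

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool

def where_am_i(_):
    # Report which process and which thread ran this task.
    return os.getpid(), threading.current_thread().name

with ThreadPool(4) as pool:
    info = pool.map(where_am_i, range(8))

pids = {pid for pid, _ in info}
thread_names = {name for _, name in info}

# Every task ran in the parent process...
print(pids == {os.getpid()})
# ...spread across at most four worker threads.
print(sorted(thread_names))
```

Swapping the import for `from multiprocessing import Pool` would give different process IDs instead, which is the distinction the answer is drawing.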

Passing multiple arguments (works like this only in Python 3.3 and later):

To pass multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

Or to pass a constant and an array:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

If you are using an earlier version of Python, you can pass multiple arguments via this workaround.

(Thanks to user136036 for the helpful comment.)
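For a concrete picture of the two starmap calls above, here is a small runnable sketch. The scale_and_add function and the sample lists are invented for the example; the point is that each tuple produced by zip is unpacked into the function's positional arguments.

```python
import itertools
from multiprocessing.dummy import Pool as ThreadPool

def scale_and_add(a, b):
    return a * 10 + b

list_a = [1, 2, 3]
list_b = [4, 5, 6]

with ThreadPool(4) as pool:
    # Calls scale_and_add(1, 4), scale_and_add(2, 5), scale_and_add(3, 6)
    paired = pool.starmap(scale_and_add, zip(list_a, list_b))
    # Calls scale_and_add(7, 1), scale_and_add(7, 2), scale_and_add(7, 3)
    with_constant = pool.starmap(scale_and_add, zip(itertools.repeat(7), list_a))

print(paired)
print(with_constant)
```

zip stops at the shortest input, so pairing the endless itertools.repeat(7) with list_a is safe.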

This one is very easy to understand. Here are two simple ways to do threading.

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def a(a=1, b=2):
    print(a)
    time.sleep(5)
    print(b)
    return a+b

def b(**kwargs):
    if "a" in kwargs:
        print("am b")
    else:
        print("nothing")
        
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)

for future in as_completed(to_do):
    print("Future {} and Future Return is {}\n".format(future, future.result()))

print("threading")

to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))

for threads in to_do:
    threads.start()
    
for threads in to_do:
    threads.join()

The code below runs 10 threads concurrently, each printing the numbers from 0 to 99:

from threading import Thread

def test():
    for i in range(0, 100):
        print(i)

thread_list = []

for _ in range(0, 10):
    thread = Thread(target=test)
    thread_list.append(thread)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

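The code below is a shorthand version of the above, using list comprehensions in place of the for loops. It also runs 10 threads concurrently, each printing the numbers from 0 to 99: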

from threading import Thread

def test():
    [print(i) for i in range(0, 100)]

thread_list = [Thread(target=test) for _ in range(0, 10)]

[thread.start() for thread in thread_list]

[thread.join() for thread in thread_list]

The result looks like this:

...
99
83
97
84
98
99
85
86
87
88
...

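I saw a lot of examples here where no real work was being performed, and they were mostly CPU-bound. Here is an example of a CPU-bound task that computes all prime numbers between 10 million and 10.5 million. I have used all four methods here: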

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check whether any number from 2 up to and including the integer square root divides the current number
        for number in range(2, sqrt_n + 1):

            # If divisible we have found a factor, hence this is not a prime number, let's move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers (the code below uses eight),
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

if __name__ == "__main__":
    main()

Here are the results from my Mac OS X four-core machine:

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
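To make those numbers easier to compare, the short sketch below turns the timings reported above into speedups relative to the sequential run. The dictionary keys are just labels chosen for this example; the values are copied from the output above.

```python
# Timings in seconds, copied from the results above.
timings = {
    "sequential": 9.708213827005238,
    "threading": 9.81836523200036,
    "processing": 3.2467174359990167,
    "thread_executor": 10.228896902000997,
    "process_executor": 2.656402041000547,
}

baseline = timings["sequential"]
# Speedup > 1 means faster than the sequential run.
speedups = {name: baseline / t for name, t in timings.items()}

for name, s in speedups.items():
    print("{}: {:.2f}x".format(name, s))
```

Both thread-based runs come out slightly below 1x, while the two process-based runs are roughly three to four times faster, which matches the GIL comments in the code.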