我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
我正在努力理解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我很难理解它们。
如何清楚地显示为多线程划分的任务?
当前回答
我在这里看到了很多没有执行实际工作的示例,它们大多是CPU限制的。这里是一个CPU绑定任务的示例,它计算1000万到1005万之间的所有素数。我在这里使用了所有四种方法:
import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def time_stuff(fn):
"""
Measure time of execution of a function
"""
def wrapper(*args, **kwargs):
t0 = timeit.default_timer()
fn(*args, **kwargs)
t1 = timeit.default_timer()
print("{} seconds".format(t1 - t0))
return wrapper
def find_primes_in(nmin, nmax):
"""
Compute a list of prime numbers between the given minimum and maximum arguments
"""
primes = []
# Loop from minimum to maximum
for current in range(nmin, nmax + 1):
# Take the square root of the current number
sqrt_n = int(math.sqrt(current))
found = False
# Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
for number in range(2, sqrt_n + 1):
# If divisible we have found a factor, hence this is not a prime number, lets move to the next one
if current % number == 0:
found = True
break
# If not divisible, add this number to the list of primes that we have found so far
if not found:
primes.append(current)
# I am merely printing the length of the array containing all the primes, but feel free to do what you want
print(len(primes))
@time_stuff
def sequential_prime_finder(nmin, nmax):
"""
Use the main process and main thread to compute everything in this case
"""
find_primes_in(nmin, nmax)
@time_stuff
def threading_prime_finder(nmin, nmax):
"""
If the minimum is 1000 and the maximum is 2000 and we have four workers,
1000 - 1250 to worker 1
1250 - 1500 to worker 2
1500 - 1750 to worker 3
1750 - 2000 to worker 4
so let’s split the minimum and maximum values according to the number of workers
"""
nrange = nmax - nmin
threads = []
for i in range(8):
start = int(nmin + i * nrange/8)
end = int(nmin + (i + 1) * nrange/8)
# Start the thread with the minimum and maximum split up to compute
# Parallel computation will not work here due to the GIL since this is a CPU-bound task
t = threading.Thread(target = find_primes_in, args = (start, end))
threads.append(t)
t.start()
# Don’t forget to wait for the threads to finish
for t in threads:
t.join()
@time_stuff
def processing_prime_finder(nmin, nmax):
"""
Split the minimum, maximum interval similar to the threading method above, but use processes this time
"""
nrange = nmax - nmin
processes = []
for i in range(8):
start = int(nmin + i * nrange/8)
end = int(nmin + (i + 1) * nrange/8)
p = multiprocessing.Process(target = find_primes_in, args = (start, end))
processes.append(p)
p.start()
for p in processes:
p.join()
@time_stuff
def thread_executor_prime_finder(nmin, nmax):
"""
Split the min max interval similar to the threading method, but use a thread pool executor this time.
This method is slightly faster than using pure threading as the pools manage threads more efficiently.
This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
"""
nrange = nmax - nmin
with ThreadPoolExecutor(max_workers = 8) as e:
for i in range(8):
start = int(nmin + i * nrange/8)
end = int(nmin + (i + 1) * nrange/8)
e.submit(find_primes_in, start, end)
@time_stuff
def process_executor_prime_finder(nmin, nmax):
"""
Split the min max interval similar to the threading method, but use the process pool executor.
This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
RECOMMENDED METHOD FOR CPU-BOUND TASKS
"""
nrange = nmax - nmin
with ProcessPoolExecutor(max_workers = 8) as e:
for i in range(8):
start = int(nmin + i * nrange/8)
end = int(nmin + (i + 1) * nrange/8)
e.submit(find_primes_in, start, end)
def main():
nmin = int(1e7)
nmax = int(1.05e7)
print("Sequential Prime Finder Starting")
sequential_prime_finder(nmin, nmax)
print("Threading Prime Finder Starting")
threading_prime_finder(nmin, nmax)
print("Processing Prime Finder Starting")
processing_prime_finder(nmin, nmax)
print("Thread Executor Prime Finder Starting")
thread_executor_prime_finder(nmin, nmax)
print("Process Executor Finder Starting")
process_executor_prime_finder(nmin, nmax)
if __name__ == "__main__":
main()
以下是我的Mac OS X四核计算机的结果
Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
其他回答
Alex Martelli的回答对我有所帮助。不过,这里有一个我认为更有用的修改版本(至少对我来说)。
更新:可在Python 2和Python 3中使用
try:
# For Python 3
import queue
from urllib.request import urlopen
except:
# For Python 2
import Queue as queue
from urllib2 import urlopen
import threading
worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']
# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
q.put(url)
# Define a worker function
def worker(url_queue):
queue_full = True
while queue_full:
try:
# Get your data off the queue, and do some work
url = url_queue.get(False)
data = urlopen(url).read()
print(len(data))
except queue.Empty:
queue_full = False
# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
t = threading.Thread(target=worker, args = (q,))
t.start()
Python 3具有启动并行任务的功能。这使我们的工作更容易。
它有线程池和进程池。
以下内容提供了一个见解:
ThreadPoolExecutor示例(源代码)
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor(源)
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
import threading
import requests
def send():
r = requests.get('https://www.stackoverlow.com')
thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
我发现这非常有用:创建与内核一样多的线程,并让它们执行(大量)任务(在本例中,调用shell程序):
import Queue
import threading
import multiprocessing
import subprocess
q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
q.put(i)
def worker():
while True:
item = q.get()
# Execute a task: call a shell program and wait until it completes
subprocess.call("echo " + str(item), shell=True)
q.task_done()
cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
q.join() # Block until all tasks are done
使用线程/多处理的最简单方法是使用更多高级库,如autothread。
import autothread
from time import sleep as heavyworkload
@autothread.multithreaded() # <-- This is all you need to add
def example(x: int, y: int):
heavyworkload(1)
return x*y
现在,您可以为函数提供int列表。Autothread将为您处理所有事务,并只提供并行计算的结果。
result = example([1, 2, 3, 4, 5], 10)