是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类?

例如,我喜欢并行化映射函数的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

然而,我想这样做没有创建新进程的开销。

我知道GIL。然而,在我的用例中,该函数将是一个io绑定的C函数,python包装器将在实际函数调用之前释放GIL。

我必须编写自己的线程池吗?


当前回答

另一种方法是将进程添加到线程队列池中

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

其他回答

另一种方法是将进程添加到线程队列池中

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

嗨,在Python中使用线程池,你可以使用这个库:

from multiprocessing.dummy import Pool as ThreadPool

为了使用,这个库是这样的:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

线程是您想要的线程数,任务是大多数映射到服务的任务列表。

没有内置的基于线程的池。但是,使用queue类实现生产者/消费者队列可以非常快。

来自: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

创建新进程的开销是最小的,特别是当只有4个进程时。我怀疑这是您的应用程序的性能热点。保持简单,优化你必须做的和分析结果指向的地方。

是的,它似乎(或多或少)具有相同的API。

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....