是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类?

例如,我喜欢并行化映射函数的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

然而,我想这样做没有创建新进程的开销。

我知道GIL。然而,在我的用例中,该函数将是一个io绑定的C函数,python包装器将在实际函数调用之前释放GIL。

我必须编写自己的线程池吗?


当前回答

创建新进程的开销是最小的,特别是当只有4个进程时。我怀疑这是您的应用程序的性能热点。保持简单,优化你必须做的和分析结果指向的地方。

其他回答

创建新进程的开销是最小的,特别是当只有4个进程时。我怀疑这是您的应用程序的性能热点。保持简单,优化你必须做的和分析结果指向的地方。

嗨,在Python中使用线程池,你可以使用这个库:

from multiprocessing.dummy import Pool as ThreadPool

为了使用,这个库是这样的:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

线程是您想要的线程数,任务是大多数映射到服务的任务列表。

我刚刚发现,在多处理模块中实际上有一个基于线程的池接口,但是它有些隐藏,并且没有正确地记录。

它可以通过

from multiprocessing.pool import ThreadPool

它是使用封装python线程的虚拟Process类实现的。这个基于线程的Process类可以在multiprocessing中找到。在文档中简单提到过。这个虚拟模块提供了基于线程的整个多处理接口。

是的,它似乎(或多或少)具有相同的API。

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....

是的,有一个线程池类似于多处理池,但是,它有些隐藏,没有适当的文档。您可以通过以下方式导入:-

from multiprocessing.pool import ThreadPool

我举个简单的例子

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result)