是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类?
例如,我喜欢并行化映射函数的简单方法
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
然而,我想这样做没有创建新进程的开销。
我知道GIL。然而,在我的用例中,该函数将是一个io绑定的C函数,python包装器将在实际函数调用之前释放GIL。
我必须编写自己的线程池吗?
是的,有一个线程池类似于多处理池,但是,它有些隐藏,没有适当的文档。您可以通过以下方式导入:-
from multiprocessing.pool import ThreadPool
我举个简单的例子
def test_multithread_stringio_read_csv(self):
# see gh-11786
max_row_range = 10000
num_files = 100
bytes_to_df = [
'\n'.join(
['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
).encode() for j in range(num_files)]
files = [BytesIO(b) for b in bytes_to_df]
# read all files in many threads
pool = ThreadPool(8)
results = pool.map(self.read_csv, files)
first_result = results[0]
for result in results:
tm.assert_frame_equal(first_result, result)
是的,有一个线程池类似于多处理池,但是,它有些隐藏,没有适当的文档。您可以通过以下方式导入:-
from multiprocessing.pool import ThreadPool
我举个简单的例子
def test_multithread_stringio_read_csv(self):
# see gh-11786
max_row_range = 10000
num_files = 100
bytes_to_df = [
'\n'.join(
['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
).encode() for j in range(num_files)]
files = [BytesIO(b) for b in bytes_to_df]
# read all files in many threads
pool = ThreadPool(8)
results = pool.map(self.read_csv, files)
first_result = results[0]
for result in results:
tm.assert_frame_equal(first_result, result)