是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类?

例如,我喜欢并行化映射函数的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

然而,我想这样做没有创建新进程的开销。

我知道GIL。然而,在我的用例中,该函数将是一个io绑定的C函数,python包装器将在实际函数调用之前释放GIL。

我必须编写自己的线程池吗?


当前回答

创建新进程的开销是最小的,特别是当只有4个进程时。我怀疑这是您的应用程序的性能热点。保持简单,优化你必须做的和分析结果指向的地方。

其他回答

没有内置的基于线程的池。但是,使用queue类实现生产者/消费者队列可以非常快。

来自: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

创建新进程的开销是最小的,特别是当只有4个进程时。我怀疑这是您的应用程序的性能热点。保持简单,优化你必须做的和分析结果指向的地方。

我刚刚发现,在多处理模块中实际上有一个基于线程的池接口,但是它有些隐藏,并且没有正确地记录。

它可以通过

from multiprocessing.pool import ThreadPool

它是使用封装python线程的虚拟Process类实现的。这个基于线程的Process类可以在multiprocessing中找到。在文档中简单提到过。这个虚拟模块提供了基于线程的整个多处理接口。

对于一些非常简单和轻量级的东西(从这里略有修改):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

要在任务完成时支持回调,只需将回调添加到任务元组。

是的,有一个线程池类似于多处理池,但是,它有些隐藏,没有适当的文档。您可以通过以下方式导入:-

from multiprocessing.pool import ThreadPool

我举个简单的例子

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result)