是否有一个用于工作线程的Pool类,类似于多处理模块的Pool类?

例如,我喜欢并行化映射函数的简单方法

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

然而,我想这样做没有创建新进程的开销。

我知道GIL。然而,在我的用例中,该函数将是一个io绑定的C函数,python包装器将在实际函数调用之前释放GIL。

我必须编写自己的线程池吗?


当前回答

我刚刚发现,在多处理模块中实际上有一个基于线程的池接口,但是它有些隐藏,并且没有正确地记录。

它可以通过

from multiprocessing.pool import ThreadPool

它是使用封装python线程的虚拟Process类实现的。这个基于线程的Process类可以在multiprocessing中找到。在文档中简单提到过。这个虚拟模块提供了基于线程的整个多处理接口。

其他回答

如果你不介意执行别人的代码,这里是我的:

注意:有很多额外的代码你可能想要删除[添加是为了更好地说明和演示它是如何工作的]

注意:Python命名约定用于方法名和变量名,而不是camelCase。

工作过程:

MultiThread class will initiate with no of instances of threads by sharing lock, work queue, exit flag and results. SingleThread will be started by MultiThread once it creates all instances. We can add works using MultiThread (It will take care of locking). SingleThreads will process work queue using a lock in middle. Once your work is done, you can destroy all threads with shared boolean value. Here, work can be anything. It can automatically import (uncomment import line) and process module using given arguments. Results will be added to results and we can get using get_results

代码:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())

我刚刚发现,在多处理模块中实际上有一个基于线程的池接口,但是它有些隐藏,并且没有正确地记录。

它可以通过

from multiprocessing.pool import ThreadPool

它是使用封装python线程的虚拟Process类实现的。这个基于线程的Process类可以在multiprocessing中找到。在文档中简单提到过。这个虚拟模块提供了基于线程的整个多处理接口。

对于一些非常简单和轻量级的东西(从这里略有修改):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

要在任务完成时支持回调,只需将回调添加到任务元组。

没有内置的基于线程的池。但是,使用queue类实现生产者/消费者队列可以非常快。

来自: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

另一种方法是将进程添加到线程队列池中

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)