下面的函数foo返回一个字符串'foo'。我如何才能获得从线程的目标返回的值'foo' ?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面所示的“一种明显的方法”不起作用:thread.join()返回None。


当前回答

你可以使用ThreadPool()的pool.apply_async()来返回test()的值,如下所示:

from multiprocessing.pool import ThreadPool

def test(num1, num2):
    return num1 + num2

pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

并且,你也可以使用concurrent.futures.ThreadPoolExecutor()的submit()来返回test()的值,如下所示:

from concurrent.futures import ThreadPoolExecutor

def test(num1, num2):
    return num1 + num2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5

并且,代替返回,你可以使用数组结果,如下所示:

from threading import Thread

def test(num1, num2, r):
    r[0] = num1 + num2 # Instead of "return"

result = [None] # Here

thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

而不是返回,你也可以使用队列结果,如下所示:

from threading import Thread
import queue

def test(num1, num2, q):
    q.put(num1 + num2) # Instead of "return" 

queue = queue.Queue() # Here

thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'

其他回答

Jake的回答很好,但如果您不想使用线程池(您不知道需要多少线程,但可以根据需要创建它们),那么在线程之间传输信息的一个好方法是内置的Queue。队列类,因为它提供线程安全性。

我创建了以下装饰器,使其以类似于线程池的方式工作:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

然后你就把它用作:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

装饰函数每次被调用时都会创建一个新线程,并返回一个thread对象,其中包含将接收结果的队列。

更新

自从我发布这个答案已经有一段时间了,但它仍然得到了观看,所以我想我应该更新它,以反映我在新版本的Python中这样做的方式:

Python 3.2并发添加。期货模块,为并行任务提供高级接口。它提供了ThreadPoolExecutor和ProcessPoolExecutor,因此您可以使用具有相同api的线程或进程池。

该api的一个好处是将任务提交给Executor将返回一个Future对象,该对象将以您提交的可调用对象的返回值结束。

这使得附加队列对象成为不必要的,这大大简化了装饰器:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

如果没有传入,将使用默认的模块线程池执行器。

用法和前面的非常相似:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

如果您使用的是Python 3.4+,那么使用此方法(以及一般的Future对象)的一个非常好的特性是可以将返回的Future对象包装起来以将其转换为asyncio。使用asyncio.wrap_future。这使得它很容易与协程一起工作:

result = await asyncio.wrap_future(long_task(10))

如果您不需要访问底层并发。对象,你可以在装饰器中包含wrap:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

然后,当你需要将cpu密集型代码或阻塞代码从事件循环线程中推出时,你可以将它放在装饰函数中:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

根据上面提到的,下面是适用于Python3的更通用的解决方案。

import threading

class ThreadWithReturnValue(threading.Thread):
    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None
    def run(self):
        self._return = self._target(*self._args, **self._kwargs)
    def join(self):
        threading.Thread.join(self)
        return self._return

使用

        th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
        th.start()
        response = th.join()
        response.status_code  # => 200

定义你的目标 1)采取一个论点q 2)用q.put(foo)替换return foo的任何语句;返回

一个函数

def func(a):
    ans = a * a
    return ans

将成为

def func(a, q):
    ans = a * a
    q.put(ans)
    return

然后你就可以这样做了

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

你可以使用函数装饰器/包装器来实现它,这样你就可以使用现有的函数作为目标,而不需要修改它们,但要遵循这个基本方案。

在Python 3.2+中,stdlib concurrent。futures模块为线程提供了一个更高级别的API,包括将返回值或异常从工作线程传递回主线程:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)

join总是返回None,我认为你应该子类化Thread来处理返回代码等。