下面的函数foo返回一个字符串'foo'。我如何才能获得从线程的目标返回的值'foo' ?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面所示的“一种明显的方法”不起作用:thread.join()返回None。


当前回答

在Python 3.2+中,stdlib concurrent。futures模块为线程提供了一个更高级别的API,包括将返回值或异常从工作线程传递回主线程:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)

其他回答

如上所述,多处理池比基本线程要慢得多。使用一些回答中提出的队列是一种非常有效的替代方法。我已经将它与字典一起使用,以便能够运行许多小线程,并通过将它们与字典结合来恢复多个答案:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)

最好的方法…定义一个全局变量,然后在线程函数中更改该变量。没有可以传递或取回的东西

from threading import Thread

# global var
radom_global_var = 5

def function():
    global random_global_var
    random_global_var += 1

domath = Thread(target=function)
domath.start()
domath.join()
print(random_global_var)

# result: 6

Jake的回答很好,但如果您不想使用线程池(您不知道需要多少线程,但可以根据需要创建它们),那么在线程之间传输信息的一个好方法是内置的Queue。队列类,因为它提供线程安全性。

我创建了以下装饰器,使其以类似于线程池的方式工作:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

然后你就把它用作:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

装饰函数每次被调用时都会创建一个新线程,并返回一个thread对象,其中包含将接收结果的队列。

更新

自从我发布这个答案已经有一段时间了,但它仍然得到了观看,所以我想我应该更新它,以反映我在新版本的Python中这样做的方式:

Python 3.2并发添加。期货模块,为并行任务提供高级接口。它提供了ThreadPoolExecutor和ProcessPoolExecutor,因此您可以使用具有相同api的线程或进程池。

该api的一个好处是将任务提交给Executor将返回一个Future对象,该对象将以您提交的可调用对象的返回值结束。

这使得附加队列对象成为不必要的,这大大简化了装饰器:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

如果没有传入,将使用默认的模块线程池执行器。

用法和前面的非常相似:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

如果您使用的是Python 3.4+,那么使用此方法(以及一般的Future对象)的一个非常好的特性是可以将返回的Future对象包装起来以将其转换为asyncio。使用asyncio.wrap_future。这使得它很容易与协程一起工作:

result = await asyncio.wrap_future(long_task(10))

如果您不需要访问底层并发。对象,你可以在装饰器中包含wrap:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

然后,当你需要将cpu密集型代码或阻塞代码从事件循环线程中推出时,你可以将它放在装饰函数中:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

FWIW,多处理模块使用Pool类提供了一个很好的接口。如果您希望坚持使用线程而不是进程,可以直接使用multiprocessing.pool.ThreadPool类作为替代。

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

这是一个很老的问题,但我想分享一个简单的解决方案,它对我的开发过程有帮助。

这个答案背后的方法论是这样一个事实,即“新的”目标函数,内部是将原始函数的结果(通过__init__函数传递)通过所谓的闭包分配给包装器的结果实例属性。

这允许包装器类保留返回值以供调用者随时访问。

注意:这个方法不需要使用线程的任何mangded方法或私有方法。线程类,虽然没有考虑屈服函数(OP没有提到屈服函数)。

享受吧!

from threading import Thread as _Thread


class ThreadWrapper:
    def __init__(self, target, *args, **kwargs):
        self.result = None
        self._target = self._build_threaded_fn(target)
        self.thread = _Thread(
            target=self._target,
            *args,
            **kwargs
        )

    def _build_threaded_fn(self, func):
        def inner(*args, **kwargs):
            self.result = func(*args, **kwargs)
        return inner

此外,你可以用下面的代码运行pytest(假设你已经安装了它)来演示结果:

import time
from commons import ThreadWrapper


def test():

    def target():
        time.sleep(1)
        return 'Hello'

    wrapper = ThreadWrapper(target=target)
    wrapper.thread.start()

    r = wrapper.result
    assert r is None

    time.sleep(2)

    r = wrapper.result
    assert r == 'Hello'