下面的函数foo返回一个字符串'foo'。我如何才能获得从线程的目标返回的值'foo' ?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面所示的“一种明显的方法”不起作用:thread.join()返回None。


当前回答

GuySoft的想法很棒,但我认为对象不一定要从Thread继承,start()可以从接口中删除:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo

其他回答

join总是返回None,我认为你应该子类化Thread来处理返回代码等。

根据上面提到的,下面是适用于Python3的更通用的解决方案。

import threading

class ThreadWithReturnValue(threading.Thread):
    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None
    def run(self):
        self._return = self._target(*self._args, **self._kwargs)
    def join(self):
        threading.Thread.join(self)
        return self._return

使用

        th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
        th.start()
        response = th.join()
        response.status_code  # => 200

Jake的回答很好,但如果您不想使用线程池(您不知道需要多少线程,但可以根据需要创建它们),那么在线程之间传输信息的一个好方法是内置的Queue。队列类,因为它提供线程安全性。

我创建了以下装饰器,使其以类似于线程池的方式工作:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

然后你就把它用作:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

装饰函数每次被调用时都会创建一个新线程,并返回一个thread对象,其中包含将接收结果的队列。

更新

自从我发布这个答案已经有一段时间了,但它仍然得到了观看,所以我想我应该更新它,以反映我在新版本的Python中这样做的方式:

Python 3.2并发添加。期货模块,为并行任务提供高级接口。它提供了ThreadPoolExecutor和ProcessPoolExecutor,因此您可以使用具有相同api的线程或进程池。

该api的一个好处是将任务提交给Executor将返回一个Future对象,该对象将以您提交的可调用对象的返回值结束。

这使得附加队列对象成为不必要的,这大大简化了装饰器:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

如果没有传入,将使用默认的模块线程池执行器。

用法和前面的非常相似:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

如果您使用的是Python 3.4+,那么使用此方法(以及一般的Future对象)的一个非常好的特性是可以将返回的Future对象包装起来以将其转换为asyncio。使用asyncio.wrap_future。这使得它很容易与协程一起工作:

result = await asyncio.wrap_future(long_task(10))

如果您不需要访问底层并发。对象,你可以在装饰器中包含wrap:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

然后,当你需要将cpu密集型代码或阻塞代码从事件循环线程中推出时,你可以将它放在装饰函数中:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

我正在使用这个包装器,它可以轻松地将任何函数转换为在线程中运行-照顾它的返回值或异常。它不会增加队列开销。

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

用法示例

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

线程模块注意事项

线程函数的舒适返回值和异常处理是“python”的常见需求,而且threading模块应该已经提供了——可能直接在标准Thread类中。对于简单的任务,ThreadPool有太多的开销——3个管理线程,很多官僚主义。不幸的是,线程的布局最初是从Java中复制的——例如,从仍然无用的构造函数参数组1 (!)

你可以使用ThreadPool()的pool.apply_async()来返回test()的值,如下所示:

from multiprocessing.pool import ThreadPool

def test(num1, num2):
    return num1 + num2

pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

并且,你也可以使用concurrent.futures.ThreadPoolExecutor()的submit()来返回test()的值,如下所示:

from concurrent.futures import ThreadPoolExecutor

def test(num1, num2):
    return num1 + num2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5

并且,代替返回,你可以使用数组结果,如下所示:

from threading import Thread

def test(num1, num2, r):
    r[0] = num1 + num2 # Instead of "return"

result = [None] # Here

thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

而不是返回,你也可以使用队列结果,如下所示:

from threading import Thread
import queue

def test(num1, num2, q):
    q.put(num1 + num2) # Instead of "return" 

queue = queue.Queue() # Here

thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'