下面的函数foo返回一个字符串'foo'。我如何才能获得从线程的目标返回的值'foo' ?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面所示的“一种明显的方法”不起作用:thread.join()返回None。


当前回答

一种常见的解决方案是用装饰器来包装函数foo

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

那么整个代码可能是这样的

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Note

一个重要的问题是返回值可能是无序的。 (事实上,返回值不一定保存到队列中,因为您可以选择任意线程安全的数据结构)

其他回答

我偷了kindall的答案,稍微整理了一下。

关键部分是为join()添加*args和**kwargs,以便处理超时

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)
        
        self._return = None
    
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
    
    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)
        
        return self._return

更新答案如下

这是我得到最多好评的答案,所以我决定更新可以在py2和py3上运行的代码。

此外,我看到许多对这个问题的回答都显示出对Thread.join()缺乏理解。有些完全不能处理timeout参数。但是当你有(1)一个可以返回None的目标函数并且(2)你也将timeout参数传递给join()时,还有一种极端情况你应该注意。请参阅“TEST 4”以理解这个极端情况。

ThreadWithReturn类,用于py2和py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

_thread_target_key, _thread_args_key, _thread_kwargs_key = (
    ('_target', '_args', '_kwargs')
    if sys.version_info >= (3, 0) else
    ('_Thread__target', '_Thread__args', '_Thread__kwargs')
)

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None
    
    def run(self):
        target = getattr(self, _thread_target_key)
        if target is not None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )
    
    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

一些示例测试如下所示:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

你能确定我们在测试4中可能遇到的极端情况吗?

问题是我们期望giveMe()返回None(参见TEST 2),但我们也期望join()在超时时返回None。

None表示:

(1)这就是giveMe()返回的,或者

(2) join()超时

这个例子很简单,因为我们知道giveMe()总是返回None。但在真实的实例中(目标可能返回None或其他内容),我们希望显式地检查发生了什么。

下面是如何解决这种极端情况:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()

在Python 3.2+中,stdlib concurrent。futures模块为线程提供了一个更高级别的API,包括将返回值或异常从工作线程传递回主线程:

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)

我找到的大多数答案都很长,需要熟悉其他模块或高级python特性,除非他们已经熟悉答案所谈论的一切,否则会让人感到困惑。

简化方法的工作代码:

import threading

class ThreadWithResult(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        def function():
            self.result = target(*args, **kwargs)
        super().__init__(group=group, target=function, name=name, daemon=daemon)

示例代码:

import time, random


def function_to_thread(n):
    count = 0
    while count < 3:
            print(f'still running thread {n}')
            count +=1
            time.sleep(3)
    result = random.random()
    print(f'Return value of thread {n} should be: {result}')
    return result


def main():
    thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
    thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(thread1.result)
    print(thread2.result)

main()

解释: 我想大大简化事情,所以我创建了一个ThreadWithResult类,并让它继承threading.Thread。__init__中的嵌套函数函数调用我们想要保存值的线程函数,并将该嵌套函数的结果保存为实例属性self。线程执行完成后的结果。

创建this的实例与创建threading.Thread的实例是相同的。将希望在新线程上运行的函数传递给目标参数,将函数可能需要的任何参数传递给args参数,将任何关键字参数传递给kwargs参数。

e.g.

my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))

我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我加入了time和random模块来模拟线程的行为,但它们并不是实现最初问题中所要求的功能所必需的。

我知道我是在这个问题被问到很久之后才回答的,但我希望这能在未来帮助更多的人!


编辑:我创建了保存线程结果的PyPI包,允许你访问上面相同的代码,并在项目中重用它(GitHub代码在这里)。PyPI包完全扩展了线程。线程类,因此您可以设置在线程上设置的任何属性。线程在ThreadWithResult类!

上面的原始答案介绍了这个子类背后的主要思想,但要了解更多信息,请参阅这里更详细的解释(来自模块docstring)。

快速使用示例:

pip3 install -U save-thread-result     # MacOS/Linux
pip  install -U save-thread-result     # Windows

python3     # MacOS/Linux
python      # Windows
from save_thread_result import ThreadWithResult

# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
    target = my_function,
    args   = (my_function_arg1, my_function_arg2, ...)
    kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)

thread.start()
thread.join()
if getattr(thread, 'result', None):
    print(thread.result)
else:
    # thread.result attribute not set - something caused
    # the thread to terminate BEFORE the thread finished
    # executing the function passed in through the
    # `target` argument
    print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')

# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)

定义你的目标 1)采取一个论点q 2)用q.put(foo)替换return foo的任何语句;返回

一个函数

def func(a):
    ans = a * a
    return ans

将成为

def func(a, q):
    ans = a * a
    q.put(ans)
    return

然后你就可以这样做了

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

你可以使用函数装饰器/包装器来实现它,这样你就可以使用现有的函数作为目标,而不需要修改它们,但要遵循这个基本方案。

join总是返回None,我认为你应该子类化Thread来处理返回代码等。