我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。
我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。
编辑:线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
使用裸例外并不是一个好的实践,因为您通常会获得比您讨价还价时更多的东西。
我建议修改except以只捕获您想要处理的异常。我不认为引发它有预期的效果,因为当你在外层try中实例化TheThread时,如果它引发一个异常,赋值永远不会发生。
相反,你可能只想提醒它,然后继续前进,比如:
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err
然后,当异常被捕获时,您可以在那里处理它。然后,当外部try从TheThread捕获异常时,您知道它不是您已经处理过的异常,并将帮助您隔离流程流。
这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。
我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
...或者你可以让worker在完成时发送一个回调:
worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
...或者你可以循环直到事件完成:
worker = Worker(test)
thread = worker.start()
while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print('exception?', exc)
result = worker.future.result()
print('result', result)
break
time.sleep(0.25)
代码如下:
from concurrent.futures import Future
import threading
import time
class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args
def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread
def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)
if(self._cb):
self._cb(self.future.result())
...和测试函数:
def test(*args):
print('args are', args)
time.sleep(2)
raise Exception('foo')
我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == '__main__':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError('a failed')
def b():
time.sleep(2)
return 'b returns'
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print 'Threads started'
t.join()
t2.join()
print 'Done'
我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。
我认为其他的解决方案有点复杂,如果你唯一想要的是真正看到某个异常,而不是完全无视和盲目。
解决方案是创建一个自定义线程,从主线程获取记录器并记录任何异常。
class ThreadWithLoggedException(threading.Thread):
"""
Similar to Thread but will log exceptions to passed logger.
Args:
logger: Logger instance used to log any exception in child thread
Exception is also reachable via <thread>.exception from the main thread.
"""
def __init__(self, *args, **kwargs):
try:
self.logger = kwargs.pop("logger")
except KeyError:
raise Exception("Missing 'logger' in kwargs")
super().__init__(*args, **kwargs)
self.exception = None
def run(self):
try:
if self._target is not None:
self._target(*self._args, **self._kwargs)
except Exception as exception:
thread = threading.current_thread()
self.exception = exception
self.logger.exception(f"Exception in child thread {thread}: {exception}")
finally:
del self._target, self._args, self._kwargs
例子:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
def serve():
raise Exception("Earth exploded.")
th = ThreadWithLoggedException(target=serve, logger=logger)
th.start()
主线程输出:
Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
Traceback (most recent call last):
File "/core/utils.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/myapp.py", line 105, in serve
raise Exception("Earth exploded.")
Exception: Earth exploded.
捕获线程异常并与调用方方法通信的一个简单方法是将字典或列表传递给worker方法。
示例(将字典传递给工作方法):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err
shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj['err']:
print(">>%s" % shared_obj['err'])
如果在线程中发生异常,最好的方法是在连接期间在调用线程中重新引发它。您可以使用sys.exc_info()函数获取当前正在处理的异常的信息。此信息可以简单地存储为线程对象的属性,直到调用join,此时可以重新引发它。
注意,队列。队列(在其他回答中建议)在这个简单的情况下是不必要的,因为线程最多抛出1个异常,并且在抛出一个异常后立即完成。我们通过简单地等待线程完成来避免竞争条件。
例如,扩展ExcThread(如下),覆盖excRun(而不是run)。
Python 2. x:
import threading
class ExcThread(threading.Thread):
def excRun(self):
pass
def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don't rethrow,
# just complete the function
def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]
Python 3. x:
在Python 3中,raise的参数形式为3,因此将最后一行更改为:
raise new_exc.with_traceback(self.exc[2])