我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。
我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。
编辑:线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
我使用这个版本,它是最小的,它工作得很好。
class SafeThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(SafeThread, self).__init__(*args, **kwargs)
self.exception = None
def run(self) -> None:
try:
super(SafeThread, self).run()
except Exception as ex:
self.exception = ex
traceback.print_exc()
def join(self, *args, **kwargs) -> None:
super(SafeThread, self).join(*args, **kwargs)
if self.exception:
raise self.exception
要使用它,只需替换线程。带安全线程的线程
t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()
如果在线程中发生异常,最好的方法是在连接期间在调用线程中重新引发它。您可以使用sys.exc_info()函数获取当前正在处理的异常的信息。此信息可以简单地存储为线程对象的属性,直到调用join,此时可以重新引发它。
注意,队列。队列(在其他回答中建议)在这个简单的情况下是不必要的,因为线程最多抛出1个异常,并且在抛出一个异常后立即完成。我们通过简单地等待线程完成来避免竞争条件。
例如,扩展ExcThread(如下),覆盖excRun(而不是run)。
Python 2. x:
import threading
class ExcThread(threading.Thread):
def excRun(self):
pass
def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don't rethrow,
# just complete the function
def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]
Python 3. x:
在Python 3中,raise的参数形式为3,因此将最后一行更改为:
raise new_exc.with_traceback(self.exc[2])
虽然不可能直接捕获在不同线程中抛出的异常,但下面的代码可以相当透明地获取与此功能非常接近的内容。子线程必须继承ExThread类而不是线程。线程和父线程在等待线程完成任务时必须调用child_thread.join_with_exception()方法,而不是child_thread.join()方法。
此实现的技术细节:当子线程抛出异常时,它将通过Queue传递给父线程,并在父线程中再次抛出。注意,在这种方法中没有忙碌等待。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
if __name__ == '__main__':
main()
问题是thread_obj.start()立即返回。您所生成的子线程在它自己的上下文中使用自己的堆栈执行。在那里发生的任何异常都在子线程的上下文中,并且在它自己的堆栈中。我现在能想到的一种将此信息传递给父线程的方法是使用某种消息传递,因此您可以研究一下。
试试这个尺寸:
import sys
import threading
import queue
class ExcThread(threading.Thread):
def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket
def run(self):
try:
raise Exception('An error occured here.')
except Exception:
self.bucket.put(sys.exc_info())
def main():
bucket = queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
while True:
try:
exc = bucket.get(block=False)
except queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace
thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break
if __name__ == '__main__':
main()
这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。
我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
...或者你可以让worker在完成时发送一个回调:
worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
...或者你可以循环直到事件完成:
worker = Worker(test)
thread = worker.start()
while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print('exception?', exc)
result = worker.future.result()
print('result', result)
break
time.sleep(0.25)
代码如下:
from concurrent.futures import Future
import threading
import time
class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args
def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread
def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)
if(self._cb):
self._cb(self.future.result())
...和测试函数:
def test(*args):
print('args are', args)
time.sleep(2)
raise Exception('foo')
类似于RickardSjogren的方法,没有Queue, sys等,但也没有一些信号监听器:直接执行一个异常处理程序,对应于一个异常块。
#!/usr/bin/env python3
import threading
class ExceptionThread(threading.Thread):
def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.
:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback
只有自我。_callback和run()中的except块是普通threading.Thread之外的。