我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。
我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。
编辑:线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
虽然不可能直接捕获在不同线程中抛出的异常,但下面的代码可以相当透明地获取与此功能非常接近的内容。子线程必须继承ExThread类而不是线程。线程和父线程在等待线程完成任务时必须调用child_thread.join_with_exception()方法,而不是child_thread.join()方法。
此实现的技术细节:当子线程抛出异常时,它将通过Queue传递给父线程,并在父线程中再次抛出。注意,在这种方法中没有忙碌等待。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
if __name__ == '__main__':
main()
并发。Futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:
import concurrent.futures
import shutil
def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)
while future.running():
# Print pretty dots here.
pass
# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()
并发。futures包含在Python 3.2中,并可作为早期版本的反向移植futures模块使用。
我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == '__main__':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError('a failed')
def b():
time.sleep(2)
return 'b returns'
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print 'Threads started'
t.join()
t2.join()
print 'Done'
我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。
我喜欢这门课:
https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e
import threading
from typing import Any
class PropagatingThread(threading.Thread):
"""A Threading Class that raises errors it caught, and returns the return value of the target on join."""
def __init__(self, *args, **kwargs):
self._target = None
self._args = ()
self._kwargs = {}
super().__init__(*args, **kwargs)
self.exception = None
self.return_value = None
assert self._target
def run(self):
"""Don't override this if you want the behavior of this class, use target instead."""
try:
if self._target:
self.return_value = self._target(*self._args, **self._kwargs)
except Exception as e:
self.exception = e
finally:
# see super().run() for why this is necessary
del self._target, self._args, self._kwargs
def join(self, timeout=None) -> Any:
super().join(timeout)
if self.exception:
raise self.exception
return self.return_value
这个问题有很多非常奇怪复杂的答案。我是不是把它简化了,因为对我来说,这似乎对大多数事情都足够了。
from threading import Thread
class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, '_Thread__target'):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret
如果你确定你只会在一个或另一个版本的Python上运行,你可以将run()方法减少到只有被破坏的版本(如果你只在3之前的Python版本上运行),或者仅仅是干净的版本(如果你只在3开始的Python版本上运行)。
使用示例:
def f(*args, **kwargs):
print(args)
print(kwargs)
raise Exception('I suck at this')
t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()
当您加入时,您将看到在另一个线程上引发异常。
如果您正在使用six或仅在Python 3上使用,则可以改进重新引发异常时获得的堆栈跟踪信息。您可以将内部异常包装在一个新的外部异常中,而不是仅在连接点处使用堆栈,并使用
six.raise_from(RuntimeError('Exception in thread'),self.exc)
or
raise RuntimeError('Exception in thread') from self.exc