我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。
我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。
编辑:线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
使用裸例外并不是一个好的实践,因为您通常会获得比您讨价还价时更多的东西。
我建议修改except以只捕获您想要处理的异常。我不认为引发它有预期的效果,因为当你在外层try中实例化TheThread时,如果它引发一个异常,赋值永远不会发生。
相反,你可能只想提醒它,然后继续前进,比如:
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err
然后,当异常被捕获时,您可以在那里处理它。然后,当外部try从TheThread捕获异常时,您知道它不是您已经处理过的异常,并将帮助您隔离流程流。
捕获线程异常并与调用方方法通信的一个简单方法是将字典或列表传递给worker方法。
示例(将字典传递给工作方法):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err
shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj['err']:
print(">>%s" % shared_obj['err'])
我使用这个版本,它是最小的,它工作得很好。
class SafeThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(SafeThread, self).__init__(*args, **kwargs)
self.exception = None
def run(self) -> None:
try:
super(SafeThread, self).run()
except Exception as ex:
self.exception = ex
traceback.print_exc()
def join(self, *args, **kwargs) -> None:
super(SafeThread, self).join(*args, **kwargs)
if self.exception:
raise self.exception
要使用它,只需替换线程。带安全线程的线程
t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()
我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == '__main__':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError('a failed')
def b():
time.sleep(2)
return 'b returns'
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print 'Threads started'
t.join()
t2.join()
print 'Done'
我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。
concurrent.futures.as_completed
https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed
解决方案如下:
当调用异常时,立即返回主线程
不需要额外的用户定义类,因为它不需要:
显式队列
在工作线程周围添加except else
来源:
#!/usr/bin/env python3
import concurrent.futures
import time
def func_that_raises(do_raise):
for i in range(3):
print(i)
time.sleep(0.1)
if do_raise:
raise Exception()
for i in range(3):
print(i)
time.sleep(0.1)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
futures.append(executor.submit(func_that_raises, False))
futures.append(executor.submit(func_that_raises, True))
for future in concurrent.futures.as_completed(futures):
print(repr(future.exception()))
可能的输出:
0
0
1
1
2
2
0
Exception()
1
2
None
不幸的是,当一个期货失效时,不可能通过终止期货来取消其他期货:
concurrent.futures;Python:并发。如何使其可取消?
线程:有办法杀死一个线程吗?
C pthreads:在Pthread库中杀死线程
如果你这样做:
for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()
然后with语句捕获它,等待第二个线程完成,然后继续。以下行为类似:
for future in concurrent.futures.as_completed(futures):
future.result()
因为future.result()在发生异常时重新引发异常。
如果你想退出整个Python进程,你可以使用os._exit(0),但这可能意味着你需要重构。
具有完美异常语义的自定义类
我最终为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“错误处理的队列示例”。该类的目标是既方便,又让您完全控制提交和结果/错误处理。
在Python 3.6.7, Ubuntu 18.04上测试。
我喜欢这门课:
https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e
import threading
from typing import Any
class PropagatingThread(threading.Thread):
"""A Threading Class that raises errors it caught, and returns the return value of the target on join."""
def __init__(self, *args, **kwargs):
self._target = None
self._args = ()
self._kwargs = {}
super().__init__(*args, **kwargs)
self.exception = None
self.return_value = None
assert self._target
def run(self):
"""Don't override this if you want the behavior of this class, use target instead."""
try:
if self._target:
self.return_value = self._target(*self._args, **self._kwargs)
except Exception as e:
self.exception = e
finally:
# see super().run() for why this is necessary
del self._target, self._args, self._kwargs
def join(self, timeout=None) -> Any:
super().join(timeout)
if self.exception:
raise self.exception
return self.return_value