我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。
我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。
编辑:线程类的代码如下:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
虽然不可能直接捕获在不同线程中抛出的异常,但下面的代码可以相当透明地获取与此功能非常接近的内容。子线程必须继承ExThread类而不是线程。线程和父线程在等待线程完成任务时必须调用child_thread.join_with_exception()方法,而不是child_thread.join()方法。
此实现的技术细节:当子线程抛出异常时,它将通过Queue传递给父线程,并在父线程中再次抛出。注意,在这种方法中没有忙碌等待。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)
if __name__ == '__main__':
main()
Pygolang提供同步功能。工作组,特别是将异常从派生的工作线程传播到主线程。例如:
#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""
from __future__ import print_function
from golang import sync, context
def T1(ctx, *argv):
print('T1: run ... %r' % (argv,))
raise RuntimeError('T1: problem')
def T2(ctx):
print('T2: ran ok')
def main():
wg = sync.WorkGroup(context.background())
wg.go(T1, [1,2,3])
wg.go(T2)
try:
wg.wait()
except Exception as e:
print('Tmain: caught exception: %r\n' %e)
# reraising to see full traceback
raise
if __name__ == '__main__':
main()
在运行时给出以下结果:
T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)
Traceback (most recent call last):
File "./x.py", line 28, in <module>
main()
File "./x.py", line 21, in main
wg.wait()
File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
pyerr_reraise(pyerr)
File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
f(pywg._pyctx, *argv, **kw)
File "./x.py", line 10, in T1
raise RuntimeError('T1: problem')
RuntimeError: T1: problem
问题的原始代码将是:
wg = sync.WorkGroup(context.background())
def _(ctx):
shul.copytree(sourceFolder, destFolder)
wg.go(_)
# waits for spawned worker to complete and, on error, reraises
# its exception on the main thread.
wg.wait()
捕获线程异常并与调用方方法通信的一个简单方法是将字典或列表传递给worker方法。
示例(将字典传递给工作方法):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err
shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj['err']:
print(">>%s" % shared_obj['err'])
并发。Futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:
import concurrent.futures
import shutil
def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)
while future.running():
# Print pretty dots here.
pass
# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()
并发。futures包含在Python 3.2中,并可作为早期版本的反向移植futures模块使用。
类似于RickardSjogren的方法,没有Queue, sys等,但也没有一些信号监听器:直接执行一个异常处理程序,对应于一个异常块。
#!/usr/bin/env python3
import threading
class ExceptionThread(threading.Thread):
def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.
:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback
只有自我。_callback和run()中的except块是普通threading.Thread之外的。
作为线程的新手,我花了很长时间来理解如何实现Mateusz Kobos的代码(如上)。这里有一个明确的版本,以帮助您了解如何使用它。
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except Exception:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
# This overrides the "run_with_exception" from class "ExThread"
# Note, this is where the actual thread to be run lives. The thread
# to be run could also call a method or be passed in as an object
def run_with_exception(self):
# Code will function until the int
print "sleeping 5 seconds"
import time
for i in 1, 2, 3, 4, 5:
print i
time.sleep(1)
# Thread should break here
int("str")
# I'm honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
# thread_name = threading.current_thread().name
# raise MyException("An error in thread '{}'.".format(thread_name))
if __name__ == '__main__':
# The code lives in MyThread in this example. So creating the MyThread
# object set the code to be run (but does not start it yet)
t = MyThread()
# This actually starts the thread
t.start()
print
print ("Notice 't.start()' is considered to have completed, although"
" the countdown continues in its new thread. So you code "
"can tinue into new processing.")
# Now that the thread is running, the join allows for monitoring of it
try:
t.join_with_exception()
# should be able to be replace "Exception" with specific error (untested)
except Exception, e:
print
print "Exceptioon was caught and control passed back to the main thread"
print "Do some handling here...or raise a custom exception "
thread_name = threading.current_thread().name
e = ("Caught a MyException in thread: '" +
str(thread_name) +
"' [" + str(e) + "]")
raise Exception(e) # Or custom class of exception, such as MyException