我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

当前回答

作为线程的新手,我花了很长时间来理解如何实现Mateusz Kobos的代码(如上)。这里有一个明确的版本,以帮助您了解如何使用它。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

其他回答

如果在线程中发生异常,最好的方法是在连接期间在调用线程中重新引发它。您可以使用sys.exc_info()函数获取当前正在处理的异常的信息。此信息可以简单地存储为线程对象的属性,直到调用join,此时可以重新引发它。

注意,队列。队列(在其他回答中建议)在这个简单的情况下是不必要的,因为线程最多抛出1个异常,并且在抛出一个异常后立即完成。我们通过简单地等待线程完成来避免竞争条件。

例如,扩展ExcThread(如下),覆盖excRun(而不是run)。

Python 2. x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3. x:

在Python 3中,raise的参数形式为3,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

解决方案如下:

当调用异常时,立即返回主线程 不需要额外的用户定义类,因为它不需要: 显式队列 在工作线程周围添加except else

来源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)
    
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,当一个期货失效时,不可能通过终止期货来取消其他期货:

concurrent.futures;Python:并发。如何使其可取消? 线程:有办法杀死一个线程吗? C pthreads:在Pthread库中杀死线程

如果你这样做:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

然后with语句捕获它,等待第二个线程完成,然后继续。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()

因为future.result()在发生异常时重新引发异常。

如果你想退出整个Python进程,你可以使用os._exit(0),但这可能意味着你需要重构。

具有完美异常语义的自定义类

我最终为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“错误处理的队列示例”。该类的目标是既方便,又让您完全控制提交和结果/错误处理。

在Python 3.6.7, Ubuntu 18.04上测试。

我认为其他的解决方案有点复杂,如果你唯一想要的是真正看到某个异常,而不是完全无视和盲目。

解决方案是创建一个自定义线程,从主线程获取记录器并记录任何异常。

class ThreadWithLoggedException(threading.Thread):
    """
    Similar to Thread but will log exceptions to passed logger.

    Args:
        logger: Logger instance used to log any exception in child thread

    Exception is also reachable via <thread>.exception from the main thread.
    """

    def __init__(self, *args, **kwargs):
        try:
            self.logger = kwargs.pop("logger")
        except KeyError:
            raise Exception("Missing 'logger' in kwargs")
        super().__init__(*args, **kwargs)
        self.exception = None

    def run(self):
        try:
            if self._target is not None:
                self._target(*self._args, **self._kwargs)
        except Exception as exception:
            thread = threading.current_thread()
            self.exception = exception
            self.logger.exception(f"Exception in child thread {thread}: {exception}")
        finally:
            del self._target, self._args, self._kwargs

例子:

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

def serve():
    raise Exception("Earth exploded.")

th = ThreadWithLoggedException(target=serve, logger=logger)
th.start()

主线程输出:

Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
Traceback (most recent call last):
  File "/core/utils.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/myapp.py", line 105, in serve
    raise Exception("Earth exploded.")
Exception: Earth exploded.

这个问题有很多非常奇怪复杂的答案。我是不是把它简化了,因为对我来说,这似乎对大多数事情都足够了。

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super(PropagatingThread, self).join(timeout)
        if self.exc:
            raise self.exc
        return self.ret

如果你确定你只会在一个或另一个版本的Python上运行,你可以将run()方法减少到只有被破坏的版本(如果你只在3之前的Python版本上运行),或者仅仅是干净的版本(如果你只在3开始的Python版本上运行)。

使用示例:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到在另一个线程上引发异常。

如果您正在使用six或仅在Python 3上使用,则可以改进重新引发异常时获得的堆栈跟踪信息。您可以将内部异常包装在一个新的外部异常中,而不是仅在连接点处使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc

在Python 3.8中,我们可以使用线程。Excepthook在所有子线程中钩住未捕获的异常!例如,

threading.excepthook = thread_exception_handler

推荐人:https://stackoverflow.com/a/60002752/5093308