我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

当前回答

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。

其他回答

并发。Futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

并发。futures包含在Python 3.2中,并可作为早期版本的反向移植futures模块使用。

我知道我在这里有点晚了,但我有一个非常类似的问题,但它包括使用tkinter作为GUI,并且主循环使它不可能使用依赖于.join()的任何解决方案。因此,我调整了原问题EDIT中给出的解决方案,但使其更一般,以便于其他人更容易理解。

下面是运行中的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

当然,您总是可以让它以日志以外的其他方式处理异常,例如将其打印出来,或将其输出到控制台。

这允许您像使用Thread类一样使用ExceptionThread类,无需任何特殊修改。

我喜欢这门课:

https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e

import threading
from typing import Any


class PropagatingThread(threading.Thread):
    """A Threading Class that raises errors it caught, and returns the return value of the target on join."""

    def __init__(self, *args, **kwargs):
        self._target = None
        self._args = ()
        self._kwargs = {}
        super().__init__(*args, **kwargs)
        self.exception = None
        self.return_value = None
        assert self._target

    def run(self):
        """Don't override this if you want the behavior of this class, use target instead."""
        try:
            if self._target:
                self.return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exception = e
        finally:
            # see super().run() for why this is necessary
            del self._target, self._args, self._kwargs

    def join(self, timeout=None) -> Any:
        super().join(timeout)
        if self.exception:
            raise self.exception
        return self.return_value

我做的是,简单的覆盖连接和运行线程的方法:

class RaisingThread(threading.Thread):
  def run(self):
    self._exc = None
    try:
      super().run()
    except Exception as e:
      self._exc = e

  def join(self, timeout=None):
    super().join(timeout=timeout)
    if self._exc:
      raise self._exc

用途如下:

def foo():
  time.sleep(2)
  print('hi, from foo!')
  raise Exception('exception from foo')    

t = RaisingThread(target=foo)
t.start()
try:
  t.join()
except Exception as e:
  print(e)

结果:

hi, from foo!
exception from foo!

虽然不可能直接捕获在不同线程中抛出的异常,但下面的代码可以相当透明地获取与此功能非常接近的内容。子线程必须继承ExThread类而不是线程。线程和父线程在等待线程完成任务时必须调用child_thread.join_with_exception()方法,而不是child_thread.join()方法。

此实现的技术细节:当子线程抛出异常时,它将通过Queue传递给父线程,并在父线程中再次抛出。注意,在这种方法中没有忙碌等待。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()