我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

当前回答

并发。Futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

并发。futures包含在Python 3.2中,并可作为早期版本的反向移植futures模块使用。

其他回答

类似于RickardSjogren的方法,没有Queue, sys等,但也没有一些信号监听器:直接执行一个异常处理程序,对应于一个异常块。

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

只有自我。_callback和run()中的except块是普通threading.Thread之外的。

Pygolang提供同步功能。工作组,特别是将异常从派生的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

在运行时给出以下结果:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

问题的原始代码将是:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。

使用裸例外并不是一个好的实践,因为您通常会获得比您讨价还价时更多的东西。

我建议修改except以只捕获您想要处理的异常。我不认为引发它有预期的效果,因为当你在外层try中实例化TheThread时,如果它引发一个异常,赋值永远不会发生。

相反,你可能只想提醒它,然后继续前进,比如:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

然后,当异常被捕获时,您可以在那里处理它。然后,当外部try从TheThread捕获异常时,您知道它不是您已经处理过的异常,并将帮助您隔离流程流。

这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。

我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...或者你可以让worker在完成时发送一个回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...或者你可以循环直到事件完成:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

代码如下:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

...和测试函数:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')