我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把这个放在另一个线程,这样我就可以输出....表示脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder
    
    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

当前回答

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

解决方案如下:

当调用异常时,立即返回主线程 不需要额外的用户定义类,因为它不需要: 显式队列 在工作线程周围添加except else

来源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)
    
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,当一个期货失效时,不可能通过终止期货来取消其他期货:

concurrent.futures;Python:并发。如何使其可取消? 线程:有办法杀死一个线程吗? C pthreads:在Pthread库中杀死线程

如果你这样做:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

然后with语句捕获它,等待第二个线程完成,然后继续。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()

因为future.result()在发生异常时重新引发异常。

如果你想退出整个Python进程,你可以使用os._exit(0),但这可能意味着你需要重构。

具有完美异常语义的自定义类

我最终为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“错误处理的队列示例”。该类的目标是既方便,又让您完全控制提交和结果/错误处理。

在Python 3.6.7, Ubuntu 18.04上测试。

其他回答

这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。

我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...或者你可以让worker在完成时发送一个回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...或者你可以循环直到事件完成:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

代码如下:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

...和测试函数:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

如果在线程中发生异常,最好的方法是在连接期间在调用线程中重新引发它。您可以使用sys.exc_info()函数获取当前正在处理的异常的信息。此信息可以简单地存储为线程对象的属性,直到调用join,此时可以重新引发它。

注意,队列。队列(在其他回答中建议)在这个简单的情况下是不必要的,因为线程最多抛出1个异常,并且在抛出一个异常后立即完成。我们通过简单地等待线程完成来避免竞争条件。

例如,扩展ExcThread(如下),覆盖excRun(而不是run)。

Python 2. x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3. x:

在Python 3中,raise的参数形式为3,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

我喜欢这门课:

https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e

import threading
from typing import Any


class PropagatingThread(threading.Thread):
    """A Threading Class that raises errors it caught, and returns the return value of the target on join."""

    def __init__(self, *args, **kwargs):
        self._target = None
        self._args = ()
        self._kwargs = {}
        super().__init__(*args, **kwargs)
        self.exception = None
        self.return_value = None
        assert self._target

    def run(self):
        """Don't override this if you want the behavior of this class, use target instead."""
        try:
            if self._target:
                self.return_value = self._target(*self._args, **self._kwargs)
        except Exception as e:
            self.exception = e
        finally:
            # see super().run() for why this is necessary
            del self._target, self._args, self._kwargs

    def join(self, timeout=None) -> Any:
        super().join(timeout)
        if self.exception:
            raise self.exception
        return self.return_value

捕获线程异常并与调用方方法通信的一个简单方法是将字典或列表传递给worker方法。

示例(将字典传递给工作方法):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

Pygolang提供同步功能。工作组,特别是将异常从派生的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

在运行时给出以下结果:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

问题的原始代码将是:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()