我试图使用Java的ThreadPoolExecutor类运行大量具有固定数量线程的重量级任务。每个任务都有许多可能由于异常而失败的地方。

我已经继承了ThreadPoolExecutor的子类,并且重写了afterExecute方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。然而,我似乎不能让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切正常——情况正常!”,尽管提交给线程池的唯一Runnable抛出了异常。你知道这里发生了什么吗?

谢谢!


当前回答

我使用的是jcabi-log中的VerboseRunnable类,它吸收所有异常并记录它们。非常方便,例如:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

其他回答

警告:需要注意的是,此解决方案将阻塞将来的调用线程。get()。


如果您想处理任务抛出的异常,那么通常使用Callable而不是Runnable更好。

Callable.call()允许抛出检查过的异常,这些异常会传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
// do something else in the meantime, and then...
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

如果Callable.call()抛出异常,则该异常将被包装在ExecutionException中并由Future.get()抛出。

这可能比继承ThreadPoolExecutor的子类要好得多。如果异常是可恢复的,它还为您提供了重新提交任务的机会。

而不是子类化ThreadPoolExecutor,我将为它提供一个创建新线程的ThreadFactory实例,并为它们提供一个UncaughtExceptionHandler

医生的例子并没有给我想要的结果。

当线程进程被放弃(带有显式的interput();s)时,出现异常。

我也想保留这个“系统”。exit”的功能,一个正常的主线程有一个典型的抛出,我想这样程序员就不会被迫在代码上工作,而不得不担心它的上下文(…一个线程),如果出现任何错误,它必须是一个编程错误,或者这种情况必须解决的地方,手动捕获…没有必要过于复杂。

所以我修改了代码来满足我的需要。

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
            Future<?> future = (Future<?>) r; 
            boolean terminate = false; 
                try { 
                    future.get(); 
                } catch (ExecutionException e) { 
                    terminate = true; 
                    e.printStackTrace(); 
                } catch (InterruptedException | CancellationException ie) {// ignore/reset 
                    Thread.currentThread().interrupt(); 
                } finally { 
                    if (terminate) System.exit(0); 
                } 
        } 
    }

但是要小心,这段代码基本上将你的线程转换为主线程异常,同时保持所有的并行属性…但是让我们面对现实,在系统的并行机制(扩展Thread)的功能上设计架构是错误的方法……除非严格要求使用事件驱动设计....但后来…如果这是需求,那么问题是:在这种情况下是否需要ExecutorService ?……也许不是。

如果希望监视任务的执行,可以旋转1或2个线程(可能更多,这取决于负载),并使用它们从ExecutionCompletionService包装器中获取任务。

这是因为AbstractExecutorService:: submit将您的runnable包装为RunnableFuture(就是FutureTask),如下所示

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

然后execute将它传递给Worker, Worker.run()将调用下面的。

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

最后task.run ();在上面的代码调用将调用 FutureTask.run()。下面是异常处理程序代码,因为 这样你就不会得到预期的异常。

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}