我试图使用Java的ThreadPoolExecutor类运行大量具有固定数量线程的重量级任务。每个任务都有许多可能由于异常而失败的地方。

我已经继承了ThreadPoolExecutor的子类,并且重写了afterExecute方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。然而,我似乎不能让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切正常——情况正常!”,尽管提交给线程池的唯一Runnable抛出了异常。你知道这里发生了什么吗?

谢谢!


当前回答

如果希望监视任务的执行,可以旋转1或2个线程(可能更多,这取决于负载),并使用它们从ExecutionCompletionService包装器中获取任务。

其他回答

警告:需要注意的是,此解决方案将阻塞将来的调用线程。get()。


如果您想处理任务抛出的异常,那么通常使用Callable而不是Runnable更好。

Callable.call()允许抛出检查过的异常,这些异常会传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
// do something else in the meantime, and then...
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

如果Callable.call()抛出异常,则该异常将被包装在ExecutionException中并由Future.get()抛出。

这可能比继承ThreadPoolExecutor的子类要好得多。如果异常是可恢复的,它还为您提供了重新提交任务的机会。

而不是子类化ThreadPoolExecutor,我将为它提供一个创建新线程的ThreadFactory实例,并为它们提供一个UncaughtExceptionHandler

如果希望监视任务的执行,可以旋转1或2个线程(可能更多,这取决于负载),并使用它们从ExecutionCompletionService包装器中获取任务。

另一个解决方案是使用ManagedTask和ManagedTaskListener。

你需要一个Callable或Runnable来实现ManagedTask接口。

方法getManagedTaskListener返回你想要的实例。

public ManagedTaskListener getManagedTaskListener() {

你在ManagedTaskListener中实现了taskDone方法:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

有关托管任务生命周期和侦听器的更多详细信息。

我通过将提供的可运行文件打包提交给执行程序来解决这个问题。

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);