我试图使用Java的ThreadPoolExecutor类运行大量具有固定数量线程的重量级任务。每个任务都有许多可能由于异常而失败的地方。

我已经继承了ThreadPoolExecutor的子类,并且重写了afterExecute方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。然而,我似乎不能让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切正常——情况正常!”,尽管提交给线程池的唯一Runnable抛出了异常。你知道这里发生了什么吗?

谢谢!


当前回答

如果希望监视任务的执行,可以旋转1或2个线程(可能更多,这取决于负载),并使用它们从ExecutionCompletionService包装器中获取任务。

其他回答

这是

它来源于SingleThreadExecutor,但是你可以很容易地适应它 Java 8的lamdas代码,但是很容易修复

它会创建一个Executor单线程,可以得到很多任务;并将等待当前的一个结束执行,然后开始执行下一个

如果出现uncaugth错误或异常,uncaughtExceptionHandler将捕获它

public final class SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {
            final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            });
            return newThread;
        };
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue(),
                        factory){


                    protected void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable);
                        if (throwable == null && runnable instanceof Future) {
                            try {
                                Future future = (Future) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable != null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                        }
                    }
                });
    }



    private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    private static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }



    private SingleThreadExecutorWithExceptions() {}
}

我通过将提供的可运行文件打包提交给执行程序来解决这个问题。

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);

这与mmm的解决方案类似,但更容易理解。让你的任务扩展一个封装run()方法的抽象类。

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

我使用的是jcabi-log中的VerboseRunnable类,它吸收所有异常并记录它们。非常方便,例如:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

医生的例子并没有给我想要的结果。

当线程进程被放弃(带有显式的interput();s)时,出现异常。

我也想保留这个“系统”。exit”的功能,一个正常的主线程有一个典型的抛出,我想这样程序员就不会被迫在代码上工作,而不得不担心它的上下文(…一个线程),如果出现任何错误,它必须是一个编程错误,或者这种情况必须解决的地方,手动捕获…没有必要过于复杂。

所以我修改了代码来满足我的需要。

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
            Future<?> future = (Future<?>) r; 
            boolean terminate = false; 
                try { 
                    future.get(); 
                } catch (ExecutionException e) { 
                    terminate = true; 
                    e.printStackTrace(); 
                } catch (InterruptedException | CancellationException ie) {// ignore/reset 
                    Thread.currentThread().interrupt(); 
                } finally { 
                    if (terminate) System.exit(0); 
                } 
        } 
    }

但是要小心,这段代码基本上将你的线程转换为主线程异常,同时保持所有的并行属性…但是让我们面对现实,在系统的并行机制(扩展Thread)的功能上设计架构是错误的方法……除非严格要求使用事件驱动设计....但后来…如果这是需求,那么问题是:在这种情况下是否需要ExecutorService ?……也许不是。