我试图使用Java的ThreadPoolExecutor类运行大量具有固定数量线程的重量级任务。每个任务都有许多可能由于异常而失败的地方。
我已经继承了ThreadPoolExecutor的子类,并且重写了afterExecute方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。然而,我似乎不能让它工作。
例如:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
这个程序的输出是“一切正常——情况正常!”,尽管提交给线程池的唯一Runnable抛出了异常。你知道这里发生了什么吗?
谢谢!
从文档中可以看出:
注意:当动作被包含在
task(例如FutureTask)
显式地或通过方法
提交时,这些任务对象会捕获和
维护计算性异常和
所以它们不会引起突然性
终止,和内部
异常不会传递给this
方法。
当你提交一个Runnable时,它会被包装在一个Future中。
你的afterExecute应该是这样的:
public final class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}
从文档中可以看出:
注意:当动作被包含在
task(例如FutureTask)
显式地或通过方法
提交时,这些任务对象会捕获和
维护计算性异常和
所以它们不会引起突然性
终止,和内部
异常不会传递给this
方法。
当你提交一个Runnable时,它会被包装在一个Future中。
你的afterExecute应该是这样的:
public final class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}
如果你的ExecutorService来自外部源(即不可能子类化ThreadPoolExecutor并覆盖afterExecute()),你可以使用动态代理来实现所需的行为:
public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;
if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}
另一个解决方案是使用ManagedTask和ManagedTaskListener。
你需要一个Callable或Runnable来实现ManagedTask接口。
方法getManagedTaskListener返回你想要的实例。
public ManagedTaskListener getManagedTaskListener() {
你在ManagedTaskListener中实现了taskDone方法:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}
有关托管任务生命周期和侦听器的更多详细信息。