我有一个返回期货列表的方法
List<Future<O>> futures = getFutures();
现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。
简单的方法就是
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。
如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
基于guava的解决方案可以使用Futures.FutureCombiner实现。
下面是在javadoc中给出的代码示例:
final ListenableFuture<Instant> loginDateFuture =
loginService.findLastLoginDate(username);
final ListenableFuture<List<String>> recentCommandsFuture =
recentCommandsService.findRecentCommands(username);
ListenableFuture<UsageHistory> usageFuture =
Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
.call(
() ->
new UsageHistory(
username,
Futures.getDone(loginDateFuture),
Futures.getDone(recentCommandsFuture)),
executor);
要了解更多信息,请参阅用户指南的ListenableFutureExplained部分。
如果您对它的工作原理感到好奇,我建议查看源代码的这一部分:aggregatefuture .java#L127-L186
也许这会有帮助(没有什么会被原始的线取代,耶!)
我建议用一个分开的线程运行每个Future家伙(它们并行),然后当其中一个得到错误时,它只是信号管理器(Handler类)。
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated but works here like a boss
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}
我不得不说上面的代码会出错(没有检查),但我希望我可以解释解决方案。请试一试。
我有一个实用工具类,它包含这些:
@FunctionalInterface
public interface CheckedSupplier<X> {
X get() throws Throwable;
}
public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
return () -> {
try {
return supplier.get();
} catch (final Throwable checkedException) {
throw new IllegalStateException(checkedException);
}
};
}
一旦你有了这些,使用静态导入,你可以像这样简单地等待所有的期货:
futures.stream().forEach(future -> uncheckedSupplier(future::get).get());
你也可以像这样收集他们的所有结果:
List<MyResultType> results = futures.stream()
.map(future -> uncheckedSupplier(future::get).get())
.collect(Collectors.toList());
只是重温我的旧帖子,注意到你有另一个悲伤:
但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。
在这种情况下,简单的解决方案是并行执行:
futures.stream().parallel()
.forEach(future -> uncheckedSupplier(future::get).get());
这样,第一个异常虽然不会停止future,但会中断foreach语句,就像在串行示例中一样,但由于所有异常都是并行等待的,因此您不必等待前3个异常完成。