我有一个返回期货列表的方法

List<Future<O>> futures = getFutures();

现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。

简单的方法就是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。

如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

当前回答

这就是我用来在未来列表上等待特定时间的方法。我认为它更干净。

CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// Some parallel work
        for (Something tp : somethings) {
            completionService.submit(() -> {
                try {
                   work(something)
                } catch (ConnectException e) {
                } finally {
                    countDownLatch.countDown();
                }
            });
        }    
  try {
        if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        }
    } catch (InterruptedException e) {
    }

其他回答

基于guava的解决方案可以使用Futures.FutureCombiner实现。

下面是在javadoc中给出的代码示例:

 final ListenableFuture<Instant> loginDateFuture =
     loginService.findLastLoginDate(username);
 final ListenableFuture<List<String>> recentCommandsFuture =
     recentCommandsService.findRecentCommands(username);
 ListenableFuture<UsageHistory> usageFuture =
     Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
         .call(
             () ->
                 new UsageHistory(
                     username,
                     Futures.getDone(loginDateFuture),
                     Futures.getDone(recentCommandsFuture)),
             executor);

要了解更多信息,请参阅用户指南的ListenableFutureExplained部分。

如果您对它的工作原理感到好奇,我建议查看源代码的这一部分:aggregatefuture .java#L127-L186

在Java 8中使用CompletableFuture

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

也许这会有帮助(没有什么会被原始的线取代,耶!) 我建议用一个分开的线程运行每个Future家伙(它们并行),然后当其中一个得到错误时,它只是信号管理器(Handler类)。

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

我不得不说上面的代码会出错(没有检查),但我希望我可以解释解决方案。请试一试。

我有一个实用工具类,它包含这些:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

一旦你有了这些,使用静态导入,你可以像这样简单地等待所有的期货:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

你也可以像这样收集他们的所有结果:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

只是重温我的旧帖子,注意到你有另一个悲伤:

但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。

在这种情况下,简单的解决方案是并行执行:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

这样,第一个异常虽然不会停止future,但会中断foreach语句,就像在串行示例中一样,但由于所有异常都是并行等待的,因此您不必等待前3个异常完成。

 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };