我有一个返回期货列表的方法
List<Future<O>> futures = getFutures();
现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。
简单的方法就是
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。
如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
CompletionService将使用.submit()方法获取你的Callables,你可以使用.take()方法检索计算出的期货。
您一定不能忘记的一件事是通过调用.shutdown()方法来终止ExecutorService。此外,只有在保存了对执行器服务的引用时才能调用此方法,因此请确保保留一个引用。
示例代码-对于要并行处理的固定数量的工作项:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<YourCallableImplementor> completionService =
new ExecutorCompletionService<YourCallableImplementor>(service);
ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
for (String computeMe : elementsToCompute) {
futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;
while(received < elementsToCompute.size()) {
Future<YourCallableImplementor> resultFuture = completionService.take();
YourCallableImplementor result = resultFuture.get();
received ++;
}
//important: shutdown your ExecutorService
service.shutdown();
示例代码-对于要并行处理的动态数量的工作项:
public void runIt(){
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();
//Initial workload is 8 threads
for (int i = 0; i < 9; i++) {
futures.add(completionService.submit(write.new CallableImplementor()));
}
boolean finished = false;
while (!finished) {
try {
Future<CallableImplementor> resultFuture;
resultFuture = completionService.take();
CallableImplementor result = resultFuture.get();
finished = doSomethingWith(result.getResult());
result.setResult(null);
result = null;
resultFuture = null;
//After work package has been finished create new work package and add it to futures
futures.add(completionService.submit(write.new CallableImplementor()));
} catch (InterruptedException | ExecutionException e) {
//handle interrupted and assert correct thread / work packet count
}
}
//important: shutdown your ExecutorService
service.shutdown();
}
public class CallableImplementor implements Callable{
boolean result;
@Override
public CallableImplementor call() throws Exception {
//business logic goes here
return this;
}
public boolean getResult() {
return result;
}
public void setResult(boolean result) {
this.result = result;
}
}