我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。


当前回答

只是在这里提供更多不同于使用闩锁/屏障的选择。 你也可以在它们全部使用完CompletionService之前得到部分结果。

来自Java并发实践: “如果您有一批计算要提交给Executor,并且您希望检索它们的结果 可用时,您可以保留与每个任务关联的Future,并通过调用get来重复轮询完成 超时为0。这是可能的,但很乏味。幸运的是,还有更好的方法:完井服务。”

这里是实现

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

其他回答

ExecutorService.invokeAll()为您完成。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Java 5及以后版本中的CyclicBarrier类就是为这类事情而设计的。

有点晚了,但为了完成…

不要“等待”所有的任务都完成,你可以用好莱坞的原则来思考,“不要给我打电话,我会给你打电话”——当我完成的时候。 我认为结果代码更优雅…

番石榴提供了一些有趣的工具来实现这一点。

一个例子:

将ExecutorService包装成ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交一个可调用对象的集合来执行::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

现在最重要的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

附加一个回调到ListenableFuture,当所有future完成时,你可以使用它来得到通知:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

这也提供了一个好处,一旦处理完成,您就可以在一个地方收集所有的结果……

更多信息请点击这里

在执行器getActiveCount() -中有一个方法可以给出活动线程的计数。

在跨越线程之后,我们可以检查activeCount()值是否为0。一旦该值为零,就意味着当前没有活动线程在运行,这意味着任务已经完成:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}

使用CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

在任务中(在try / finally中附上)

latch.countDown();