我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。


当前回答

遵循以下方法之一。

遍历从ExecutorService上的submit返回的所有Future任务,并按照Kiran的建议使用Future对象上的阻塞调用get()检查状态 在ExecutorService上使用invokeAll() CountDownLatch ForkJoinPool或Executors.html#newWorkStealingPool 按照适当的顺序使用ThreadPoolExecutor的shutdown、awaitterminate、shutdownNow api

相关的SE问题:

CountDownLatch是如何在Java多线程中使用的?

如何正确关闭java ExecutorService

其他回答

这里有两个选择,只是有点困惑,哪个是最好的。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

选项2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

这里放入future.get();试抓是个好主意,对吧?

如果你连续使用了更多的ExecutionServices线程,并且想要等待每个EXECUTIONSERVICE完成。最好的办法是像下面这样;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

基本上在ExecutorService上调用shutdown(),然后调用awaitterminate ():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

你也可以使用期货列表:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

然后当你想要连接所有线程时,它本质上等同于连接每个线程,(额外的好处是它会从子线程重新引发异常到主线程):

for(Future f: this.futures) { f.get(); }

Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionException around the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://stackoverflow.com/a/31885029/32453

使用CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

在任务中(在try / finally中附上)

latch.countDown();