我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。


当前回答

Project Loom的AutoCloseable执行器服务上的Try-with-Resources语法

Project Loom试图为Java中的并发能力添加新特性。

其中一个特性是使ExecutorService可自动关闭。这意味着每个ExecutorService实现都将提供一个close方法。这意味着我们可以使用try-with-resources语法自动关闭ExecutorService对象。

executorservice# close方法将阻塞,直到所有提交的任务都完成。使用close代替了调用shutdown & awaitterminate。

自动关闭功能有助于Project Loom将“结构化并发”引入Java。

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

有关Project Loom的更多信息,请搜索由Ron Pressler和Project Loom团队的其他人所做的演讲和采访。关注更近期的,如Project Loom的发展。

Project Loom技术的实验版本现在已经可以使用了,它是基于早期的Java 18。

其他回答

在Java8中,你可以用CompletableFuture来实现:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

你可以把你的任务包装在另一个可运行文件中,它会发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

只是在这里提供更多不同于使用闩锁/屏障的选择。 你也可以在它们全部使用完CompletionService之前得到部分结果。

来自Java并发实践: “如果您有一批计算要提交给Executor,并且您希望检索它们的结果 可用时,您可以保留与每个任务关联的Future,并通过调用get来重复轮询完成 超时为0。这是可能的,但很乏味。幸运的是,还有更好的方法:完井服务。”

这里是实现

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

Java 5及以后版本中的CyclicBarrier类就是为这类事情而设计的。

你应该使用executorService.shutdown()和executorService。awaitTermination方法。

示例如下:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}