等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

您可以使用ExecutorService。它将执行所有任务并等待所有线程完成它们的任务。

这是完整的javadoc

您还可以使用此方法的重载版本来指定超时时间。

下面是ExecutorService.invokeAll的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

其他回答

只使用

latch = new CountDownLatch(noThreads)

在每个线程中

latch.countDown();

作为屏障

latch.await();

如果你想等待执行器服务完成执行,调用shutdown(),然后调用awaitterminate (units, unitType),例如awaitterminate (1, MINUTE)。ExecutorService不会在它自己的监视器上阻塞,所以你不能使用wait等。

如果您希望等待所有任务完成,请使用关机方法而不是等待。然后在后面加上awaitterminate。

此外,您还可以使用Runtime。availableProcessors来获取硬件线程的数量,以便正确地初始化线程池。

听起来好像你需要ForkJoinPool并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美在泳池里。awaitQuiescence方法将阻塞利用调用者的线程来执行它的任务,然后当它真的为空时返回。

如果您的目标不是等待ExecutorService中的所有任务完成,而是等待特定批次的任务完成,那么您可以使用CompletionService——具体地说,ExecutorCompletionService。

其思想是创建一个包装Executor的ExecutorCompletionService,通过CompletionService提交一些已知数量的任务,然后使用take()(阻塞)或poll()(不阻塞)从完成队列中提取相同数量的结果。一旦绘制了与提交的任务相对应的所有预期结果,就知道它们都完成了。

让我再说一遍,因为从接口上看不太明显:你必须知道有多少东西放入CompletionService,才能知道有多少东西要取出来。这对take()方法尤其重要:调用它一次太多次,它将阻塞您的调用线程,直到其他线程向同一CompletionService提交另一个作业。

在《Java并发实践》一书中有一些例子展示了如何使用CompletionService。