等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

如果您的目标不是等待ExecutorService中的所有任务完成,而是等待特定批次的任务完成,那么您可以使用CompletionService——具体地说,ExecutorCompletionService。

其思想是创建一个包装Executor的ExecutorCompletionService,通过CompletionService提交一些已知数量的任务,然后使用take()(阻塞)或poll()(不阻塞)从完成队列中提取相同数量的结果。一旦绘制了与提交的任务相对应的所有预期结果,就知道它们都完成了。

让我再说一遍,因为从接口上看不太明显:你必须知道有多少东西放入CompletionService,才能知道有多少东西要取出来。这对take()方法尤其重要:调用它一次太多次,它将阻塞您的调用线程,直到其他线程向同一CompletionService提交另一个作业。

在《Java并发实践》一书中有一些例子展示了如何使用CompletionService。

其他回答

听起来好像你需要ForkJoinPool并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美在泳池里。awaitQuiescence方法将阻塞利用调用者的线程来执行它的任务,然后当它真的为空时返回。

如果您希望等待所有任务完成,请使用关机方法而不是等待。然后在后面加上awaitterminate。

此外,您还可以使用Runtime。availableProcessors来获取硬件线程的数量,以便正确地初始化线程池。

IllegalMonitorStateException的根本原因:

抛出该异常,表示线程试图等待对象的监视器,或通知其他线程等待对象的监视器而不拥有指定的监视器。

在代码中,您刚刚在ExecutorService上调用了wait(),但没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

遵循以下方法中的一种来等待所有已提交给ExecutorService的任务的完成。

Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object Using invokeAll on ExecutorService Using CountDownLatch Using ForkJoinPool or newWorkStealingPool of Executors(since java 8) Shutdown the pool as recommended in oracle documentation page void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { to a while(condition) which checks for every 1 minute.

最简单的方法是使用ExecutorService.invokeAll(),它可以在一行程序中执行您想要的操作。用您的话说,您需要修改或包装ComputeDTask以实现Callable<>,这可以为您提供更大的灵活性。可能在您的应用程序中有一个有意义的Callable.call()实现,但如果不使用Executors.callable(),这里有一种包装它的方法。

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人指出的那样,如果合适的话,您可以使用invokeAll()的超时版本。在这个例子中,answers将包含一堆返回null的future(参见Executors.callable()的定义)。可能您想做的是稍微重构一下,这样就可以得到一个有用的答案,或者对底层ComputeDTask的引用,但从您的示例中我无法判断。

如果不清楚,请注意,在所有任务完成之前,invokeAll()不会返回。(也就是说,如果被问到,答案集合中的所有future都会报告. isdone()。)这避免了所有的手动关机,等待终止等…并允许您在需要的情况下,在多个周期中巧妙地重用这个ExecutorService。

关于SO有几个相关的问题:

如何等待所有线程完成 从java线程返回值 invokeAll()不愿意接受一个Collection<Callable<t>> 我需要同步吗?

这些都不是严格意义上的问题,但它们确实为人们认为应该如何使用Executor/ExecutorService提供了一点色彩。

这个怎么样?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

如果您没有向ExecutorService添加新任务,这可能会等待所有当前任务完成