等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

一个简单的替代方法是使用线程和join。 参考:连接线程

其他回答

我还有一种情况,我有一组文档要爬取。我从一个应该被处理的初始“种子”文档开始,该文档包含到其他也应该被处理的文档的链接,等等。

在我的主程序中,我只想写如下所示的东西,其中Crawler控制了一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想在一棵树上导航,同样的情况也会发生;我将插入根节点,每个节点的处理器将根据需要向队列中添加子节点,一堆线程将处理树中的所有节点,直到没有更多的节点为止。

我在JVM中找不到任何东西,我觉得这有点令人惊讶。所以我写了一个类ThreadPool,一个可以直接使用或子类添加适合域的方法,例如schedule(Document)。希望能有所帮助!

ThreadPool Javadoc | Maven

IllegalMonitorStateException的根本原因:

抛出该异常,表示线程试图等待对象的监视器,或通知其他线程等待对象的监视器而不拥有指定的监视器。

在代码中,您刚刚在ExecutorService上调用了wait(),但没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

遵循以下方法中的一种来等待所有已提交给ExecutorService的任务的完成。

Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object Using invokeAll on ExecutorService Using CountDownLatch Using ForkJoinPool or newWorkStealingPool of Executors(since java 8) Shutdown the pool as recommended in oracle documentation page void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { to a while(condition) which checks for every 1 minute.

如果您的目标不是等待ExecutorService中的所有任务完成,而是等待特定批次的任务完成,那么您可以使用CompletionService——具体地说,ExecutorCompletionService。

其思想是创建一个包装Executor的ExecutorCompletionService,通过CompletionService提交一些已知数量的任务,然后使用take()(阻塞)或poll()(不阻塞)从完成队列中提取相同数量的结果。一旦绘制了与提交的任务相对应的所有预期结果,就知道它们都完成了。

让我再说一遍,因为从接口上看不太明显:你必须知道有多少东西放入CompletionService,才能知道有多少东西要取出来。这对take()方法尤其重要:调用它一次太多次,它将阻塞您的调用线程,直到其他线程向同一CompletionService提交另一个作业。

在《Java并发实践》一书中有一些例子展示了如何使用CompletionService。

在集合中添加所有线程并使用invokeAll提交。 如果您可以使用ExecutorService的invokeAll方法,那么JVM在所有线程完成之前不会进行下一行。

这里有一个很好的例子: 通过ExecutorService调用所有

最简单的方法是使用ExecutorService.invokeAll(),它可以在一行程序中执行您想要的操作。用您的话说,您需要修改或包装ComputeDTask以实现Callable<>,这可以为您提供更大的灵活性。可能在您的应用程序中有一个有意义的Callable.call()实现,但如果不使用Executors.callable(),这里有一种包装它的方法。

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人指出的那样,如果合适的话,您可以使用invokeAll()的超时版本。在这个例子中,answers将包含一堆返回null的future(参见Executors.callable()的定义)。可能您想做的是稍微重构一下,这样就可以得到一个有用的答案,或者对底层ComputeDTask的引用,但从您的示例中我无法判断。

如果不清楚,请注意,在所有任务完成之前,invokeAll()不会返回。(也就是说,如果被问到,答案集合中的所有future都会报告. isdone()。)这避免了所有的手动关机,等待终止等…并允许您在需要的情况下,在多个周期中巧妙地重用这个ExecutorService。

关于SO有几个相关的问题:

如何等待所有线程完成 从java线程返回值 invokeAll()不愿意接受一个Collection<Callable<t>> 我需要同步吗?

这些都不是严格意义上的问题,但它们确实为人们认为应该如何使用Executor/ExecutorService提供了一点色彩。