等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

你可以在一定的时间间隔内等待任务完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

或者您可以使用ExecutorService.submit(Runnable)并收集它返回的Future对象,然后依次对每个对象调用get()以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

正确处理InterruptedException非常重要。它可以让您或库的用户安全地结束一个漫长的进程。

其他回答

将你的任务提交到Runner中,然后像这样调用waitTillDone()方法等待:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

要使用它,添加以下gradle/maven依赖:

更多详细信息请查看这里:https://github.com/MatejTymes/JavaFixes或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

一个简单的替代方法是使用线程和join。 参考:连接线程

听起来好像你需要ForkJoinPool并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美在泳池里。awaitQuiescence方法将阻塞利用调用者的线程来执行它的任务,然后当它真的为空时返回。

我将等待执行程序以您认为适合任务完成的指定超时终止。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

IllegalMonitorStateException的根本原因:

抛出该异常,表示线程试图等待对象的监视器,或通知其他线程等待对象的监视器而不拥有指定的监视器。

在代码中,您刚刚在ExecutorService上调用了wait(),但没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

遵循以下方法中的一种来等待所有已提交给ExecutorService的任务的完成。

Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object Using invokeAll on ExecutorService Using CountDownLatch Using ForkJoinPool or newWorkStealingPool of Executors(since java 8) Shutdown the pool as recommended in oracle documentation page void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { to a while(condition) which checks for every 1 minute.