等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

在集合中添加所有线程并使用invokeAll提交。 如果您可以使用ExecutorService的invokeAll方法,那么JVM在所有线程完成之前不会进行下一行。

这里有一个很好的例子: 通过ExecutorService调用所有

其他回答

有几种方法。

您可以调用第一个ExecutorService。shutdown,然后ExecutorService。返回:

如果该执行程序终止,则为True;如果超时,则为false 在终止之前

So:

有一个叫awaitterminate的函数,但是必须有一个超时 它提供了。这并不能保证当它返回全部时 任务早就完成了。有办法实现这个目标吗?

你只需要在循环中调用awaitterminate。

使用awaitTermination:

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用CountDownLatch:

另一种选择是创建CountDownLatch,其计数等于并行任务的数量。每个线程调用countDownLatch.countDown();,而主线程调用countDownLatch.await();。

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用CyclicBarrier:

另一种方法是使用循环屏障

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要对您的初始需求进行更改,即:

当任务提交时,如何等待所有任务完成 使用ExecutorService.execute()。

如果您的目标不是等待ExecutorService中的所有任务完成,而是等待特定批次的任务完成,那么您可以使用CompletionService——具体地说,ExecutorCompletionService。

其思想是创建一个包装Executor的ExecutorCompletionService,通过CompletionService提交一些已知数量的任务,然后使用take()(阻塞)或poll()(不阻塞)从完成队列中提取相同数量的结果。一旦绘制了与提交的任务相对应的所有预期结果,就知道它们都完成了。

让我再说一遍,因为从接口上看不太明显:你必须知道有多少东西放入CompletionService,才能知道有多少东西要取出来。这对take()方法尤其重要:调用它一次太多次,它将阻塞您的调用线程,直到其他线程向同一CompletionService提交另一个作业。

在《Java并发实践》一书中有一些例子展示了如何使用CompletionService。

如果您希望等待所有任务完成,请使用关机方法而不是等待。然后在后面加上awaitterminate。

此外,您还可以使用Runtime。availableProcessors来获取硬件线程的数量,以便正确地初始化线程池。

IllegalMonitorStateException的根本原因:

抛出该异常,表示线程试图等待对象的监视器,或通知其他线程等待对象的监视器而不拥有指定的监视器。

在代码中,您刚刚在ExecutorService上调用了wait(),但没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

遵循以下方法中的一种来等待所有已提交给ExecutorService的任务的完成。

Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object Using invokeAll on ExecutorService Using CountDownLatch Using ForkJoinPool or newWorkStealingPool of Executors(since java 8) Shutdown the pool as recommended in oracle documentation page void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { to a while(condition) which checks for every 1 minute.

这个怎么样?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

如果您没有向ExecutorService添加新任务,这可能会等待所有当前任务完成