等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

如果你想等待执行器服务完成执行,调用shutdown(),然后调用awaitterminate (units, unitType),例如awaitterminate (1, MINUTE)。ExecutorService不会在它自己的监视器上阻塞,所以你不能使用wait等。

其他回答

我将等待执行程序以您认为适合任务完成的指定超时终止。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

最简单的方法是使用ExecutorService.invokeAll(),它可以在一行程序中执行您想要的操作。用您的话说,您需要修改或包装ComputeDTask以实现Callable<>,这可以为您提供更大的灵活性。可能在您的应用程序中有一个有意义的Callable.call()实现,但如果不使用Executors.callable(),这里有一种包装它的方法。

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人指出的那样,如果合适的话,您可以使用invokeAll()的超时版本。在这个例子中,answers将包含一堆返回null的future(参见Executors.callable()的定义)。可能您想做的是稍微重构一下,这样就可以得到一个有用的答案,或者对底层ComputeDTask的引用,但从您的示例中我无法判断。

如果不清楚,请注意,在所有任务完成之前,invokeAll()不会返回。(也就是说,如果被问到,答案集合中的所有future都会报告. isdone()。)这避免了所有的手动关机,等待终止等…并允许您在需要的情况下,在多个周期中巧妙地重用这个ExecutorService。

关于SO有几个相关的问题:

如何等待所有线程完成 从java线程返回值 invokeAll()不愿意接受一个Collection<Callable<t>> 我需要同步吗?

这些都不是严格意义上的问题,但它们确实为人们认为应该如何使用Executor/ExecutorService提供了一点色彩。

听起来好像你需要ForkJoinPool并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美在泳池里。awaitQuiescence方法将阻塞利用调用者的线程来执行它的任务,然后当它真的为空时返回。

有几种方法。

您可以调用第一个ExecutorService。shutdown,然后ExecutorService。返回:

如果该执行程序终止,则为True;如果超时,则为false 在终止之前

So:

有一个叫awaitterminate的函数,但是必须有一个超时 它提供了。这并不能保证当它返回全部时 任务早就完成了。有办法实现这个目标吗?

你只需要在循环中调用awaitterminate。

使用awaitTermination:

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用CountDownLatch:

另一种选择是创建CountDownLatch,其计数等于并行任务的数量。每个线程调用countDownLatch.countDown();,而主线程调用countDownLatch.await();。

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用CyclicBarrier:

另一种方法是使用循环屏障

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要对您的初始需求进行更改,即:

当任务提交时,如何等待所有任务完成 使用ExecutorService.execute()。

将你的任务提交到Runner中,然后像这样调用waitTillDone()方法等待:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

要使用它,添加以下gradle/maven依赖:

更多详细信息请查看这里:https://github.com/MatejTymes/JavaFixes或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html