等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

如果您希望等待所有任务完成,请使用关机方法而不是等待。然后在后面加上awaitterminate。

此外,您还可以使用Runtime。availableProcessors来获取硬件线程的数量,以便正确地初始化线程池。

其他回答

如果你想等待执行器服务完成执行,调用shutdown(),然后调用awaitterminate (units, unitType),例如awaitterminate (1, MINUTE)。ExecutorService不会在它自己的监视器上阻塞,所以你不能使用wait等。

我还有一种情况,我有一组文档要爬取。我从一个应该被处理的初始“种子”文档开始,该文档包含到其他也应该被处理的文档的链接,等等。

在我的主程序中,我只想写如下所示的东西,其中Crawler控制了一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想在一棵树上导航,同样的情况也会发生;我将插入根节点,每个节点的处理器将根据需要向队列中添加子节点,一堆线程将处理树中的所有节点,直到没有更多的节点为止。

我在JVM中找不到任何东西,我觉得这有点令人惊讶。所以我写了一个类ThreadPool,一个可以直接使用或子类添加适合域的方法,例如schedule(Document)。希望能有所帮助!

ThreadPool Javadoc | Maven

有几种方法。

您可以调用第一个ExecutorService。shutdown,然后ExecutorService。返回:

如果该执行程序终止,则为True;如果超时,则为false 在终止之前

So:

有一个叫awaitterminate的函数,但是必须有一个超时 它提供了。这并不能保证当它返回全部时 任务早就完成了。有办法实现这个目标吗?

你只需要在循环中调用awaitterminate。

使用awaitTermination:

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用CountDownLatch:

另一种选择是创建CountDownLatch,其计数等于并行任务的数量。每个线程调用countDownLatch.countDown();,而主线程调用countDownLatch.await();。

这个实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用CyclicBarrier:

另一种方法是使用循环屏障

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要对您的初始需求进行更改,即:

当任务提交时,如何等待所有任务完成 使用ExecutorService.execute()。

您可以使用ExecutorService。它将执行所有任务并等待所有线程完成它们的任务。

这是完整的javadoc

您还可以使用此方法的重载版本来指定超时时间。

下面是ExecutorService.invokeAll的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

你可以在一定的时间间隔内等待任务完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

或者您可以使用ExecutorService.submit(Runnable)并收集它返回的Future对象,然后依次对每个对象调用get()以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

正确处理InterruptedException非常重要。它可以让您或库的用户安全地结束一个漫长的进程。