是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

当前回答

注意: JDK 10中似乎实现了一个修复,以确保自定义线程池使用预期的线程数量。

自定义ForkJoinPool中的并行流执行应该遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974

其他回答

注意: JDK 10中似乎实现了一个修复,以确保自定义线程池使用预期的线程数量。

自定义ForkJoinPool中的并行流执行应该遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974

如果你不想依赖于实现技巧,总有一种方法可以通过实现将映射和收集语义结合起来的自定义收集器来实现相同的目标……并且你不会局限于ForkJoinPool:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

幸运的是,它已经在Maven Central上完成了: http://github.com/pivovarit/parallel-collectors

免责声明:是我写的,并为此负责。

要测量实际使用的线程数,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

如果没有.parallel(),它会给出:

3 // common pool
4 // custom pool

如果你不需要自定义线程池,但你想要限制并发任务的数量,你可以使用:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(重复的问题被锁定了,所以请原谅我在这里)

除了在你自己的forkJoinPool中触发并行计算之外,你还可以将这个forkJoinPool传递给CompletableFuture。像下面这样的async方法:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);