是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

当前回答

到目前为止,我使用了这个问题的答案中描述的解决方案。现在,我想出了一个叫做并行流支持的小库:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,它严重依赖于公共池的大小。参见HashSet中的并行流不能并行运行。

我使用这个解决方案只是为了对不同类型的工作有单独的池,但即使我不使用它,我也不能将公共池的大小设置为1。

其他回答

如果你不介意使用第三方库,使用cyclops-react,你可以在同一个管道中混合顺序流和并行流,并提供自定义ForkJoinPools。例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

或者希望继续在顺序流中处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[披露我是cyclops-react的主要开发者]

(目前)公认的答案有一部分是错误的。仅仅将并行流提交给专用的fork-join-pool是不够的。在这种情况下,流将使用该池的线程以及公共fork-join-pool甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。

为了将工作完全限制在专用池中,你必须将它封装到一个CompletableFuture中:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
  forkJoinPool = new ForkJoinPool(parallelism);
  final List<Integer> primes = CompletableFuture.supplyAsync(() -> 
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList()),
    forkJoinPool)  // <- passes dedicated fork-join pool as executor
    .join();  // <- Wait for result from forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

这段代码保留了在Java 8u352和Java 17.0.1上的forkJoinPool中的所有操作。

要测量实际使用的线程数,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

如果没有.parallel(),它会给出:

3 // common pool
4 // custom pool

除了在你自己的forkJoinPool中触发并行计算之外,你还可以将这个forkJoinPool传递给CompletableFuture。像下面这样的async方法:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

实际上,在特定的fork-join池中执行并行操作是有技巧的。如果您将其作为fork-join池中的任务执行,则它将停留在那里,而不使用公共池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

这个技巧基于ForkJoinTask。安排在当前任务运行的池中异步执行这个任务,如果适用,或者使用ForkJoinPool.commonPool(),如果不是inForkJoinPool()"