是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

当前回答

下面是我如何通过编程方式设置上面提到的最大线程数标志,以及一段代码来验证该参数是否符合要求

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

其他回答

我尝试了自定义ForkJoinPool,如下所示来调整池的大小:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

下面的输出显示池使用的线程比默认的4个要多。

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

但实际上有一个奇怪的地方,当我试图使用ThreadPoolExecutor实现相同的结果时,如下所示:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

但我失败了。

它只会在一个新的线程中启动并行流,然后其他一切都是一样的,这再次证明并行流将使用ForkJoinPool来启动它的子线程。

(目前)公认的答案有一部分是错误的。仅仅将并行流提交给专用的fork-join-pool是不够的。在这种情况下,流将使用该池的线程以及公共fork-join-pool甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。

为了将工作完全限制在专用池中,你必须将它封装到一个CompletableFuture中:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
  forkJoinPool = new ForkJoinPool(parallelism);
  final List<Integer> primes = CompletableFuture.supplyAsync(() -> 
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList()),
    forkJoinPool)  // <- passes dedicated fork-join pool as executor
    .join();  // <- Wait for result from forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

这段代码保留了在Java 8u352和Java 17.0.1上的forkJoinPool中的所有操作。

要测量实际使用的线程数,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

如果没有.parallel(),它会给出:

3 // common pool
4 // custom pool

如果你不想依赖于实现技巧,总有一种方法可以通过实现将映射和收集语义结合起来的自定义收集器来实现相同的目标……并且你不会局限于ForkJoinPool:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

幸运的是,它已经在Maven Central上完成了: http://github.com/pivovarit/parallel-collectors

免责声明:是我写的,并为此负责。

注意: JDK 10中似乎实现了一个修复,以确保自定义线程池使用预期的线程数量。

自定义ForkJoinPool中的并行流执行应该遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974