是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

当前回答

原来的解决方案(设置ForkJoinPool公共并行性属性)不再有效。看看原始答案中的链接,打破这一点的更新已经被回移植到Java 8。正如链接线程中提到的,这个解决方案并不能保证永远有效。基于此,解决方案是forkjoinpool。提交接受答案中讨论的.get解决方案。我认为后端口修复了这个解决方案的不可靠性。

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

其他回答

(目前)公认的答案有一部分是错误的。仅仅将并行流提交给专用的fork-join-pool是不够的。在这种情况下,流将使用该池的线程以及公共fork-join-pool甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。

为了将工作完全限制在专用池中,你必须将它封装到一个CompletableFuture中:

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
  forkJoinPool = new ForkJoinPool(parallelism);
  final List<Integer> primes = CompletableFuture.supplyAsync(() -> 
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList()),
    forkJoinPool)  // <- passes dedicated fork-join pool as executor
    .join();  // <- Wait for result from forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

这段代码保留了在Java 8u352和Java 17.0.1上的forkJoinPool中的所有操作。

下面是我如何通过编程方式设置上面提到的最大线程数标志,以及一段代码来验证该参数是否符合要求

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

要测量实际使用的线程数,可以检查Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

这可以在4核CPU上产生如下输出:

5 // common pool
23 // custom pool

如果没有.parallel(),它会给出:

3 // common pool
4 // custom pool

并行流使用默认的ForkJoinPool.commonPool,默认情况下,当你有处理器时,它会少一个线程,这是由Runtime.getRuntime(). availableprocessors()返回的(这意味着并行流为调用线程留下一个处理器)。

对于需要单独或自定义池的应用程序,ForkJoinPool可以用给定的目标并行度级别来构造;默认情况下,等于可用处理器的数量。

这也意味着,如果您有嵌套的并行流或并发启动多个并行流,它们都将共享同一个池。优点:使用的处理器数量永远不会超过默认值(可用处理器数量)。缺点:您可能无法获得分配给您初始化的每个并行流的“所有处理器”(如果您碰巧有多个并行流)。(显然你可以使用ManagedBlocker来规避这个问题。)

要更改并行流的执行方式,您可以使用以下两种方法

提交并行流执行到你自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();或 你可以使用系统属性来改变公共池的大小:system . setproperty ("java.util.concurrent.ForkJoinPool.common.parallelism", "20"),目标并行度为20个线程。


后者的例子在我的机器上有8个处理器。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

输出结果为:

215 216 216 216 216 216 216 216 216 216 216 316 316 316 415 416 416 416

所以你可以看到并行流一次处理8个项目,也就是说它使用8个线程。然而,如果我取消注释注释行,输出是:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,并行流使用了20个线程,流中的所有20个元素都被并发处理。

我使实用工具方法并行运行任务与参数定义最大线程数。

public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(maxThreads);
        forkJoinPool.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
}

它创建了最大线程数的ForkJoinPool,并在任务完成(或失败)后关闭它。

用法如下:

final int maxThreads = 4;
runParallel(maxThreads, () -> 
    IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList()));