是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。
假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。
如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。
试试下面的例子。有一些CPU密集型任务在单独的线程中执行。
任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
要测量实际使用的线程数,可以检查Thread.activeCount():
Runnable r = () -> IntStream
.range(-42, +42)
.parallel()
.map(i -> Thread.activeCount())
.max()
.ifPresent(System.out::println);
ForkJoinPool.commonPool().submit(r).join();
new ForkJoinPool(42).submit(r).join();
这可以在4核CPU上产生如下输出:
5 // common pool
23 // custom pool
如果没有.parallel(),它会给出:
3 // common pool
4 // custom pool
(目前)公认的答案有一部分是错误的。仅仅将并行流提交给专用的fork-join-pool是不够的。在这种情况下,流将使用该池的线程以及公共fork-join-pool甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。
为了将工作完全限制在专用池中,你必须将它封装到一个CompletableFuture中:
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = CompletableFuture.supplyAsync(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()),
forkJoinPool) // <- passes dedicated fork-join pool as executor
.join(); // <- Wait for result from forkJoinPool
System.out.println(primes);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
这段代码保留了在Java 8u352和Java 17.0.1上的forkJoinPool中的所有操作。
我使实用工具方法并行运行任务与参数定义最大线程数。
public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(maxThreads);
forkJoinPool.submit(task).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
它创建了最大线程数的ForkJoinPool,并在任务完成(或失败)后关闭它。
用法如下:
final int maxThreads = 4;
runParallel(maxThreads, () ->
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()));
并行流使用默认的ForkJoinPool.commonPool,默认情况下,当你有处理器时,它会少一个线程,这是由Runtime.getRuntime(). availableprocessors()返回的(这意味着并行流为调用线程留下一个处理器)。
对于需要单独或自定义池的应用程序,ForkJoinPool可以用给定的目标并行度级别来构造;默认情况下,等于可用处理器的数量。
这也意味着,如果您有嵌套的并行流或并发启动多个并行流,它们都将共享同一个池。优点:使用的处理器数量永远不会超过默认值(可用处理器数量)。缺点:您可能无法获得分配给您初始化的每个并行流的“所有处理器”(如果您碰巧有多个并行流)。(显然你可以使用ManagedBlocker来规避这个问题。)
要更改并行流的执行方式,您可以使用以下两种方法
提交并行流执行到你自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();或
你可以使用系统属性来改变公共池的大小:system . setproperty ("java.util.concurrent.ForkJoinPool.common.parallelism", "20"),目标并行度为20个线程。
后者的例子在我的机器上有8个处理器。如果我运行以下程序:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
输出结果为:
215 216 216 216 216 216 216 216 216 216 216 316 316 316 415 416 416 416
所以你可以看到并行流一次处理8个项目,也就是说它使用8个线程。然而,如果我取消注释注释行,输出是:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
这一次,并行流使用了20个线程,流中的所有20个元素都被并发处理。