是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。
假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。
如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。
试试下面的例子。有一些CPU密集型任务在单独的线程中执行。
任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
到目前为止,我使用了这个问题的答案中描述的解决方案。现在,我想出了一个叫做并行流支持的小库:
ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
.filter(PrimesPrint::isPrime)
.collect(toList())
但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,它严重依赖于公共池的大小。参见HashSet中的并行流不能并行运行。
我使用这个解决方案只是为了对不同类型的工作有单独的池,但即使我不使用它,我也不能将公共池的大小设置为1。
(目前)公认的答案有一部分是错误的。仅仅将并行流提交给专用的fork-join-pool是不够的。在这种情况下,流将使用该池的线程以及公共fork-join-pool甚至调用线程来处理流的工作负载,这似乎取决于公共fork-join池的大小。这种行为有点奇怪,但绝对不是必需的。
为了将工作完全限制在专用池中,你必须将它封装到一个CompletableFuture中:
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = CompletableFuture.supplyAsync(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()),
forkJoinPool) // <- passes dedicated fork-join pool as executor
.join(); // <- Wait for result from forkJoinPool
System.out.println(primes);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
这段代码保留了在Java 8u352和Java 17.0.1上的forkJoinPool中的所有操作。
并行流使用默认的ForkJoinPool.commonPool,默认情况下,当你有处理器时,它会少一个线程,这是由Runtime.getRuntime(). availableprocessors()返回的(这意味着并行流为调用线程留下一个处理器)。
对于需要单独或自定义池的应用程序,ForkJoinPool可以用给定的目标并行度级别来构造;默认情况下,等于可用处理器的数量。
这也意味着,如果您有嵌套的并行流或并发启动多个并行流,它们都将共享同一个池。优点:使用的处理器数量永远不会超过默认值(可用处理器数量)。缺点:您可能无法获得分配给您初始化的每个并行流的“所有处理器”(如果您碰巧有多个并行流)。(显然你可以使用ManagedBlocker来规避这个问题。)
要更改并行流的执行方式,您可以使用以下两种方法
提交并行流执行到你自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();或
你可以使用系统属性来改变公共池的大小:system . setproperty ("java.util.concurrent.ForkJoinPool.common.parallelism", "20"),目标并行度为20个线程。
后者的例子在我的机器上有8个处理器。如果我运行以下程序:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
输出结果为:
215 216 216 216 216 216 216 216 216 216 216 316 316 316 415 416 416 416
所以你可以看到并行流一次处理8个项目,也就是说它使用8个线程。然而,如果我取消注释注释行,输出是:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
这一次,并行流使用了20个线程,流中的所有20个元素都被并发处理。
我使实用工具方法并行运行任务与参数定义最大线程数。
public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(maxThreads);
forkJoinPool.submit(task).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
它创建了最大线程数的ForkJoinPool,并在任务完成(或失败)后关闭它。
用法如下:
final int maxThreads = 4;
runParallel(maxThreads, () ->
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList()));