是否可以为Java 8并行流指定一个自定义线程池?我到处都找不到。

假设我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大,而且是多线程的,所以我想对它进行划分。我不希望在来自另一个模块的applicationblock任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数实际情况下安全地使用并行流。

试试下面的例子。有一些CPU密集型任务在单独的线程中执行。 任务利用并行流。第一个任务中断,因此每一步花费1秒(通过线程睡眠模拟)。问题是其他线程卡住,等待中断的任务完成。这是一个虚构的例子,但是想象一下servlet应用程序和某人向共享fork连接池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

当前回答

如果你不介意使用第三方库,使用cyclops-react,你可以在同一个管道中混合顺序流和并行流,并提供自定义ForkJoinPools。例如

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

或者希望继续在顺序流中处理

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[披露我是cyclops-react的主要开发者]

其他回答

去打算盘,很常见。并行流可指定线程数。下面是示例代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

披露:我是abacus-common的开发者。

到目前为止,我使用了这个问题的答案中描述的解决方案。现在,我想出了一个叫做并行流支持的小库:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

但是正如@PabloMatiasGomez在评论中指出的那样,并行流的分割机制存在缺陷,它严重依赖于公共池的大小。参见HashSet中的并行流不能并行运行。

我使用这个解决方案只是为了对不同类型的工作有单独的池,但即使我不使用它,我也不能将公共池的大小设置为1。

注意: JDK 10中似乎实现了一个修复,以确保自定义线程池使用预期的线程数量。

自定义ForkJoinPool中的并行流执行应该遵循并行性 https://bugs.openjdk.java.net/browse/JDK-8190974

我使实用工具方法并行运行任务与参数定义最大线程数。

public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(maxThreads);
        forkJoinPool.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
}

它创建了最大线程数的ForkJoinPool,并在任务完成(或失败)后关闭它。

用法如下:

final int maxThreads = 4;
runParallel(maxThreads, () -> 
    IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList()));

原来的解决方案(设置ForkJoinPool公共并行性属性)不再有效。看看原始答案中的链接,打破这一点的更新已经被回移植到Java 8。正如链接线程中提到的,这个解决方案并不能保证永远有效。基于此,解决方案是forkjoinpool。提交接受答案中讨论的.get解决方案。我认为后端口修复了这个解决方案的不可靠性。

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();