With Java 8 and lambdas, it's easy to iterate over collections as streams, and just as easy to use a parallel stream. Two examples from the docs, the second one using parallelStream:
myShapesCollection.stream()
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));

myShapesCollection.parallelStream() // <-- This one uses parallel
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));
Is it always beneficial to use the parallel version as long as I don't care about the order? One would think it is faster to divide the work over more cores.
Are there other considerations? When should a parallel stream be used and when a non-parallel one?
(This question is asked to trigger a discussion about how and when to use parallel streams, not because I think always using them is a good idea.)
Collection.parallelStream() is a great way to do work in parallel. However, you need to keep in mind that it internally uses a common thread pool with only a few worker threads (by default, the number of threads equals the number of CPU cores); see ForkJoinPool.commonPool(). If some of the pool's tasks are long-running, I/O-bound work, then other, potentially fast parallelStream calls will get stuck waiting for free pool threads. This effectively requires fork-join tasks to be non-blocking and short, or, in other words, CPU-bound. For a better understanding of the details, I strongly recommend a careful reading of the java.util.concurrent.ForkJoinTask javadoc; here are some relevant quotes:
The efficiency of ForkJoinTasks stems from ... their main use as computational tasks calculating pure functions or operating on purely isolated objects.
Computations should ideally avoid synchronized methods or blocks, and should minimize other blocking synchronization.
Subdividable tasks should also not perform blocking I/O.
This indicates that the main purpose of parallelStream() tasks is short computations over isolated in-memory structures. I also recommend checking out the article Common parallel stream pitfalls.
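To illustrate, one commonly used workaround for keeping slow pipelines from starving the common pool is to run them inside a dedicated ForkJoinPool. Note this is a sketch that relies on an implementation detail of the stream machinery (tasks submitted from inside a ForkJoinPool stay in that pool), not a documented guarantee:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CustomPoolDemo {
    public static void main(String[] args) throws Exception {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

        // Dedicated pool: long-running work here cannot block other
        // parallelStream users of ForkJoinPool.commonPool().
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            int sum = pool.submit(() ->
                data.parallelStream().mapToInt(Integer::intValue).sum()
            ).get();
            System.out.println(sum); // 15
        } finally {
            pool.shutdown();
        }
    }
}
```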
Other answers have already covered profiling to avoid premature optimization and the overhead cost of parallel processing. This answer covers the ideal choice of data structures for parallel streaming.
As a rule, performance gains from parallelism are best on streams over ArrayList, HashMap, HashSet, and ConcurrentHashMap instances; arrays; int ranges; and long ranges. What these data structures have in common is that they can all be accurately and cheaply split into subranges of any desired sizes, which makes it easy to divide work among parallel threads. The abstraction used by the streams library to perform this task is the spliterator, which is returned by the spliterator method on Stream and Iterable.
Another important factor that all of these data structures have in common is that they provide good-to-excellent locality of reference when processed sequentially: sequential element references are stored together in memory. The objects referred to by those references may not be close to one another in memory, which reduces locality-of-reference. Locality-of-reference turns out to be critically important for parallelizing bulk operations: without it, threads spend much of their time idle, waiting for data to be transferred from memory into the processor’s cache. The data structures with the best locality of reference are primitive arrays because the data itself is stored contiguously in memory.
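To make the locality point concrete, here is a small sketch: a primitive int[] stores its data contiguously in memory and splits cheaply, which is why it is about the best-case source for a parallel stream:

```java
import java.util.Arrays;

public class LocalityDemo {
    public static void main(String[] args) {
        // Primitive array: contiguous storage, excellent locality of
        // reference, and a spliterator that splits into exact halves.
        int[] data = new int[1_000_000];
        Arrays.setAll(data, i -> i % 10);

        long sum = Arrays.stream(data).parallel().asLongStream().sum();
        System.out.println(sum); // 100,000 full cycles of 0..9 -> 4500000
    }
}
```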
Source: Effective Java, 3rd Edition, by Joshua Bloch, Item 48: Use caution when making streams parallel
The Stream API was designed to make it easy to write computations in a way that was abstracted away from how they would be executed, making switching between sequential and parallel easy.
However, just because it's easy doesn't mean it's always a good idea, and in fact, it is a bad idea to just drop .parallel() all over the place simply because you can.
First, note that parallelism offers no benefits other than the possibility of faster execution when more cores are available. A parallel execution will always involve more work than a sequential one, because in addition to solving the problem, it also has to perform dispatching and coordinating of sub-tasks. The hope is that you'll be able to get to the answer faster by breaking up the work across multiple processors; whether this actually happens depends on a lot of things, including the size of your data set, how much computation you are doing on each element, the nature of the computation (specifically, does the processing of one element interact with processing of others?), the number of processors available, and the number of other tasks competing for those processors.
Further, note that parallelism often exposes nondeterminism in the computation that is usually hidden by sequential implementations; sometimes this doesn't matter, or can be mitigated by constraining the operations involved (i.e., reduction operators must be stateless and associative).
In reality, sometimes parallelism will speed up your computation, sometimes it will not, and sometimes it will even slow it down. It is best to develop first using sequential execution, and then apply parallelism where
(A) you know that there is actual benefit to increased performance, and
(B) it will actually deliver increased performance.
(A) is a business problem, not a technical one. If you are a performance expert, you will usually be able to look at the code and determine (B), but the smart path is to measure. (And don't even bother until you're convinced of (A); if the code is fast enough, better to apply your brain cycles elsewhere.)
The simplest performance model for parallelism is the "NQ" model, where N is the number of elements, and Q is the computation per element. In general, you need the product NQ to exceed some threshold before you start getting a performance benefit. For a low-Q problem like "add up numbers from 1 to N", you will generally see a breakeven between N=1000 and N=10000. With higher-Q problems, you'll see breakevens at lower thresholds.
But the reality is fairly complicated. So until you achieve expertise, first identify when sequential processing is actually costing you something, and then measure whether parallelism will help.
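As a starting point, a crude measurement can be sketched with System.nanoTime. This is only a rough comparison; for decisions that matter, use a proper benchmark harness such as JMH, which handles warm-up and dead-code elimination:

```java
import java.util.stream.IntStream;

public class MeasureDemo {
    // Crude timing helper; JIT warm-up and GC noise make single runs
    // unreliable, so treat the numbers as a rough indication only.
    static long timeMs(Runnable r) {
        long start = System.nanoTime();
        r.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int n = 10_000_000;
        System.out.println("sequential: " +
            timeMs(() -> IntStream.range(0, n).mapToLong(i -> (long) i * 2).sum()) + " ms");
        System.out.println("parallel:   " +
            timeMs(() -> IntStream.range(0, n).parallel().mapToLong(i -> (long) i * 2).sum()) + " ms");
    }
}
```

For this low-Q workload, don't be surprised if the parallel version is no faster: this is precisely the regime where the NQ threshold matters.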
I watched one of the presentations of Brian Goetz (Java Language Architect and specification lead for Lambda Expressions). He explains in detail the following 4 points to consider before going for parallelization:
Splitting / decomposition costs
– Sometimes splitting is more expensive than just doing the work!
Task dispatch / management costs
– Can do a lot of work in the time it takes to hand work to another thread.
Result combination costs
– Sometimes combination involves copying lots of data. For example, adding numbers is cheap whereas merging sets is expensive.
Locality
– The elephant in the room. This is an important point which everyone may miss. You should consider cache misses: if a CPU has to wait for data because of cache misses, you won't gain anything through parallelization. That's why array-based sources parallelize best: the next indices (near the current index) are cached, so there is less chance that the CPU will experience a cache miss.
He also mentioned a relatively simple formula to determine a chance of parallel speedup.
NQ Model:
N x Q > 10000
where,
N = number of data items
Q = amount of work per item
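For instance, here is a sketch using primality testing as a stand-in for a high-Q operation: each isProbablePrime call is expensive, so even a modest N clears the N x Q > 10000 threshold and parallelism has a realistic chance of paying off:

```java
import java.math.BigInteger;
import java.util.stream.IntStream;

public class HighQDemo {
    public static void main(String[] args) {
        // High Q per element: Miller-Rabin primality testing costs far
        // more than the splitting and combining overhead per item.
        long primes = IntStream.rangeClosed(2, 10_000)
            .parallel()
            .mapToObj(BigInteger::valueOf)
            .filter(b -> b.isProbablePrime(50))
            .count();
        System.out.println(primes); // 1229 primes below 10,000
    }
}
```

Compare this with summing the same range of ints, a low-Q task where the per-element work is a single addition and parallel overhead tends to dominate.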
Never parallelize an infinite stream with a limit. Here is what happens:
import java.util.stream.IntStream;

public static void main(String[] args) {
    // let's count to 1 in parallel
    System.out.println(
        IntStream.iterate(0, i -> i + 1)
            .parallel()
            .skip(1)
            .findFirst()
            .getAsInt());
}
结果
Exception in thread "main" java.lang.OutOfMemoryError
at ...
at java.base/java.util.stream.IntPipeline.findFirst(IntPipeline.java:528)
at InfiniteTest.main(InfiniteTest.java:24)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.stream.SpinedBuffer$OfInt.newArray(SpinedBuffer.java:750)
at ...
The same thing happens if you use .limit(...).
Explanation here:
Java 8, using .parallel in a stream causes OOM error
Similarly, don't use parallel if the stream is ordered and has many more elements than you want to process, e.g.
public static void main(String[] args) {
    // let's count to 1 in parallel
    System.out.println(
        IntStream.range(1, 1000_000_000)
            .parallel()
            .skip(100)
            .findFirst()
            .getAsInt());
}
This may run much longer, because the parallel threads may work on plenty of number ranges instead of the crucial one 0-100, causing this to take a very long time.
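A sketch of the safer shape: start from a bounded, cheaply splittable source such as range() rather than an infinite iterate(), and keep the stream small relative to the work you actually need. findFirst still respects encounter order on an ordered parallel stream:

```java
import java.util.stream.IntStream;

public class BoundedDemo {
    public static void main(String[] args) {
        // Bounded range: splits cheaply, cannot buffer unboundedly, and
        // findFirst returns the first match in encounter order.
        int first = IntStream.range(1, 1_000)
            .parallel()
            .filter(i -> i % 7 == 0)
            .findFirst()
            .getAsInt();
        System.out.println(first); // 7
    }
}
```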