根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
基本上,重分区允许您增加或减少分区的数量。重分区重新分配来自所有分区的数据,这导致完全shuffle,这是非常昂贵的操作。
Coalesce是重新分区的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在Coalesce中是最小的数据移动,但说Coalesce不做数据移动是完全错误的说法。
另一件事是通过提供分区的数量来重新分区,它试图在所有分区上均匀地重新分配数据而在Coalesce的情况下,在某些情况下我们仍然可能有倾斜的数据。
其他回答
基本上,重分区允许您增加或减少分区的数量。重分区重新分配来自所有分区的数据,这导致完全shuffle,这是非常昂贵的操作。
Coalesce是重新分区的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在Coalesce中是最小的数据移动,但说Coalesce不做数据移动是完全错误的说法。
另一件事是通过提供分区的数量来重新分区,它试图在所有分区上均匀地重新分配数据而在Coalesce的情况下,在某些情况下我们仍然可能有倾斜的数据。
我想在贾斯汀和鲍尔的回答中补充一点——
重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。
贾斯汀的回答很棒,这个回答更有深度。
重分区算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf在我的机器上包含4个分区。
numbersDf.rdd.partitions.size // => 4
下面是数据在分区上的划分方式:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
让我们使用重分区方法进行一次完全洗牌,并在两个节点上获得这些数据。
val numbersDfR = numbersDf.repartition(2)
下面是如何在我的机器上划分numbersDfR数据:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
重分区方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。
合并和重新划分的区别
Coalesce使用现有分区来最小化打乱的数据量。重新分区创建新分区并进行完全洗牌。合并的结果是产生具有不同数据量的分区(有时分区的大小相差很大),而重新分区的结果是产生大小大致相同的分区。
合并和重新分区哪个更快?
联合可能比重新分区运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现重新分区总体上更快,因为Spark是为处理相同大小的分区而构建的。
注意:我很好奇地发现重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。
如果你想了解更多细节,请阅读这篇博客文章。
当你在实践中使用合并和重分区
See this question on how to use coalesce & repartition to write out a DataFrame to a single file It's critical to repartition after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem. partitionBy is used to write out data in partitions on disk. You'll need to use repartition / coalesce to partition your data in memory properly before using partitionBy.
另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。
另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。
附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。
以下是代码级别的一些额外细节/差异:
在这里只添加函数定义,完整的代码实现检查spark的github页面。
下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。
def repartition(numPartitions: Int): Dataset[T]
每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。
但是对于合并,我们只有以下方法在数据框架上:
def coalesce(numPartitions: Int): Dataset[T]
上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区
下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!