根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2
其他回答
所有的答案都为这个经常被问到的问题增添了一些伟大的知识。
所以根据这个问题的传统时间轴,这里是我的2美分。
我发现在非常具体的情况下,重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。
当然,这个数字(20)将取决于工作人员的数量和数据量。
希望这能有所帮助。
从代码和代码文档中可以看出,coalesce(n)与coalesce(n, shuffle = false)相同,而repartition(n)与coalesce(n, shuffle = true)相同。
因此,合并和重新分区都可以用来增加分区的数量
使用shuffle = true,实际上可以合并为更大的数字 的分区。如果你有少量的分区,这很有用, 比如100,可能有几个分区异常大。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,则应该考虑使用合并的打乱版本(在这种情况下与重新分区相同)。这将允许您的计算在父分区上并行执行(多个任务)。
然而,如果你正在做一个激烈的合并,例如numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,numPartitions = 1的情况下只有一个节点)。为了避免这种情况,你可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
相关答案也请参考此处
以下是代码级别的一些额外细节/差异:
在这里只添加函数定义,完整的代码实现检查spark的github页面。
下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。
def repartition(numPartitions: Int): Dataset[T]
每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。
但是对于合并,我们只有以下方法在数据框架上:
def coalesce(numPartitions: Int): Dataset[T]
上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区
下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!
它避免了完全洗牌。如果已知分区数量正在减少,则执行器可以安全地将数据保存在最小分区数量上,只将数据从额外的节点移到我们保留的节点上。
所以,它会是这样的:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
然后合并到2个分区:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
注意,节点1和节点3不需要移动其原始数据。
但是你也应该确保,如果你在处理巨大的数据,将要合并的节点的数据应该是高度配置的。因为所有的数据都会加载到那些节点上,可能会导致内存异常。 虽然赔款很贵,但我还是愿意用它。因为它对数据进行了洗牌和平均分配。
在合并和重新分区之间进行明智的选择。