根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
重分区:将数据移到新的分区中。
如。初始数据帧划分为200个分区。
df.repartition(500):数据将从200个分区重新排列到新的500个分区。
联合:将数据移到现有的分区中。
df.coalesce(5):数据将从剩余的195个分区转移到5个现有分区。
其他回答
所有的答案都为这个经常被问到的问题增添了一些伟大的知识。
所以根据这个问题的传统时间轴,这里是我的2美分。
我发现在非常具体的情况下,重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。
当然,这个数字(20)将取决于工作人员的数量和数据量。
希望这能有所帮助。
这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2
重分区算法对数据进行完全洗牌,并创建大小相等的数据分区。Coalesce结合现有分区以避免完全洗牌。
Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。
重新分区将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区。
如果你想了解更多细节,请阅读这篇博客文章。
有一个重分区>>合并的用例,即使在@Rob的回答中提到的分区号减少,也就是将数据写入单个文件。
@Rob的回答暗示了一个好的方向,但我认为需要一些进一步的解释来理解引擎盖下面发生了什么。
如果您需要在写入数据之前过滤数据,那么重新分区比coalesce更适合,因为coalesce将在加载操作之前下推。
例如: load () . map(…).filter(…).coalesce (1) .save ()
翻译: load () .coalesce (1) . map(…).filter(…).save ()
这意味着您的所有数据将被压缩到一个单独的分区中,在那里它将被过滤,失去所有的并行性。 这种情况甚至会发生在非常简单的过滤器,如column='value'。
load().map(…).filter(…).repartition(1).save()
在这种情况下,在原始分区上并行地进行过滤。
举个数量级的例子,在我的例子中,当从Hive表加载后过滤109M行(~105G)和~1000个分区时,运行时从合并(1)的~6h下降到重新分区(1)的~2m。
具体示例取自AirBnB的这篇文章,这篇文章非常好,甚至涵盖了Spark中重新分区技术的更多方面。
Coalesce使用现有分区来最小化数据量 被打乱。重新分区将创建新的分区并执行满分区 洗牌。 合并会产生具有不同数据量的分区 (有时分区有许多不同的大小)和 重新分区会产生大小大致相同的分区。 合并可以减少分区,但修复可以用来增加或减少分区。