根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。
其他回答
我想在贾斯汀和鲍尔的回答中补充一点——
重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。
用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区
REPARTITION:-用于增加和减少分区的数量,但会发生洗牌
例子:-
val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)
两者都很好
但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。
从代码和代码文档中可以看出,coalesce(n)与coalesce(n, shuffle = false)相同,而repartition(n)与coalesce(n, shuffle = true)相同。
因此,合并和重新分区都可以用来增加分区的数量
使用shuffle = true,实际上可以合并为更大的数字 的分区。如果你有少量的分区,这很有用, 比如100,可能有几个分区异常大。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,则应该考虑使用合并的打乱版本(在这种情况下与重新分区相同)。这将允许您的计算在父分区上并行执行(多个任务)。
然而,如果你正在做一个激烈的合并,例如numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,numPartitions = 1的情况下只有一个节点)。为了避免这种情况,你可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
相关答案也请参考此处
对于所有这些伟大的答案,我想补充的是,重新分区是利用数据并行化的最佳选择之一。而coalesce提供了一个廉价的选择来减少分区,并且在将数据写入HDFS或其他接收器以利用大写入时非常有用。
我发现这在以拼花格式写数据时很有用,可以充分利用它。
所有的答案都为这个经常被问到的问题增添了一些伟大的知识。
所以根据这个问题的传统时间轴,这里是我的2美分。
我发现在非常具体的情况下,重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。
当然,这个数字(20)将取决于工作人员的数量和数据量。
希望这能有所帮助。