根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
从代码和代码文档中可以看出,coalesce(n)与coalesce(n, shuffle = false)相同,而repartition(n)与coalesce(n, shuffle = true)相同。
因此,合并和重新分区都可以用来增加分区的数量
使用shuffle = true,实际上可以合并为更大的数字 的分区。如果你有少量的分区,这很有用, 比如100,可能有几个分区异常大。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,则应该考虑使用合并的打乱版本(在这种情况下与重新分区相同)。这将允许您的计算在父分区上并行执行(多个任务)。
然而,如果你正在做一个激烈的合并,例如numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,numPartitions = 1的情况下只有一个节点)。为了避免这种情况,你可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
相关答案也请参考此处
其他回答
所有的答案都为这个经常被问到的问题增添了一些伟大的知识。
所以根据这个问题的传统时间轴,这里是我的2美分。
我发现在非常具体的情况下,重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。
当然,这个数字(20)将取决于工作人员的数量和数据量。
希望这能有所帮助。
重新分区-建议在增加分区数量的同时使用它,因为它涉及到所有数据的洗牌。
Coalesce—建议在使用它的同时减少分区的数量。例如,如果你有3个分区,你想把它减少到2个,coalesce将把第3个分区的数据移动到分区1和分区2。分区1和分区2将保留在同一个容器中。 另一方面,重新分区将打乱所有分区中的数据,因此执行程序之间的网络使用将很高,这将影响性能。
在减少分区数量的同时,Coalesce比重分区的性能更好。
对于所有这些伟大的答案,我想补充的是,重新分区是利用数据并行化的最佳选择之一。而coalesce提供了一个廉价的选择来减少分区,并且在将数据写入HDFS或其他接收器以利用大写入时非常有用。
我发现这在以拼花格式写数据时很有用,可以充分利用它。
基本上,重分区允许您增加或减少分区的数量。重分区重新分配来自所有分区的数据,这导致完全shuffle,这是非常昂贵的操作。
Coalesce是重新分区的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在Coalesce中是最小的数据移动,但说Coalesce不做数据移动是完全错误的说法。
另一件事是通过提供分区的数量来重新分区,它试图在所有分区上均匀地重新分配数据而在Coalesce的情况下,在某些情况下我们仍然可能有倾斜的数据。
以下是代码级别的一些额外细节/差异:
在这里只添加函数定义,完整的代码实现检查spark的github页面。
下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。
def repartition(numPartitions: Int): Dataset[T]
每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。
但是对于合并,我们只有以下方法在数据框架上:
def coalesce(numPartitions: Int): Dataset[T]
上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区
下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!