根据Learning Spark

请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?


当前回答

有一个重分区>>合并的用例,即使在@Rob的回答中提到的分区号减少,也就是将数据写入单个文件。

@Rob的回答暗示了一个好的方向,但我认为需要一些进一步的解释来理解引擎盖下面发生了什么。

如果您需要在写入数据之前过滤数据,那么重新分区比coalesce更适合,因为coalesce将在加载操作之前下推。

例如: load () . map(…).filter(…).coalesce (1) .save ()

翻译: load () .coalesce (1) . map(…).filter(…).save ()

这意味着您的所有数据将被压缩到一个单独的分区中,在那里它将被过滤,失去所有的并行性。 这种情况甚至会发生在非常简单的过滤器,如column='value'。

load().map(…).filter(…).repartition(1).save()

在这种情况下,在原始分区上并行地进行过滤。

举个数量级的例子,在我的例子中,当从Hive表加载后过滤109M行(~105G)和~1000个分区时,运行时从合并(1)的~6h下降到重新分区(1)的~2m。

具体示例取自AirBnB的这篇文章,这篇文章非常好,甚至涵盖了Spark中重新分区技术的更多方面。

其他回答

重分区:将数据移到新的分区中。

如。初始数据帧划分为200个分区。

df.repartition(500):数据将从200个分区重新排列到新的500个分区。

联合:将数据移到现有的分区中。

df.coalesce(5):数据将从剩余的195个分区转移到5个现有分区。

重分区算法对数据进行完全洗牌,并创建大小相等的数据分区。Coalesce结合现有分区以避免完全洗牌。

Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。

重新分区将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区。

如果你想了解更多细节,请阅读这篇博客文章。

贾斯汀的回答很棒,这个回答更有深度。

重分区算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf在我的机器上包含4个分区。

numbersDf.rdd.partitions.size // => 4

下面是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们使用重分区方法进行一次完全洗牌,并在两个节点上获得这些数据。

val numbersDfR = numbersDf.repartition(2)

下面是如何在我的机器上划分numbersDfR数据:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

重分区方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。

合并和重新划分的区别

Coalesce使用现有分区来最小化打乱的数据量。重新分区创建新分区并进行完全洗牌。合并的结果是产生具有不同数据量的分区(有时分区的大小相差很大),而重新分区的结果是产生大小大致相同的分区。

合并和重新分区哪个更快?

联合可能比重新分区运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现重新分区总体上更快,因为Spark是为处理相同大小的分区而构建的。

注意:我很好奇地发现重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。

如果你想了解更多细节,请阅读这篇博客文章。

当你在实践中使用合并和重分区

See this question on how to use coalesce & repartition to write out a DataFrame to a single file It's critical to repartition after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem. partitionBy is used to write out data in partitions on disk. You'll need to use repartition / coalesce to partition your data in memory properly before using partitionBy.

所有的答案都为这个经常被问到的问题增添了一些伟大的知识。

所以根据这个问题的传统时间轴,这里是我的2美分。

我发现在非常具体的情况下,重新分区比合并更快。

在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。

这就是我的意思

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。

当然,这个数字(20)将取决于工作人员的数量和数据量。

希望这能有所帮助。

但是你也应该确保,如果你在处理巨大的数据,将要合并的节点的数据应该是高度配置的。因为所有的数据都会加载到那些节点上,可能会导致内存异常。 虽然赔款很贵,但我还是愿意用它。因为它对数据进行了洗牌和平均分配。

在合并和重新分区之间进行明智的选择。