根据Learning Spark

请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?


当前回答

用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

其他回答

用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

贾斯汀的回答很棒,这个回答更有深度。

重分区算法进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用1到12的数字创建一个DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf在我的机器上包含4个分区。

numbersDf.rdd.partitions.size // => 4

下面是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们使用重分区方法进行一次完全洗牌,并在两个节点上获得这些数据。

val numbersDfR = numbersDf.repartition(2)

下面是如何在我的机器上划分numbersDfR数据:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

重分区方法创建新分区,并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。

合并和重新划分的区别

Coalesce使用现有分区来最小化打乱的数据量。重新分区创建新分区并进行完全洗牌。合并的结果是产生具有不同数据量的分区(有时分区的大小相差很大),而重新分区的结果是产生大小大致相同的分区。

合并和重新分区哪个更快?

联合可能比重新分区运行得快,但大小不等的分区通常比大小相等的分区运行得慢。在过滤了一个大型数据集之后,通常需要对数据集重新分区。我发现重新分区总体上更快,因为Spark是为处理相同大小的分区而构建的。

注意:我很好奇地发现重新分区会增加磁盘上数据的大小。在对大型数据集使用重分区/合并时,请确保运行测试。

如果你想了解更多细节,请阅读这篇博客文章。

当你在实践中使用合并和重分区

See this question on how to use coalesce & repartition to write out a DataFrame to a single file It's critical to repartition after running filtering queries. The number of partitions does not change after filtering, so if you don't repartition, you'll have way too many memory partitions (the more the filter reduces the dataset size, the bigger the problem). Watch out for the empty partition problem. partitionBy is used to write out data in partitions on disk. You'll need to use repartition / coalesce to partition your data in memory properly before using partitionBy.

所有的答案都为这个经常被问到的问题增添了一些伟大的知识。

所以根据这个问题的传统时间轴,这里是我的2美分。

我发现在非常具体的情况下,重新分区比合并更快。

在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区工作得更快。

这就是我的意思

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件小于20,合并将永远无法完成,而重新分区要快得多,因此上面的代码。

当然,这个数字(20)将取决于工作人员的数量和数据量。

希望这能有所帮助。

对于那些从PySpark (AWS EMR)生成单个csv文件并将其保存在s3上的问题,使用重新分区会有所帮助。原因是,合并不能进行完全洗牌,但重新分区可以。从本质上讲,您可以使用重分区增加或减少分区的数量,但使用合并只能减少分区的数量(而不是1)。以下是为试图从AWS EMR写入csv到s3的任何人编写的代码:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')