根据Learning Spark

请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?


当前回答

对于所有这些伟大的答案,我想补充的是,重新分区是利用数据并行化的最佳选择之一。而coalesce提供了一个廉价的选择来减少分区,并且在将数据写入HDFS或其他接收器以利用大写入时非常有用。

我发现这在以拼花格式写数据时很有用,可以充分利用它。

其他回答

我想在贾斯汀和鲍尔的回答中补充一点——

重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。

Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。

它避免了完全洗牌。如果已知分区数量正在减少,则执行器可以安全地将数据保存在最小分区数量上,只将数据从额外的节点移到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后合并到2个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

注意,节点1和节点3不需要移动其原始数据。

用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

对于那些从PySpark (AWS EMR)生成单个csv文件并将其保存在s3上的问题,使用重新分区会有所帮助。原因是,合并不能进行完全洗牌,但重新分区可以。从本质上讲,您可以使用重分区增加或减少分区的数量,但使用合并只能减少分区的数量(而不是1)。以下是为试图从AWS EMR写入csv到s3的任何人编写的代码:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')