根据Learning Spark

请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?


当前回答

重分区:将数据移到新的分区中。

如。初始数据帧划分为200个分区。

df.repartition(500):数据将从200个分区重新排列到新的500个分区。

联合:将数据移到现有的分区中。

df.coalesce(5):数据将从剩余的195个分区转移到5个现有分区。

其他回答

重分区算法对数据进行完全洗牌,并创建大小相等的数据分区。Coalesce结合现有分区以避免完全洗牌。

Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。

重新分区将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区。

如果你想了解更多细节,请阅读这篇博客文章。

用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

基本上,重分区允许您增加或减少分区的数量。重分区重新分配来自所有分区的数据,这导致完全shuffle,这是非常昂贵的操作。

Coalesce是重新分区的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在Coalesce中是最小的数据移动,但说Coalesce不做数据移动是完全错误的说法。

另一件事是通过提供分区的数量来重新分区,它试图在所有分区上均匀地重新分配数据而在Coalesce的情况下,在某些情况下我们仍然可能有倾斜的数据。

以下是代码级别的一些额外细节/差异:

在这里只添加函数定义,完整的代码实现检查spark的github页面。

下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。

def repartition(numPartitions: Int): Dataset[T]

每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。

 def repartition(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。

def repartitionByRange(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。

但是对于合并,我们只有以下方法在数据框架上:

def coalesce(numPartitions: Int): Dataset[T] 

上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区

下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)

}

基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!

这里需要注意的一点是,Spark RDD的基本原则是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数量。如果用例要求将RDD持久化在缓存中,则必须对新创建的RDD进行同样的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2