根据Learning Spark

请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。

我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。

如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?


当前回答

以下是代码级别的一些额外细节/差异:

在这里只添加函数定义,完整的代码实现检查spark的github页面。

下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。

def repartition(numPartitions: Int): Dataset[T]

每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。

 def repartition(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。

def repartitionByRange(partitionExprs: Column*): Dataset[T]

上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。

但是对于合并,我们只有以下方法在数据框架上:

def coalesce(numPartitions: Int): Dataset[T] 

上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区

下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)

}

基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!

其他回答

有一个重分区>>合并的用例,即使在@Rob的回答中提到的分区号减少,也就是将数据写入单个文件。

@Rob的回答暗示了一个好的方向,但我认为需要一些进一步的解释来理解引擎盖下面发生了什么。

如果您需要在写入数据之前过滤数据,那么重新分区比coalesce更适合,因为coalesce将在加载操作之前下推。

例如: load () . map(…).filter(…).coalesce (1) .save ()

翻译: load () .coalesce (1) . map(…).filter(…).save ()

这意味着您的所有数据将被压缩到一个单独的分区中,在那里它将被过滤,失去所有的并行性。 这种情况甚至会发生在非常简单的过滤器,如column='value'。

load().map(…).filter(…).repartition(1).save()

在这种情况下,在原始分区上并行地进行过滤。

举个数量级的例子,在我的例子中,当从Hive表加载后过滤109M行(~105G)和~1000个分区时,运行时从合并(1)的~6h下降到重新分区(1)的~2m。

具体示例取自AirBnB的这篇文章,这篇文章非常好,甚至涵盖了Spark中重新分区技术的更多方面。

它避免了完全洗牌。如果已知分区数量正在减少,则执行器可以安全地将数据保存在最小分区数量上,只将数据从额外的节点移到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后合并到2个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

注意,节点1和节点3不需要移动其原始数据。

用一种简单的方式 COALESCE:-仅用于减少分区数量,没有数据变换,它只是压缩分区

REPARTITION:-用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会选择这两个。

Coalesce使用现有分区来最小化数据量 被打乱。重新分区将创建新的分区并执行满分区 洗牌。 合并会产生具有不同数据量的分区 (有时分区有许多不同的大小)和 重新分区会产生大小大致相同的分区。 合并可以减少分区,但修复可以用来增加或减少分区。

另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。

另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。

附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。