根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。
其他回答
另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。
另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。
附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。
我想在贾斯汀和鲍尔的回答中补充一点——
重新分区将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以使用分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
Coalesce将使用现有分区并对其中的一个子集进行洗牌。它不能像重新分区那样修复数据倾斜。因此,即使它更便宜,它也可能不是你需要的东西。
以下是代码级别的一些额外细节/差异:
在这里只添加函数定义,完整的代码实现检查spark的github页面。
下面是在数据帧上重新分区的不同方法: 点击这里查看完整实现。
def repartition(numPartitions: Int): Dataset[T]
每当我们在dataframe上调用上述方法时,它都会返回一个新的数据集,该数据集恰好有numPartitions分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是哈希分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是哈希分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个新的数据集,该数据集由给定的分区表达式划分为numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上面的方法返回一个新的数据集,由给定的分区表达式划分,使用spark.sql.shuffle.partitions作为分区数。生成的数据集是范围分区的。
但是对于合并,我们只有以下方法在数据框架上:
def coalesce(numPartitions: Int): Dataset[T]
上述方法将返回一个新的数据集,该数据集恰好有numPartitions分区
下面是RDD上可用于重分区和合并的方法: 点击这里查看完整实现。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,重分区方法通过将shuffle值传递为true来调用合并方法。 现在如果我们在RDD上使用coalesce方法,通过传递shuffle值为true,我们也可以增加分区!
但是你也应该确保,如果你在处理巨大的数据,将要合并的节点的数据应该是高度配置的。因为所有的数据都会加载到那些节点上,可能会导致内存异常。 虽然赔款很贵,但我还是愿意用它。因为它对数据进行了洗牌和平均分配。
在合并和重新分区之间进行明智的选择。
对于那些从PySpark (AWS EMR)生成单个csv文件并将其保存在s3上的问题,使用重新分区会有所帮助。原因是,合并不能进行完全洗牌,但重新分区可以。从本质上讲,您可以使用重分区增加或减少分区的数量,但使用合并只能减少分区的数量(而不是1)。以下是为试图从AWS EMR写入csv到s3的任何人编写的代码:
df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')