根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
根据Learning Spark
请记住,重新划分数据是一项相当昂贵的操作。 Spark还有一个repartition()的优化版本,称为coalesce(),它允许避免数据移动,但仅当您正在减少RDD分区的数量时。
我得到的一个区别是,使用repartition()可以增加/减少分区的数量,但使用coalesce()只能减少分区的数量。
如果分区分布在多台机器上,并且运行了coalesce(),它如何避免数据移动?
当前回答
从代码和代码文档中可以看出,coalesce(n)与coalesce(n, shuffle = false)相同,而repartition(n)与coalesce(n, shuffle = true)相同。
因此,合并和重新分区都可以用来增加分区的数量
使用shuffle = true,实际上可以合并为更大的数字 的分区。如果你有少量的分区,这很有用, 比如100,可能有几个分区异常大。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,则应该考虑使用合并的打乱版本(在这种情况下与重新分区相同)。这将允许您的计算在父分区上并行执行(多个任务)。
然而,如果你正在做一个激烈的合并,例如numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,numPartitions = 1的情况下只有一个节点)。为了避免这种情况,你可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
相关答案也请参考此处
其他回答
对于那些从PySpark (AWS EMR)生成单个csv文件并将其保存在s3上的问题,使用重新分区会有所帮助。原因是,合并不能进行完全洗牌,但重新分区可以。从本质上讲,您可以使用重分区增加或减少分区的数量,但使用合并只能减少分区的数量(而不是1)。以下是为试图从AWS EMR写入csv到s3的任何人编写的代码:
df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')
有一个重分区>>合并的用例,即使在@Rob的回答中提到的分区号减少,也就是将数据写入单个文件。
@Rob的回答暗示了一个好的方向,但我认为需要一些进一步的解释来理解引擎盖下面发生了什么。
如果您需要在写入数据之前过滤数据,那么重新分区比coalesce更适合,因为coalesce将在加载操作之前下推。
例如: load () . map(…).filter(…).coalesce (1) .save ()
翻译: load () .coalesce (1) . map(…).filter(…).save ()
这意味着您的所有数据将被压缩到一个单独的分区中,在那里它将被过滤,失去所有的并行性。 这种情况甚至会发生在非常简单的过滤器,如column='value'。
load().map(…).filter(…).repartition(1).save()
在这种情况下,在原始分区上并行地进行过滤。
举个数量级的例子,在我的例子中,当从Hive表加载后过滤109M行(~105G)和~1000个分区时,运行时从合并(1)的~6h下降到重新分区(1)的~2m。
具体示例取自AirBnB的这篇文章,这篇文章非常好,甚至涵盖了Spark中重新分区技术的更多方面。
合并比重新分区执行得更好。合并总是减少分区。假设你在yarn中启用动态分配,你有四个分区和执行器。如果过滤器应用于它,超过可能的一个或多个执行程序是空的,没有数据。这个问题可以通过合并而不是重新划分来解决。
重分区算法对数据进行完全洗牌,并创建大小相等的数据分区。Coalesce结合现有分区以避免完全洗牌。
Coalesce可以很好地使用一个具有大量分区的RDD,并将单个工作节点上的分区组合在一起,以生成一个具有较少分区的最终RDD。
重新分区将重新洗牌RDD中的数据,以产生您请求的最终分区数量。 DataFrames的分区看起来像是一个应该由框架管理的低级实现细节,但事实并非如此。当将大的dataframe过滤成小的dataframe时,你应该总是对数据进行重新分区。 你可能会经常把大的数据帧过滤成小的数据帧,所以要习惯重新分区。
如果你想了解更多细节,请阅读这篇博客文章。
另一个不同之处是考虑到存在倾斜连接的情况,您必须在其之上进行合并。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。
另一种情况是,假设你在一个数据帧中保存了一个中等/大量的数据,你必须批量生成到Kafka。在某些情况下,在生成到Kafka之前,重新分区有助于collectasList。但是,当容量非常大时,重新分区可能会导致严重的性能影响。在这种情况下,直接从dataframe生成Kafka会有所帮助。
附注:Coalesce并不像在工作人员之间进行完整的数据移动那样避免数据移动。但它确实减少了洗牌的次数。我想这就是那本书的意思。