我试图理解在YARN上运行Spark作业时,内核数量和执行器数量之间的关系。

测试环境如下:

Number of data nodes: 3 Data node machine spec: CPU: Core i7-4790 (# of cores: 4, # of threads: 8) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4) Network: 1Gb Spark version: 1.0.0 Hadoop version: 2.4.0 (Hortonworks HDP 2.1) Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile Input data Type: single text file Size: 165GB Number of lines: 454,568,833 Output Number of lines after second filter: 310,640,717 Number of lines of the result file: 99,848,268 Size of the result file: 41GB

作业在以下配置下运行:

——master yarn-client——executor-memory 19G——executor-cores 7——num-executors 3(每个数据节点的executor,使用的内核数一样多) ——master yarn-client——executor-memory 19G——executor-cores 4——num-executor 3(内核数减少) ——master yarn-client——executor-memory 4G——executor-cores 2——num-executor 12(内核少,executor多)

运行时间:

50分15秒 55分48秒 12分23秒

令我惊讶的是,(3)要快得多。 我认为(1)会更快,因为洗牌时执行者之间的交流会更少。 虽然(1)的核数比(3)少,但核数不是关键因素,因为2)的表现很好。

(在pwilmot的回答之后添加了以下内容。)

性能监视器屏幕截图如下:

(1)的Ganglia数据节点摘要-作业开始于04:37。

(3)的Ganglia数据节点摘要-作业开始于19:47。请忽略之前的图表。

图表大致分为两部分:

第一:从start到reduceByKey: CPU密集型,没有网络活动 第二:reduceByKey: CPU降低后,网络I/O完成。

如图所示,(1)可以使用尽可能多的CPU功率。所以,这可能不是线程数量的问题。

如何解释这一结果?


当前回答

根据Sandy Ryza的说法,当你在HDFS上运行spark应用程序时

我注意到HDFS客户端有大量并发的问题 线程。粗略估计,每个执行者最多可以执行5个任务 实现全写吞吐量,所以最好保持数量 每个执行程序的内核数低于这个数字。

所以我相信你的第一个配置比第三个配置慢是因为HDFS的I/O吞吐量不好

其他回答

我认为其中一个主要原因是地方性。您的输入文件大小为165G,文件的相关块当然分布在多个datanode上,更多的执行器可以避免网络复制。

尝试设置executor num等于blocks count,我认为可以更快。

I haven't played with these settings myself so this is just speculation but if we think about this issue as normal cores and threads in a distributed system then in your cluster you can use up to 12 cores (4 * 3 machines) and 24 threads (8 * 3 machines). In your first two examples you are giving your job a fair number of cores (potential computation space) but the number of threads (jobs) to run on those cores is so limited that you aren't able to use much of the processing power allocated and thus the job is slower even though there is more computation resources allocated.

您提到您关心的是shuffle步骤—虽然在shuffle步骤中限制开销很好,但通常更重要的是利用集群的并行化。考虑一个极端的情况——一个没有shuffle的单线程程序。

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically. The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because: 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers. The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node. 15 cores per executor can lead to bad HDFS I/O throughput. A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why? This config results in three executors on all nodes except for the one with the AM, which will have two executors. --executor-memory was derived as (63/3 executors per node) = 21. 21 * 0.07 = 1.47. 21 – 1.47 ~ 19.

Cloudera博客的一篇文章给出了解释,How-to: tuning Your Apache Spark Jobs (Part 2)。

来自RStudio的Sparklyr包页面的优秀资源:

SPARK DEFINITIONS: It may be useful to provide some simple definitions for the Spark nomenclature: Node: A server Worker Node: A server that is part of the cluster and are available to run Spark jobs Master Node: The server that coordinates the Worker nodes. Executor: A sort of virtual machine inside a node. One Node can have multiple Executors. Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located. Driver (Executor): The Driver Node will also show up in the Executor list.

简而言之:我认为特巴乔是对的。您在执行器上达到了HDFS的吞吐量限制。

我认为答案可能比这里的一些建议要简单一些。

对我来说,线索在集群网络图中。对于运行1,利用率稳定在~ 50m字节/秒。对于运行3,稳定的利用率增加了一倍,约为100 M字节/秒。

从DzOrd分享的cloudera博客文章中,你可以看到这句重要的话:

我注意到HDFS客户端有大量并发线程的问题。一个粗略的猜测是,每个执行器最多5个任务可以实现完整的写吞吐量,所以最好将每个执行器的内核数保持在这个数字以下。

那么,让我们做一些计算,看看如果这是真的,我们期望的性能是什么。


运行1:19 GB, 7核,3个执行器

3个执行者x 7个线程= 21个线程 每个executor有7个内核,我们期望有限的IO到HDFS(最多5个内核) 有效吞吐量~= 3个executor x 5个线程= 15个线程

运行3:4 GB, 2核,12个执行器

2个executor x 12个线程= 24个线程 每个executor有2个内核,所以HDFS的吞吐量是可以的 有效吞吐量~= 12个executor x 2个线程= 24个线程


如果作业100%受到并发性(线程数)的限制。我们期望运行时与线程数完全成反比。

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

因此,ratio_num_threads ~= inv_ratio_runtime,看起来我们受到了网络限制。

同样的效果解释了运行1和运行2之间的差异。


运行2:19 GB, 4核,3个执行器

3个执行者x 4个线程= 12个线程 每个executor有4个内核,可以IO到HDFS 有效吞吐量~= 3个执行者x 4个线程= 12个线程


比较有效线程数和运行时数:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

它没有上次的比较那么完美,但是当失去线程时,我们仍然可以看到类似的性能下降。

现在是最后一点:为什么线程越多性能越好,尤其是线程数比cpu数多?

并行性(我们通过将数据划分到多个CPU上得到的东西)和并发性(我们使用多个线程在一个CPU上工作时得到的东西)之间的区别,Rob Pike在这篇很棒的文章中提供了一个很好的解释:并发性不是并行性。

简单的解释是,如果一个Spark作业与一个文件系统或网络交互,那么CPU会花很多时间等待与这些接口的通信,而不是花很多时间实际“工作”。通过同时为这些cpu提供多个任务,它们将花费更少的等待时间和更多的工作时间,从而获得更好的性能。