I am trying to understand the relationship between the number of cores and the number of executors when running a Spark job on YARN.

The test environment is as follows:

- Number of data nodes: 3
- Data node machine spec:
  - CPU: Core i7-4790 (# of cores: 4, # of threads: 8)
  - RAM: 32GB (8GB x 4)
  - HDD: 8TB (2TB x 4)
- Network: 1Gb
- Spark version: 1.0.0
- Hadoop version: 2.4.0 (Hortonworks HDP 2.1)
- Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile
- Input data
  - Type: single text file
  - Size: 165GB
  - Number of lines: 454,568,833
- Output
  - Number of lines after second filter: 310,640,717
  - Number of lines of the result file: 99,848,268
  - Size of the result file: 41GB
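For reference, here is a minimal Scala sketch of that job flow. The input/output paths and the filter/map lambdas are placeholders (the post does not show the actual job logic); only the shape of the chain matches, with the Java API's mapToPair written as a map to a tuple:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits, needed on Spark 1.0.x

// Sketch of the described pipeline; paths and lambdas are placeholders.
object JobFlowSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-flow-sketch"))

    sc.textFile("hdfs:///path/to/input.txt")      // single 165GB text file
      .filter(_.nonEmpty)                         // first filter (placeholder predicate)
      .map(_.trim)                                // map (placeholder transform)
      .filter(_.contains("\t"))                   // second filter (placeholder predicate)
      .map(line => (line.split("\t")(0), 1L))     // mapToPair in the Java API
      .reduceByKey(_ + _)                         // the shuffle happens here
      .map { case (key, count) => s"$key\t$count" }
      .saveAsTextFile("hdfs:///path/to/output")

    sc.stop()
  }
}
```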

The job was run with the following configurations:

(1) --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (one executor per data node, using as many cores as possible)
(2) --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (reduced number of cores)
(3) --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (fewer cores, more executors)
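For illustration, configuration (3) can also be expressed programmatically; a minimal REPL-style sketch, assuming the standard Spark-on-YARN properties (spark.executor.instances is the property behind --num-executors):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: configuration (3) as SparkConf properties instead of CLI flags.
val conf = new SparkConf()
  .setAppName("cores-vs-executors")
  .setMaster("yarn-client")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "12")   // YARN equivalent of --num-executors

val sc = new SparkContext(conf)
```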

The elapsed times were:

(1) 50 min 15 sec
(2) 55 min 48 sec
(3) 12 min 23 sec

To my surprise, (3) was much faster. I thought (1) would be faster, since there would be less inter-executor communication when shuffling. Although (1) has fewer total cores than (3), the number of cores is not the key factor, since (2) performed well.

(The following was added after pwilmot's answer.)

Screenshots of the performance monitor are as follows:

Ganglia data node summary for (1). The job started at 04:37.

Ganglia data node summary for (3). The job started at 19:47. Please ignore the graphs before that time.

The graph roughly divides into two sections:

- First, from the start to reduceByKey: CPU intensive, no network activity.
- Second, after reduceByKey: CPU drops, network I/O is done.

As the graph shows, (1) can use as much CPU power as it was given. So it might not be a problem of the number of threads.

How can this result be explained?


Current answer

I haven't played with these settings myself, so this is just speculation, but if we think about this issue in terms of ordinary cores and threads in a distributed system, then in your cluster you can use up to 12 cores (4 * 3 machines) and 24 threads (8 * 3 machines). In your first two examples you are giving your job a fair number of cores (potential computation space), but the number of threads (jobs) to run on those cores is so limited that you aren't able to use much of the processing power allocated, and thus the job is slower even though more computation resources are allocated.

You mention that your concern was the shuffle step: while it is nice to limit the overhead in the shuffle step, it is generally much more important to utilize the parallelization of the cluster. Think about the extreme case: a single-threaded program with zero shuffle.
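To put numbers on that argument, a back-of-envelope sketch using only the figures from the question (3 nodes, 4 cores / 8 hardware threads each):

```scala
// Task slots (executors x cores) per configuration vs. hardware limits.
val hwCores   = 4 * 3    // 12 physical cores in the cluster
val hwThreads = 8 * 3    // 24 hardware threads in the cluster

val slots = Map(
  "(1) 3 executors x 7 cores"  -> 3 * 7,    // 21 slots
  "(2) 3 executors x 4 cores"  -> 3 * 4,    // 12 slots
  "(3) 12 executors x 2 cores" -> 12 * 2    // 24 slots, matching the 24 threads
)
```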

Other answers

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

- 63GB + the executor memory overhead won't fit within the 63GB capacity of the NodeManagers.
- The application master will take up a core on one of the nodes, meaning that there won't be room for a 15-core executor on that node.
- 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

- This config results in three executors on all nodes except for the one with the AM, which will have two executors.
- --executor-memory was derived as (63 / 3 executors per node) = 21. 21 * 0.07 = 1.47. 21 - 1.47 ≈ 19.

The explanation above is from a post on the Cloudera blog, How-to: Tune Your Apache Spark Jobs (Part 2).
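The memory derivation at the end of the excerpt can be written out explicitly; a sketch, where the 0.07 factor reflects the then-default spark.yarn.executor.memoryOverhead of roughly 7% of executor memory:

```scala
// The Cloudera example's executor-memory arithmetic, spelled out.
val usableMemPerNodeGB = 63.0                // 64GB minus 1GB for OS/daemons
val executorsPerNode   = 3
val perExecutorGB      = usableMemPerNodeGB / executorsPerNode   // 21
val overheadGB         = perExecutorGB * 0.07                    // ~1.47 (YARN overhead)
val executorMemoryGB   = (perExecutorGB - overheadGB).floor      // 19 => --executor-memory 19G
```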

I think one of the major reasons is locality. Your input file size is 165GB, and the file's blocks are of course distributed over multiple DataNodes; more executors can avoid network copying.

Try setting the number of executors equal to the block count; I think it can be faster.
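As a rough illustration of what that suggests here, assuming the Hadoop 2.x default 128MB block size (the post does not state the actual block size):

```scala
// Back-of-envelope block count for the 165GB input file.
val inputBytes   = 165L * 1024 * 1024 * 1024
val blockBytes   = 128L * 1024 * 1024          // assumed HDFS block size
val approxBlocks = (inputBytes + blockBytes - 1) / blockBytes   // ~1320 blocks
```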

According to Sandy Ryza, when you run Spark apps on top of HDFS:

I've noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it's good to keep the number of cores per executor below that number.

So I believe your first configuration is slower than the third one because of bad HDFS I/O throughput.
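A hypothetical helper (not a Spark API, just the arithmetic) that applies that guideline when choosing a layout:

```scala
// Pick an executor layout under the "at most ~5 cores per executor" guideline.
// usableCoresPerNode / usableMemPerNodeGB are whatever YARN is allowed to hand out.
def suggestLayout(nodes: Int, usableCoresPerNode: Int, usableMemPerNodeGB: Int,
                  maxCoresPerExecutor: Int = 5): (Int, Int, Int) = {
  val coresPerExecutor = math.min(maxCoresPerExecutor, usableCoresPerNode)
  val executorsPerNode = usableCoresPerNode / coresPerExecutor
  val memPerExecutorGB = usableMemPerNodeGB / executorsPerNode
  (nodes * executorsPerNode, coresPerExecutor, memPerExecutorGB)  // (num-executors, executor-cores, executor-memory)
}
```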

In configuration 2.) you reduced the number of parallel tasks, so I think the comparison is not fair. Make --num-executors at least 5. Then you would have 20 tasks running, compared with the 21 tasks of configuration 1.), and the comparison would be fair in my view.

Also, please calculate the executor memory accordingly.
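A sketch of what that recalculation might look like; the usable-memory figure is an assumption (the OP's nodes have 32GB total, some of which the OS and Hadoop daemons need):

```scala
// Hypothetical memory recalculation for --num-executors 5 on 3 nodes (a 2+2+1 layout),
// assuming ~24GB per node is usable for executors.
val usablePerNodeGB     = 24
val maxExecutorsPerNode = 2                                       // 5 executors over 3 nodes
val perExecutorGB       = usablePerNodeGB / maxExecutorsPerNode   // 12
val overheadGB          = perExecutorGB * 0.07                    // ~7% YARN overhead (default has a 384MB floor)
val executorMemoryGB    = (perExecutorGB - overheadGB).floor      // 11 => --executor-memory 11G
```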