I am trying to understand the relationship between the number of cores and the number of executors when running a Spark job on YARN.

The test environment is as follows:

- Number of data nodes: 3
- Data node machine spec:
  - CPU: Core i7-4790 (# of cores: 4, # of threads: 8)
  - RAM: 32GB (8GB x 4)
  - HDD: 8TB (2TB x 4)
- Network: 1Gb
- Spark version: 1.0.0
- Hadoop version: 2.4.0 (Hortonworks HDP 2.1)
- Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile (a rough PySpark sketch of this shape is shown right after this list)
- Input data
  - Type: single text file
  - Size: 165GB
  - Number of lines: 454,568,833
- Output
  - Number of lines after second filter: 310,640,717
  - Number of lines of the result file: 99,848,268
  - Size of the result file: 41GB
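
For readers who prefer code to arrows, here is a rough PySpark sketch of a pipeline with the same shape. It is only illustrative: the original job used the Java RDD API (hence mapToPair), and the file paths, predicates, and key extraction below are placeholders, not the actual job logic.

# Rough, illustrative PySpark sketch of a pipeline with the same shape as the
# job above. Paths, predicates, and the key extraction are placeholders.
from pyspark import SparkContext

sc = SparkContext(appName="cores-vs-executors-demo")

(sc.textFile("hdfs:///path/to/input.txt")        # single ~165GB text file
   .filter(lambda line: line)                    # first filter (placeholder predicate)
   .map(lambda line: line.split("\t"))           # parse each line (placeholder)
   .filter(lambda fields: len(fields) > 1)       # second filter (placeholder predicate)
   .map(lambda fields: (fields[0], 1))           # mapToPair equivalent: emit (key, value)
   .reduceByKey(lambda a, b: a + b)              # shuffle + per-key aggregation
   .map(lambda kv: "%s\t%d" % kv)                # format the result lines
   .saveAsTextFile("hdfs:///path/to/output"))    # write the ~41GB result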

The job was run with the following configurations:

1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (one executor per data node, using as many cores as possible)
2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (number of cores reduced)
3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (fewer cores, more executors)

Elapsed times:

1. 50 min 15 sec
2. 55 min 48 sec
3. 12 min 23 sec
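
To make the three configurations easier to compare, here is a small back-of-the-envelope script (an addition for illustration, not part of the original question) that works out the per-node totals, assuming the executors are spread evenly across the 3 data nodes:

# Back-of-the-envelope totals for the three configurations, assuming the
# executors are spread evenly across the 3 data nodes. Runtimes are the
# measured values above, converted to minutes.
configs = [
    # (name, executor_memory_gb, executor_cores, num_executors, runtime_minutes)
    ("run 1", 19, 7, 3,  50 + 15 / 60),
    ("run 2", 19, 4, 3,  55 + 48 / 60),
    ("run 3",  4, 2, 12, 12 + 23 / 60),
]
data_nodes = 3

for name, mem_gb, cores, execs, minutes in configs:
    execs_per_node = execs / data_nodes
    print(f"{name}: {execs_per_node:.0f} executor(s)/node, "
          f"{execs_per_node * cores:.0f} cores/node, "
          f"{execs_per_node * mem_gb:.0f} GB/node, "
          f"{execs * cores} task slots total, "
          f"runtime {minutes:.1f} min")

Run 3 ends up with the most task slots in total (24) while requesting the least memory per node (16GB vs 19GB).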

To my surprise, (3) was much faster. I thought that (1) would be faster, since there would be less inter-executor communication when shuffling. Although (1) has fewer cores than (3), the number of cores is not the key factor, since (2) did perform well.

(The following was added after pwilmot's answer.)

The performance monitor screenshots are as follows:

Ganglia data node summary for (1) - the job started at 04:37.

Ganglia data node summary for (3) - the job started at 19:47. Please ignore the graphs before that time.

The graphs can be roughly divided into two sections:

- First: from start to reduceByKey: CPU intensive, no network activity
- Second: after reduceByKey: CPU lowers, network I/O is done

As the graphs show, (1) can use as much CPU power as it was given. So this might not be a problem of the number of threads.

How can this result be explained?


Current answer

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

- 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
- The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
- 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

- This config results in three executors on all nodes except for the one with the AM, which will have two executors.
- --executor-memory was derived as (63/3 executors per node) = 21. 21 * 0.07 = 1.47. 21 – 1.47 ~ 19.

The explanation was given in an article on the Cloudera blog, How-to: Tune Your Apache Spark Jobs (Part 2).
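
The 19G figure in the quoted example is just arithmetic: split the 63GB node capacity across 3 executors and subtract the memory overhead (the quote assumes roughly 7% of executor memory). A minimal sketch of that calculation:

# Reproduce the arithmetic from the quoted example: start from the per-node
# YARN capacity, split it across the executors on that node, then leave room
# for the executor memory overhead (the quote assumes ~7% of executor memory).
def executor_memory_gb(node_capacity_gb, executors_per_node, overhead_fraction=0.07):
    per_executor = node_capacity_gb / executors_per_node   # 63 / 3 = 21 GB
    overhead = per_executor * overhead_fraction            # 21 * 0.07 = 1.47 GB
    return per_executor - overhead                         # 21 - 1.47 = 19.53 GB

print(executor_memory_gb(63, 3))   # ~19.5, so requesting --executor-memory 19G fits safely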

Other answers

An excellent resource from RStudio's Sparklyr package page:

SPARK DEFINITIONS: It may be useful to provide some simple definitions for the Spark nomenclature:

- Node: A server
- Worker Node: A server that is part of the cluster and is available to run Spark jobs
- Master Node: The server that coordinates the Worker nodes
- Executor: A sort of virtual machine inside a node. One Node can have multiple Executors.
- Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located.
- Driver (Executor): The Driver Node will also show up in the Executor list.

According to Sandy Ryza, when you run your Spark app on top of HDFS:

I've noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it's good to keep the number of cores per executor below that number.

So I believe that your first configuration is slower than the third one because of poor HDFS I/O throughput.


Short answer: I think tgbaggio is right. You hit HDFS throughput limits on your executors.

I think the answer may be a little simpler than some of the recommendations here.

For me, the clue is in the cluster network graph. For run 1 the utilization is steady at around 50 MByte/s. For run 3 the steady utilization is doubled, around 100 MByte/s.

From the Cloudera blog post shared by DzOrd, you can see this important quote:

I've noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it's good to keep the number of cores per executor below that number.

So, let's do some calculations and see what performance we expect if that is true.


Run 1: 19 GB, 7 cores, 3 executors

- 3 executors x 7 threads = 21 threads
- with 7 cores per executor, we expect limited IO to HDFS (it maxes out at ~5 cores)
- effective throughput ~= 3 executors x 5 threads = 15 threads

Run 3: 4 GB, 2 cores, 12 executors

- 12 executors x 2 threads = 24 threads
- 2 cores per executor, so HDFS throughput is ok
- effective throughput ~= 12 executors x 2 threads = 24 threads


If the job is 100% limited by concurrency (the number of threads), we would expect the runtime to be perfectly inversely correlated with the number of threads.

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62

Since ratio_num_threads ~= inv_ratio_runtime, it looks like we are network limited.

This same effect explains the difference between run 1 and run 2.


Run 2: 19 GB, 4 cores, 3 executors

- 3 executors x 4 threads = 12 threads
- 4 cores per executor, ok IO to HDFS
- effective throughput ~= 3 executors x 4 threads = 12 threads


Comparing the number of effective threads and the runtimes:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91

It's not as perfect as the last comparison, but we still see a similar drop in performance when we lose threads.
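
Here is a short script that reproduces the effective-thread estimates used in the comparisons above, capping threads at ~5 per executor per the quoted HDFS write-throughput guess (the cap constant and helper name are mine, added for illustration):

# Reproduce the effective-thread estimates above: threads per executor are
# capped at ~5, the rough per-executor HDFS full-write-throughput limit
# quoted from the Cloudera post.
HDFS_WRITE_CAP = 5

def effective_threads(executors, cores_per_executor):
    return executors * min(cores_per_executor, HDFS_WRITE_CAP)

runs = {
    "run1": (3, 7),    # 3 executors x min(7, 5) = 15 effective threads
    "run2": (3, 4),    # 3 executors x 4          = 12 effective threads
    "run3": (12, 2),   # 12 executors x 2         = 24 effective threads
}

threads = {name: effective_threads(*cfg) for name, cfg in runs.items()}
print(threads)                                               # {'run1': 15, 'run2': 12, 'run3': 24}
print("run1 vs run3:", threads["run1"] / threads["run3"])    # 0.625
print("run2 vs run1:", threads["run2"] / threads["run1"])    # 0.8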

Now for the last bit: why is it that we get better performance with more threads, especially more threads than the number of CPUs?

A good explanation of the difference between parallelism (what we get by dividing the data onto multiple CPUs) and concurrency (what we get when we use multiple threads to do work on a single CPU) is provided in this great post by Rob Pike: Concurrency is not parallelism.

The short explanation is that if a Spark job is interacting with a file system or network, the CPU spends a lot of time waiting on communication with those interfaces and not a lot of time actually "doing work". By giving those CPUs more than one task to work on at a time, they spend less time waiting and more time working, and you see better performance.
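
As a toy illustration of that point (independent of Spark itself), tasks that mostly wait on I/O finish much faster with a thread pool than sequentially, even with more threads than CPUs; here time.sleep stands in for a network or HDFS call:

# Toy illustration: tasks that mostly wait on I/O benefit from running more
# threads than there are CPUs, because the CPU is idle during the waits.
import time
from concurrent.futures import ThreadPoolExecutor

def io_bound_task(_):
    time.sleep(0.1)   # pretend we are waiting on HDFS or the network

tasks = range(20)

start = time.time()
for t in tasks:
    io_bound_task(t)
print(f"sequential: {time.time() - start:.2f}s")   # ~2.0s

start = time.time()
with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(io_bound_task, tasks))
print(f"10 threads: {time.time() - start:.2f}s")   # ~0.2s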

I think there is a small issue in the first two configurations. The concepts of threads and cores are as follows: the idea of threading is that if a core is idle, that core is used to process the data. So in the first two cases the memory is not fully utilized. If you want to benchmark this example, choose machines which have more than 10 cores each, then do the benchmark.

But do not give more than 5 cores per executor, or there will be a bottleneck in I/O performance.

So the best machines to do this benchmarking might be data nodes which have 10 cores.

Data node machine spec:
- CPU: i7-4790 (# of cores: 10, # of threads: 20)
- RAM: 32GB (8GB x 4)
- HDD: 8TB (2TB x 4)