我试图理解在YARN上运行Spark作业时,内核数量和执行器数量之间的关系。
测试环境如下:
Number of data nodes: 3
Data node machine spec:
CPU: Core i7-4790 (# of cores: 4, # of threads: 8)
RAM: 32GB (8GB x 4)
HDD: 8TB (2TB x 4)
Network: 1Gb
Spark version: 1.0.0
Hadoop version: 2.4.0 (Hortonworks HDP 2.1)
Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile
Input data
Type: single text file
Size: 165GB
Number of lines: 454,568,833
Output
Number of lines after second filter: 310,640,717
Number of lines of the result file: 99,848,268
Size of the result file: 41GB
作业在以下配置下运行:
——master yarn-client——executor-memory 19G——executor-cores 7——num-executors 3(每个数据节点的executor,使用的内核数一样多)
——master yarn-client——executor-memory 19G——executor-cores 4——num-executor 3(内核数减少)
——master yarn-client——executor-memory 4G——executor-cores 2——num-executor 12(内核少,executor多)
运行时间:
50分15秒
55分48秒
12分23秒
令我惊讶的是,(3)要快得多。
我认为(1)会更快,因为洗牌时执行者之间的交流会更少。
虽然(1)的核数比(3)少,但核数不是关键因素,因为2)的表现很好。
(在pwilmot的回答之后添加了以下内容。)
性能监视器屏幕截图如下:
(1)的Ganglia数据节点摘要-作业开始于04:37。
(3)的Ganglia数据节点摘要-作业开始于19:47。请忽略之前的图表。
图表大致分为两部分:
第一:从start到reduceByKey: CPU密集型,没有网络活动
第二:reduceByKey: CPU降低后,网络I/O完成。
如图所示,(1)可以使用尽可能多的CPU功率。所以,这可能不是线程数量的问题。
如何解释这一结果?
简而言之:我认为特巴乔是对的。您在执行器上达到了HDFS的吞吐量限制。
我认为答案可能比这里的一些建议要简单一些。
对我来说,线索在集群网络图中。对于运行1,利用率稳定在~ 50m字节/秒。对于运行3,稳定的利用率增加了一倍,约为100 M字节/秒。
从DzOrd分享的cloudera博客文章中,你可以看到这句重要的话:
我注意到HDFS客户端有大量并发线程的问题。一个粗略的猜测是,每个执行器最多5个任务可以实现完整的写吞吐量,所以最好将每个执行器的内核数保持在这个数字以下。
那么,让我们做一些计算,看看如果这是真的,我们期望的性能是什么。
运行1:19 GB, 7核,3个执行器
3个执行者x 7个线程= 21个线程
每个executor有7个内核,我们期望有限的IO到HDFS(最多5个内核)
有效吞吐量~= 3个executor x 5个线程= 15个线程
运行3:4 GB, 2核,12个执行器
2个executor x 12个线程= 24个线程
每个executor有2个内核,所以HDFS的吞吐量是可以的
有效吞吐量~= 12个executor x 2个线程= 24个线程
如果作业100%受到并发性(线程数)的限制。我们期望运行时与线程数完全成反比。
ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62
因此,ratio_num_threads ~= inv_ratio_runtime,看起来我们受到了网络限制。
同样的效果解释了运行1和运行2之间的差异。
运行2:19 GB, 4核,3个执行器
3个执行者x 4个线程= 12个线程
每个executor有4个内核,可以IO到HDFS
有效吞吐量~= 3个执行者x 4个线程= 12个线程
比较有效线程数和运行时数:
ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91
它没有上次的比较那么完美,但是当失去线程时,我们仍然可以看到类似的性能下降。
现在是最后一点:为什么线程越多性能越好,尤其是线程数比cpu数多?
并行性(我们通过将数据划分到多个CPU上得到的东西)和并发性(我们使用多个线程在一个CPU上工作时得到的东西)之间的区别,Rob Pike在这篇很棒的文章中提供了一个很好的解释:并发性不是并行性。
简单的解释是,如果一个Spark作业与一个文件系统或网络交互,那么CPU会花很多时间等待与这些接口的通信,而不是花很多时间实际“工作”。通过同时为这些cpu提供多个任务,它们将花费更少的等待时间和更多的工作时间,从而获得更好的性能。
简而言之:我认为特巴乔是对的。您在执行器上达到了HDFS的吞吐量限制。
我认为答案可能比这里的一些建议要简单一些。
对我来说,线索在集群网络图中。对于运行1,利用率稳定在~ 50m字节/秒。对于运行3,稳定的利用率增加了一倍,约为100 M字节/秒。
从DzOrd分享的cloudera博客文章中,你可以看到这句重要的话:
我注意到HDFS客户端有大量并发线程的问题。一个粗略的猜测是,每个执行器最多5个任务可以实现完整的写吞吐量,所以最好将每个执行器的内核数保持在这个数字以下。
那么,让我们做一些计算,看看如果这是真的,我们期望的性能是什么。
运行1:19 GB, 7核,3个执行器
3个执行者x 7个线程= 21个线程
每个executor有7个内核,我们期望有限的IO到HDFS(最多5个内核)
有效吞吐量~= 3个executor x 5个线程= 15个线程
运行3:4 GB, 2核,12个执行器
2个executor x 12个线程= 24个线程
每个executor有2个内核,所以HDFS的吞吐量是可以的
有效吞吐量~= 12个executor x 2个线程= 24个线程
如果作业100%受到并发性(线程数)的限制。我们期望运行时与线程数完全成反比。
ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62
因此,ratio_num_threads ~= inv_ratio_runtime,看起来我们受到了网络限制。
同样的效果解释了运行1和运行2之间的差异。
运行2:19 GB, 4核,3个执行器
3个执行者x 4个线程= 12个线程
每个executor有4个内核,可以IO到HDFS
有效吞吐量~= 3个执行者x 4个线程= 12个线程
比较有效线程数和运行时数:
ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91
它没有上次的比较那么完美,但是当失去线程时,我们仍然可以看到类似的性能下降。
现在是最后一点:为什么线程越多性能越好,尤其是线程数比cpu数多?
并行性(我们通过将数据划分到多个CPU上得到的东西)和并发性(我们使用多个线程在一个CPU上工作时得到的东西)之间的区别,Rob Pike在这篇很棒的文章中提供了一个很好的解释:并发性不是并行性。
简单的解释是,如果一个Spark作业与一个文件系统或网络交互,那么CPU会花很多时间等待与这些接口的通信,而不是花很多时间实际“工作”。通过同时为这些cpu提供多个任务,它们将花费更少的等待时间和更多的工作时间,从而获得更好的性能。
To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as
possible: Imagine a cluster with six nodes running NodeManagers, each
equipped with 16 cores and 64GB of memory. The NodeManager capacities,
yarn.nodemanager.resource.memory-mb and
yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 *
1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100%
of the resources to YARN containers because the node needs some
resources to run the OS and Hadoop daemons. In this case, we leave a
gigabyte and a core for these system processes. Cloudera Manager helps
by accounting for these and configuring these YARN properties
automatically.
The likely first impulse would be to use --num-executors 6
--executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:
63GB + the executor memory overhead won’t fit within the 63GB capacity
of the NodeManagers. The application master will take up a core on one
of the nodes, meaning that there won’t be room for a 15-core executor
on that node. 15 cores per executor can lead to bad HDFS I/O
throughput.
A better option would be to use --num-executors 17
--executor-cores 5 --executor-memory 19G. Why?
This config results in three executors on all nodes except for the one
with the AM, which will have two executors.
--executor-memory was derived as (63/3 executors per node) = 21. 21 * 0.07 = 1.47. 21 – 1.47 ~ 19.
Cloudera博客的一篇文章给出了解释,How-to: tuning Your Apache Spark Jobs (Part 2)。