我试图理解在YARN上运行Spark作业时,内核数量和执行器数量之间的关系。

测试环境如下:

Number of data nodes: 3 Data node machine spec: CPU: Core i7-4790 (# of cores: 4, # of threads: 8) RAM: 32GB (8GB x 4) HDD: 8TB (2TB x 4) Network: 1Gb Spark version: 1.0.0 Hadoop version: 2.4.0 (Hortonworks HDP 2.1) Spark job flow: sc.textFile -> filter -> map -> filter -> mapToPair -> reduceByKey -> map -> saveAsTextFile Input data Type: single text file Size: 165GB Number of lines: 454,568,833 Output Number of lines after second filter: 310,640,717 Number of lines of the result file: 99,848,268 Size of the result file: 41GB

作业在以下配置下运行:

——master yarn-client——executor-memory 19G——executor-cores 7——num-executors 3(每个数据节点的executor,使用的内核数一样多) ——master yarn-client——executor-memory 19G——executor-cores 4——num-executor 3(内核数减少) ——master yarn-client——executor-memory 4G——executor-cores 2——num-executor 12(内核少,executor多)

运行时间:

50分15秒 55分48秒 12分23秒

令我惊讶的是,(3)要快得多。 我认为(1)会更快,因为洗牌时执行者之间的交流会更少。 虽然(1)的核数比(3)少,但核数不是关键因素,因为2)的表现很好。

(在pwilmot的回答之后添加了以下内容。)

性能监视器屏幕截图如下:

(1)的Ganglia数据节点摘要-作业开始于04:37。

(3)的Ganglia数据节点摘要-作业开始于19:47。请忽略之前的图表。

图表大致分为两部分:

第一:从start到reduceByKey: CPU密集型,没有网络活动 第二:reduceByKey: CPU降低后,网络I/O完成。

如图所示,(1)可以使用尽可能多的CPU功率。所以,这可能不是线程数量的问题。

如何解释这一结果?


当前回答

来自RStudio的Sparklyr包页面的优秀资源:

SPARK DEFINITIONS: It may be useful to provide some simple definitions for the Spark nomenclature: Node: A server Worker Node: A server that is part of the cluster and are available to run Spark jobs Master Node: The server that coordinates the Worker nodes. Executor: A sort of virtual machine inside a node. One Node can have multiple Executors. Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located. Driver (Executor): The Driver Node will also show up in the Executor list.

其他回答

来自RStudio的Sparklyr包页面的优秀资源:

SPARK DEFINITIONS: It may be useful to provide some simple definitions for the Spark nomenclature: Node: A server Worker Node: A server that is part of the cluster and are available to run Spark jobs Master Node: The server that coordinates the Worker nodes. Executor: A sort of virtual machine inside a node. One Node can have multiple Executors. Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located. Driver (Executor): The Driver Node will also show up in the Executor list.

I haven't played with these settings myself so this is just speculation but if we think about this issue as normal cores and threads in a distributed system then in your cluster you can use up to 12 cores (4 * 3 machines) and 24 threads (8 * 3 machines). In your first two examples you are giving your job a fair number of cores (potential computation space) but the number of threads (jobs) to run on those cores is so limited that you aren't able to use much of the processing power allocated and thus the job is slower even though there is more computation resources allocated.

您提到您关心的是shuffle步骤—虽然在shuffle步骤中限制开销很好,但通常更重要的是利用集群的并行化。考虑一个极端的情况——一个没有shuffle的单线程程序。

我认为其中一个主要原因是地方性。您的输入文件大小为165G,文件的相关块当然分布在多个datanode上,更多的执行器可以避免网络复制。

尝试设置executor num等于blocks count,我认为可以更快。

Spark动态分配提供了灵活性,动态分配资源。在此范围内,可以给出最小和最大执行程序的数量。同样,在应用程序启动时必须启动的执行程序的数量也可以给定。

请阅读以下相同的内容:

http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

根据Sandy Ryza的说法,当你在HDFS上运行spark应用程序时

我注意到HDFS客户端有大量并发的问题 线程。粗略估计,每个执行者最多可以执行5个任务 实现全写吞吐量,所以最好保持数量 每个执行程序的内核数低于这个数字。

所以我相信你的第一个配置比第三个配置慢是因为HDFS的I/O吞吐量不好