我读了集群模式概述,我仍然不能理解Spark独立集群中的不同进程和并行性。
worker是否是JVM进程?我运行bin\start-slave.sh,发现它生成了worker,它实际上是一个JVM。
根据上面的链接,执行程序是为运行任务的工作节点上的应用程序启动的进程。执行程序也是JVM。
这些是我的问题:
Executors are per application. Then what is the role of a worker? Does it co-ordinate with the executor and communicate the result back to the driver?
Or does the driver directly talks to the executor? If so, what is the worker's purpose then?
How to control the number of executors for an application?
Can the tasks be made to run in parallel inside the executor? If so, how to configure the number of threads for an executor?
What is the relation between a worker, executors and executor cores ( --total-executor-cores)?
What does it mean to have more workers per node?
更新
让我们举个例子来更好地理解。
示例1:
具有5个工作节点的独立集群(每个节点有8个核心)
当我用默认设置启动应用程序时。
示例2
与示例1相同的集群配置,但我使用以下设置运行了一个应用程序
——executor-cores 10
——total-executor-cores 10。
示例3
与示例1相同的集群配置,但我使用以下设置运行了一个应用程序
——executor-cores 10
——total-executor-cores 50。
示例4
与示例1相同的集群配置,但我使用以下设置运行了一个应用程序
——executor-cores 50
——total-executor-cores 50。
示例5
与示例1相同的集群配置,但我使用以下设置运行了一个应用程序
——executor-cores 50
——total-executor-cores 10。
在这些例子中,
有多少个执行人?每个执行器有多少线程?多少核?
如何决定每个应用程序的执行者数量?它总是和工人数量一样吗?
Spark使用主/从架构。正如您在图中看到的,它有一个中央协调器(Driver),与许多分布式工作者(执行程序)通信。驱动程序和每个执行程序在各自的Java进程中运行。
司机
驱动程序是主方法运行的进程。首先,它将用户程序转换为任务,然后在执行器上调度任务。
执行人
执行器是工作节点的进程,负责在给定的Spark作业中运行单个任务。它们在Spark应用程序开始时启动,通常在应用程序的整个生命周期中运行。一旦它们运行了任务,就会将结果发送给驱动程序。它们还为用户程序通过块管理器缓存的rdd提供内存存储。
应用程序执行流程
考虑到这一点,当你用spark-submit向集群提交一个应用程序时,内部会发生这样的情况:
A standalone application starts and instantiates a SparkContext instance (and it is only then when you can call the application a driver).
The driver program ask for resources to the cluster manager to launch executors.
The cluster manager launches executors.
The driver process runs through the user application. Depending on the actions and transformations over RDDs task are sent to executors.
Executors run the tasks and save the results.
If any worker crashes, its tasks will be sent to different executors to be processed again. In the book "Learning Spark: Lightning-Fast Big Data Analysis" they talk about Spark and Fault Tolerance:
Spark通过重新执行失败或缓慢的任务来自动处理失败或缓慢的机器。例如,如果运行map()操作分区的节点崩溃,Spark将在另一个节点上重新运行它;即使节点没有崩溃,只是比其他节点慢得多,Spark也可以抢先在另一个节点上启动任务的“推测”副本,并在完成后获取其结果。
使用SparkContext.stop()从驱动程序或如果主方法退出/崩溃,所有的执行程序将被终止,集群资源将由集群管理器释放。
你的问题
When executors are started they register themselves with the driver and from so on they communicate directly. The workers are in charge of communicating the cluster manager the availability of their resources.
In a YARN cluster you can do that with --num-executors. In a standalone cluster you will get one executor per worker unless you play with spark.executor.cores and a worker has enough cores to hold more than one executor. (As @JacekLaskowski pointed out, --num-executors is no longer in use in YARN https://github.com/apache/spark/commit/16b6d18613e150c7038c613992d80a7828413e66)
You can assign the number of cores per executor with --executor-cores
--total-executor-cores is the max number of executor cores per application
As Sean Owen said in this thread: "there's not a good reason to run more than one worker per machine". You would have many JVM sitting in one machine for instance.
更新
我还没能测试这个场景,但根据文档:
例1:Spark将贪婪地获取调度程序提供的尽可能多的内核和执行程序。所以最后你会得到5个8核的执行器。
例2到5:Spark无法在单个worker中分配足够多的内核,因此不会启动任何执行程序。