- Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
- 王家林 段智华编著
- 6345字
- 2025-02-25 00:19:27
6.3 从Application提交的角度重新审视Driver
本节从Application提交的角度重新审视Driver,彻底解密Driver到底是什么时候产生的,以及Driver和Master交互原理、Driver和Master交互源码。
6.3.1 Driver到底是什么时候产生的
在SparkContext实例化时,通过createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend。
SparkContext.scala的源码如下:

在createTaskScheduler中调用scheduler.initialize(backend),initialize的方法参数把StandaloneSchedulerBackend传进来。
TaskSchedulerImpl的initialize的源码如下:

initialize的方法把StandaloneSchedulerBackend传进来了,但还没有启动Standalone-SchedulerBackend。在TaskSchedulerImpl的initialize方法中,把StandaloneSchedulerBackend传进来,赋值为TaskSchedulerImpl的backend。
在TaskSchedulerImpl中调用start方法时,会调用backend.start方法,在start方法中会注册应用程序。
SparkContext.scala的taskScheduler的源码如下:

其中调用了_taskScheduler的start方法。

TaskScheduler的start()方法没具体实现,TaskScheduler子类的TaskSchedulerImpl的start()方法的源码如下:

TaskSchedulerImpl的start()通过backend.start()启动了StandaloneSchedulerBackend的start方法。
StandaloneSchedulerBackend的start方法中,将command封装注册给Master,Master转过来要Worker启动具体的Executor。command已经封装好指令,Executor具体要启动进程入口类CoarseGrainedExecutorBackend。然后调用new()函数创建一个StandaloneAppClient,通过client.start()启动client。
StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint。

ClientEndpoint的源码如下:

ClientEndpoint是一个ThreadSafeRpcEndpoint。ClientEndpoint的onStart()方法中调用registerWithMaster(1)进行注册,向Master注册程序。registerWithMaster方法如下。
StandaloneAppClient.scala的源码如下:

registerWithMaster中调用了tryRegisterAllMasters方法。在tryRegisterAllMasters方法中,ClientEndpoint向Master发送RegisterApplication消息进行应用程序的注册。
StandaloneAppClient.scala的源码如下:

程序注册以后,Master通过schedule()分配资源,通知Worker启动Executor,Executor启动的进程是CoarseGrainedExecutorBackend,Executor启动以后又转过来向Driver注册,Driver其实是StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend的一个消息循环体DriverEndpoint。
Master.scala的receive方法的源码如下:

在Master的receive方法中调用了schedule方法。Schedule方法在等待的应用程序中调度当前可用的资源。每次一个新的应用程序连接或资源发生可用性的变化时,此方法将被调用。
Master.scala的schedule方法的源码如下:

Master.scala在schedule方法中调用launchDriver方法。launchDriver方法给Worker发送launchDriver的消息。Master.scala的launchDriver的源码如下:

launchDriver本身是一个case class,包括driverId、driverDesc等信息。

DriverDescription包含了jarUrl、memory、cores、supervise、command等内容。

Master.scala中launchDriver启动了Driver,接下来,launchExecutor启动Executor。Master.scala的launchExecutor的源码如下:

Master给Worker发送一个消息LaunchDriver启动Driver,然后是launchExecutor启动Executor,launchExecutor有自己的调度方式,资源调度后,也是给Worker发送了一个消息LaunchExecutor。
Worker就收到Master发送的LaunchDriver、LaunchExecutor消息。
图6-2是Worker原理内幕和流程机制。

图6-2 Worker原理内幕和流程机制
Master、Worker部署在不同的机器上,Master、Worker为进程存在。Master给Worker发两种不同的指令:一种指令是LaunchDriver;另一种指令是LaunchExecutor。
Worker收到Master的LaunchDriver消息以后,调用new()函数创建一个DriverRunner,然后启动driver.start()方法。
Worker.scala的源码如下:

Worker收到Master的LaunchExecutor消息以后,new()函数创建一个ExecutorRunner,然后启动manager.start()方法。
Worker.scala的源码如下:

Worker的DriverRunner、ExecutorRunner在调用start方法时,在start内部都启动了一条线程,使用Thread来处理Driver、Executor的启动。以Worker收到LaunchDriver消息,new出DriverRunnerDriverRunner为例,DriverRunner.scala的start的源码如下:

DriverRunner.scala的start方法中调用了prepareAndRunDriver方法,准备Driver的jar包和启动Driver。prepareAndRunDriver的源码如下:

LaunchDriver的启动过程如下。
Worker进程:Worker的DriverRunner调用start方法,内部使用Thread来处理Driver启动。DriverRunner创建Driver在本地系统的工作目录(即Linux的文件目录),每次工作都有自己的目录,封装好Driver的启动Command,通过ProcessBuilder启动Driver。这些内容都属于Worker进程。
Driver进程:启动的Driver属于Driver进程。
LaunchExecutor的启动过程如下。
Worker进程:Worker的ExecutorRunner调用start方法,内部使用Thread来处理Executor启动。ExecutorRunner创建Executor在本地系统的工作目录(即Linux的文件目录),每次工作都有自己的目录,封装好Executor的启动Command,通过ProcessBuilder来启动Executor。这些内容都属于Worker进程。
Executor进程:启动的Executor属于Executor进程。Executor在ExecutorBackend里面,ExecutorBackend在Spark standalone模式中是CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend继承自ExecutorBackend。Executor和ExecutorBackend是一对一的关系,一个ExecutorBackend有一个Executor,在Executor内部是通过线程池并发处理的方式来处理Spark提交过来的Task的。
Executor启动后要向Driver注册,注册给SchedulerBackend。
CoarseGrainedExecutorBackend的源码如下:

再次看一下Master的schedule方法。

Master的schedule方法中,如果Driver运行在集群中,通过launchDriver来启动Driver。launchDriver发送一个消息交给worker的endpoint,这是RPC的通信机制。

Master的schedule方法中启动Executor的部分,通过startExecutorsOnWorkers启动,startExecutorsOnWorkers也是通过RPC的通信方式。
Master.scala的方法中调用allocateWorkerResourceToExecutors方法进行正式分配。
allocateWorkerResourceToExecutors正式分配时就通过launchExecutor方法启动Executor。

Master发送消息给Worker,发送两个消息:一个是LaunchDriver;另一个是LaunchExecutor。Worker收到Master的LaunchDriver、LaunchExecutor消息。下面看一下Worker。
Spark 2.2.1版本的Worker.scala源代码如下:

Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行之后新增加一行代码,新增加一个成员变量externalShuffleService-Supplier。
1. externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
Worker实现RPC通信,继承自ThreadSafeRpcEndpoint。ThreadSafeRpcEndpoint是一个trait,其他的RPC对象可以给它发消息。
1. private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
Worker在receive方法中接收消息。就像一个邮箱,不断地循环邮箱接收邮件,我们可以把消息看成邮件。

Worker.scala的receive方法LaunchDriver启动Driver的源码如下:

LaunchDriver方法首先打印日志,传进来时肯定会告诉driverId。启动Driver或者Executor时,Driver或者Executor所在的进程一定满足内存级别的要求,但不一定满足Cores的要求,实际的Cores可能比期待的Cores多,也有可能少。
logInfo方法打印日志使用了封装。

回到LaunchDriver方法,其中调用new()函数创建一个DriverRunner。DriverRunner包括driverId、工作目录(workDir)、spark的路径(sparkHome)、driverDesc、workerUri、securityMgr等内容。在代码drivers(driverId) = driver中,将driver交给一个数据结构drivers,drivers是一个HashMap,是Key-Value的方式,其中Key是Driver的ID,Value是DriverRunner。Worker下可能启动很多Executor,须根据具体的ID管理DriverRunner。DriverRunner内部通过线程的方式启动另外一个进程Driver。DriverRunner是Driver所在进程的代理。
1. val drivers = new HashMap[String, DriverRunner]
回到Worker.scala的LaunchDriver,Worker在启动driver前,将相关的DriverRunner数据保存到Worker的内存数据结构中,然后进行driver.start()。start之后,将消耗的cores、memory增加到coresUsed、memoryUsed。
接下来进入DriverRunner.scala的源码。DriverRunner管理Driver的执行,包括在Driver失败的时候自动重启。如Driver运行在集群模式中,加入supervise关键字可以自动重启。

其中DriverDescription的源码如下。其中包括DriverDescription的成员supervise,supervise是一个布尔值,如果设置为True,在集群模式中Driver运行失败的时候,Worker会负责重新启动Driver。

回到Worker.scala的LaunchDriver,DriverRunner构造出后,调用其start方法,通过一个线程管理Driver,包括启动Driver及关闭Driver。其中,Thread("DriverRunner for " +driverId),DriverRunner for driverId是线程的名字,Thread是Java的代码,scala可以无缝连接Java。
DriverRunner的start方法调用prepareAndRunDriver来实现driver jar包的准备及启动driver。
prepareAndRunDriver方法中调用了createWorkingDirectory方法创建目录。通过Java的new File创建了Driver的工作目录,如果目录不存在而且创建不成功,就提示失败。在本地文件系统创建一个目录一般不会失败,除非磁盘满。createWorkingDirectory的源码如下:

回到DriverRunner.scala的prepareAndRunDriver方法,其中采用downloadUserJar方法下载jar包。我们自己写的代码是一个jar包,这里下载用户的jar包到本地。jar包在Hdfs中,开发人员需要从Hdfs中获取Jar包下载到本地。
downloadUserJar方法的源码如下:

downloadUserJar方法调用了fetchFile,fetchFile借助Hadoop,从Hdfs中下载文件。我们提交文件时,将jar包上传到Hdfs上,提交一份,大家都可以从Hdfs中下载。
Spark 2.2.1版本的Utils.scala源代码如下:

Spark 2.4.3版本的Utils.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行新增加了函数的返回类型File。
上段代码中第15行构建本地目录变量,调整为以下第5~12行代码。

回到DriverRunner.scala的prepareAndRunDriver方法,driverDesc.command表明运行什么类,构建进程运行类的入口,然后是runDriver启动Driver。

DriverRunner.scala的runDriver方法如下。runDriver中重定向输出文件和err文件,可以通过log文件查看执行的情况。最后是调用runCommandWithRetry方法。

runCommandWithRetry中传入的参数是ProcessBuilderLike(builder),这里调用new()函数创建一个ProcessBuilderLike,在重载方法start中执行processBuilder.start()。ProcessBuilderLike的源码如下:

runCommandWithRetry的源码如下:

runCommandWithRetry第一次不一定能申请成功,因此循环遍历重试。DriverRunner启动进程是通过ProcessBuilder中的process.get.waitFor来完成的。如果supervise设置为True,exitCode为非零退出码及driver进程没有终止,我们将keepTrying设置为True,继续循环重试启动进程。
回到DriverRunner.scala的LaunchDriver方法如下:

采用driver.start方法启动Driver,进入start的源码如下:

Start启动时运行到了finalState,可能是Spark运行出状况了,如Driver运行时KILLED或者FAILED,出状况以后,通过worker.send给自己发一个消息,通知DriverStateChanged状态改变。下面是Worker.scala中的driverStateChanged的源码。

在其中调用handleDriverStateChanged方法,handleDriverStateChanged的源码如下:

Worker.scala的handleDriverStateChanged方法中对于state的不同情况,打印相关日志。关键代码是sendToMaster(driverStateChanged),发一个消息给Master,告知Driver进程挂掉。消息内容是driverStateChanged。sendToMaster的源码如下:

下面来看一下Master的源码。Master收到DriverStateChanged消息以后,无论Driver的状态是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED中的任何一个,都把Driver从内存数据结构中删掉,并把持久化引擎中的数据清理掉。

进入removeDriver的源码,清理掉相关数据以后,再次调用schedule方法。

接下来看一下启动Executor。Worker.scala的LaunchExecutor方法的源码如下所示。
Worker.scala的源码如下:

直接看一下manager.start方法,启动一个线程Thread,在run方法中调用fetchAndRunExecutor。
Spark 2.2.1版本的fetchAndRunExecutor的源码如下:

Spark 2.4.3版本的ExecutorRunner.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第3行之后新增加代码,构建subsCommand变量。
上段代码中第4行appDesc.command调整为subsCommand变量。

fetchAndRunExecutor类似于启动Driver的过程,在启动Executor时首先构建CommandUtils.buildProcessBuilder,然后是builder.start(),退出时发送ExecutorStateChanged消息给Worker。
Worker.scala源码中的executorStateChanged如下:

进入handleExecutorStateChanged源码,sendToMaster(executorStateChanged)发executorState-Changed消息给Master。
Spark 2.2.1版本的Worker.scala源码如下:

Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第19行之后新增代码,如果CLEANUP_NON_SHUFFLE_FILES_ENABLED为True,则删除executor。

下面看一下Master.scala。Master收到ExecutorStateChanged消息。如状态发生改变,通过exec.application.driver.send给Driver也发送一个ExecutorUpdated消息,流程和启动Driver基本是一样的。ExecutorStateChanged的源码如下:

6.3.2 Driver和Master交互原理解析
Driver和Master交互,Master是一个消息循环体。本节讲解Driver消息循环体的产生过程,Driver消息循环体生成之后,就可以与Master互相通信了。
Spark应用程序提交时,我们会提交一个spark-submit脚本。spark-submit脚本中直接运行了org.apache.spark.deploy.SparkSubmit对象。Spark-submit脚本内容如下所示。

进入到SparkSubmit中,main函数代码如下所示。
Spark 2.2.1版本的SparkSubmit.scala的源码如下。

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第1~16行整体替换为以下代码。

上面的代码中,spark-submit脚本提交的命令行参数通过main函数的args获取,并将args参数传入SparkSubmitArguments中完成解析。最后通过匹配appArgs参数中的action类型,执行submit、kill、requestStatus、PRINT_VERSION操作。
进入到SparkSubmitArguments中,分析一下参数的解析过程。SparkSubmitArguments中的关键代码如下所示。
Spark 2.2.1版本的SparkSubmitArguments.scala的源码如下:

Spark 2.4.3版本的SparkSubmitArguments.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第2~8行删掉try-catch的处理,替换为parse(args.asJava)。
上段代码中第14行之后新增一行代码,构建useRest变量。

在上面的代码中,parse(args.toList)将会解析命令行参数,通过mergeDefaultSpark-Properties合并默认配置,调用ignoreNonSparkProperties方法忽略不是以“spark.”为开始的配置,方法loadEnvironmentArguments加载系统环境变量,最后调用validateArguments方法检验参数的合法性。这些配置如何提交呢?main函数中由case SparkSubmitAction.SUBMIT=> submit(appArgs)这句代码判断是否提交参数并执行程序,如果匹配到SparkSubmit-Action.SUBMIT,则调用submit(appArgs)方法,参数appArgs是SparkSubmitArguments类型,appArgs中包含了提交的各种参数,包括命令行传入以及默认的配置项。
submit(appArgs)方法主要完成两件事情。
(1)准备提交环境。
(2)执行main方法,完成提交。
首先来看Spark中是如何准备环境的。在submit(appArgs)方法中,有如下源码。
SparkSubmit.scala的源码如下:

这段代码中,调用prepareSubmitEnvironment(args)方法,完成提交环境的准备。该方法返回一个四元Tuple,分别表示子进程参数、子进程classpath列表、系统属性map、子进程main方法。完成了提交环境的准备工作后,接下来就启动子进程,在Standalone模式下,启动的子进程是org.apache.spark.deploy.Client对象。具体的执行过程在runMain函数中,关键代码如下所示。
Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行sysProps参数调整为sparkConf。
上段代码中第12~14行删除。
上段代码中第18行调整为new JavaMainApplication(mainClass)。
上段代码中第20行反射执行main方法调整为在JavaMainApplication实例的start方法里面执行。

在上面的代码中,使用Utils工具提供的classForName方法,找到主类,然后在mainClass上调用getMethod方法得到main方法,最后在mainMethod上调用invoke执行main方法。需要注意的是,执行invoke方法同时传入了childArgs参数,这个参数中保留了配置信息。Utils.classForName(childMainClass)方法将会返回要执行的主类,这里的childMainClass是哪一个类呢?其实,这个参数在不同的部署模式下是不一样的,standalone模式下,childMainClass指的是org.apache.spark.deploy.Client类,从源码中可以找到依据,源码如下所示。
Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行将childMainClass调整为REST_CLUSTER_SUBMIT_CLASS。
上段代码中第9行将childMainClass调整为STANDALONE_CLUSTER_SUBMIT_CLASS。

在上面的代码中,程序首先根据args.isStandaloneCluster判断部署模式,如果是standalone模式并且不使用REST服务,childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS,获取的childMainClass其实是org.apache.spark.deploy.Client。从上述代码中可以看出,childArgs中存入了Executor的memory配置和cores配置。与runMain方法中描述一样,程序将启动org.apache.spark.deploy.Client类,并运行主方法。Client类中做了哪些事情?先来看这个类是怎样完成调用的。下面是Client对象及主方法。
Spark 2.2.1版本的Client.scala的源码如下:

Spark 2.4.3版本的Client.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8~30行进行函数封装,整体替换为ClientApp().start方法。

上面的代码中,首先实例化出一个SparkConfig对象,通过这个配置对象,可以在代码中指定一些配置项,如appName、Master地址等。val driverArgs = new ClientArguments(args)使用传入的args参数构建一个ClientArguments对象,该对象同样保留传入的配置信息,如Executor memory、Executor cores等都包含在这个对象中。
使用RpcEnv.create工厂方法,创建一个rpcEnv成员,使用该成员设置好到Master的通信端点,通过该端点实现同Master的通信。Spark 2.0中默认采用Netty框架来实现远程过程调用(Remote Precedure Call,RPC),通过使用RPC异步通信机制,完成各节点之间的通信。在rpcEnv.setupEndpoint方法中调用new()函数创建一个Driver ClientEndpoint。ClientEndpoint是一个ThreadSafeRpcEndpoint消息循环体,至此就生成了Driver ClientEndpoint。在ClientEndpoint的onStart方法中向Master提交注册。这里通过masterEndpoint向Master发送RequestSubmitDriver(driverDescription)请求,完成Driver的注册。
Client.scala的onStart的源码如下:

Master收到Driver ClientEndpoint的RequestSubmitDriver消息以后,就将Driver的信息加入到waitingDrivers和drivers的数据结构中。然后进行schedule()资源分配,Master向Worker发送LaunchDriver的消息指令。
Master.scala的源码如下:

在Client.scala的onStart代码中,提交的配置参数始终在不同的对象、节点上传递。Master把Driver加载到Worker节点并启动,Worker节点上运行的Driver同样包含配置参数。当Driver端的SparkContext启动并实例化DAGScheduler、TaskScheduler时,StandaloneSchedulerBackend在做另一件事情——实例化StandaloneAppClient,StandaloneAppClient中有StandaloneApp-ClientPoint,也是一个RPC端口的引用,用于和Master进行通信。在StandaloneAppClientPoint的onStart方法中,向Master发送RegisterApplication(appDescription,self)请求,Master节点收到请求并调用schedule方法,向Worker发送LaunchExecutor(masterUrl,exec.application.id,exec.id, exec.application.desc, exec.cores, exec.memory)请求,Worker节点启动ExecutorRunner。ExecutorRunner中启动CoarseGrainedExecutorBackend并向Driver注册。
在CoarseGrainedExecutorBackend的main方法中,代码如下:

从程序提交一直到CoarseGrainedExecutorBackend进程启动,配置参数一直被传递。在CoarseGrainedExecutorBackend中取出了cores配置信息,并通过run(driverUrl, executorId,hostname, cores, appId, workerUrl, userClassPath)将cores传入run方法,CoarseGrainedExecutor-Backend以进程的形式在JVM中启动,此时JVM的资源指占用资源的数量并启动起来。需要注意的是,在一个Worker节点上,只要物理内核的个数和内存大小能够满足Executor启动要求,一个Worker节点上就可以运行多个Executor。
6.3.3 Driver和Master交互源码详解
从Spark-Submit的脚本分析,提交应用程序时,Main启动的类,也就是用户最终提交执行的类是org.apache.spark.deploy.SparkSubmit。SparkSubmit的全路径为org.apache.spark.deploy.SparkSubmit。SparkSubmit是启动一个Spark应用程序的主入口点。当集群管理器为STANDALONE、部署模式为CLUSTER时,根据提交的两种方式将childMainClass分别设置为不同的类,同时将传入的args.mainClass(提交应用程序时设置的主类)及其参数根据不同集群管理器与部署模式进行转换,并封装到新的主类所需的参数中。在REST方式(Spark 1.3+)方式中,childMainClass是"org.apache.spark.deploy.rest.RestSubmissionClient";在传统方式中,childMainClass是"org.apache.spark.deploy.Client"。
接下来以REST方式讲解。当提交方式为REST方式(Spark 1.3+)时,会将应用程序的主类等信息封装到RestSubmissionClient类中,由该类负责向RestSubmissionServer发送提交应用程序的请求,而RestSubmissionServer接收到应用程序提交的请求后,会向Master发送RequestSubmitDriver消息,然后由Master根据资源调度策略,启动集群中相应的Driver,执行提交的应用程序。Cluster部署模式下的部署与执行框架如图6-3所示。
为了体现各个组件间的部署关系,这里以框架图的形式进行描述,对应地,可以从时序图的角度去理解各个类或组件之间的交互关系。其中,组件Master和Worker的标注在方框的左上角,其他方框表示一个具体的实例。
其中,RestSubmissionClient是提交应用程序的客户端处,对提交的应用程序进行封装的类。之后各个组件间的交互流程分析如下。
(1)第1步constructSubmitRequest,就是在RestSubmissionClient实例中,根据提交的应用程序信息,构建出提交请求。
(2)然后继续第2步createSubmission,在该步骤中向RestSubmissionServer发送post请求,即图6-3中对应的第3步(注意,实际上是在第2步中调用)。
(3)RestSubmissionServer接收到post请求后,由对应的Servlet进行处理,这里对应为StandaloneSubmitRequestServlet,即开始第4步,调用doPost,发送Post请求。
(4)doPost中继续第5步handleSubmit,开始处理提交请求。在处理过程中,向Master的RPC终端发送消息RequestSubmitDriver,对应图中的第6步。

图6-3 Cluster部署模式下的部署与执行框架
(5)Master接收到该消息后,执行第7步createDriver,创建Driver,需要由Master的调度机制创建,对应第8步schedule,获取分配的资源后,向Worker(这些Worker启动时会注册到Master上)的RPC终端发送LaunchDriver消息。
(6)Worker在RPC终端接收到消息后开始处理,实例化一个DriverRunner,并运行之前封装的应用程序。
注意:从上面部署框架及其术语解析部分可以知道,由于提交的应用程序在main部分包含了SparkContext实例,因此我们也称之为Driver Program,即驱动程序。因此,在框架中,对应在Master和Worker处都使用Driver,而不是Application(应用程序)。
其中主要的源码及其分析如下。
(1)RestSubmissionClient中的run方法。
RestSubmissionClient.scala的源码如下:

(2)收到提交的Post请求之后,StandaloneSubmitRequestServlet向Master的RPC终端发送RequestSubmitDriver请求。
StandaloneRestServer.scala的源码如下:

(3)构建DriverDescription的buildDriverDescription方法。
StandaloneRestServer.scala的源码如下:

(4)Master接收RequestSubmitDriver,处理消息并返回SubmitDriverResponse消息。
Master.scala的源码如下:

(5)Master的schedule():调度机制的调度。
Master.scala的源码如下:

(6)Worker上的Driver启动。
Worker.scala的源码如下:

Driver Client管理Driver,包括向Master提交Driver、请求Kill Driver等。Driver Client与Master间的交互消息如下。
DeployMessages.scala的源码如下:

Driver在handleSubmit方法中向Master请求提交RequestSubmitDriver消息。
Master收到Driver StandaloneSubmitRequestServlet发送的消息RequestSubmitDriver。Master做相应的处理以后,返回Driver StandaloneSubmitRequestServlet消息SubmitDriver-Response。
Master的源码如下:

类似地,Master收到Driver StandaloneKillRequestServlet方法中发送的RequestKillDriver消息,Master做相应的处理以后,返回Driver StandaloneKillRequestServlet消息KillDriverResponse。
Master收到Driver StandaloneStatusRequestServlet方法中发送的RequestDriverStatus更新消息,Master做相应的处理以后,返回Driver StandaloneStatusRequestServlet消息DriverStatusResponse。