这里以org.apache.spark.examples.SparkPi为例。当执行reduce(_+_)方法时,其底层调用了sc.runJob方法。核心代码如下:
/** * 注释:(rdd, func, partitions, callSite, resultHandler, properties) * 1、应用程序调用 action 算子 * 2、sparkContext.runJob() * 3、dagScheduler.runJob() * 4、TaskScheduler.submitTasks(new TaskSet()) * 5、SchedulerBackEnd.driverEndpoint 提交任务 */ dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
其中runJob方法中执行的核心代码:
/** * TODO 注释: 提交任务 * 参数解析: * 1、rdd:要在其上运行任务的参数RDD目标RDD * 2、func:在RDD的每个分区上运行的函数 * 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。 * 4、callSite:在用户程序中调用此作业的位置 * 5、resultHandler:回调函数,以将每个分区结果传递给Xxx * 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name * * rdd1.xx1().xx2().xx3().xx4() 这里的 rdd = rdd1.xx1().xx2().xx3() * */ val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
其内部核心代码为:
/** * TODO 注释: * 第一步:封装一个JobWaiter对象; * 第二步:将 JobWaiter 对象赋值给 JobSubmitted 的 listener 属性, * 并将 JobSubmitted(DAGSchedulerEvent事件)对象传递给 eventProcessLoop 事件循环处理器。 * eventProcessLoop 内部事件消息处理线程将会接收 JobSubmitted 事件, * 并调用dagScheduler.handleJobSubmitted(...) 方法来处理事件; * 第三步:返回 JobWaiter 对象。 */ val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) /** * TODO 注释:这是提交任务运行 * eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop * 这个组件主要负责:任务的提交执行 * 把 JobSubmitted 这个消息,放入了 eventQueue 队列中 */ eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) // TODO 注释: 返回结果对象的引用 waiter
通过eventProcessLoop.post(...)将任务放入了eventQueue队列中,有通过eventProcessLoop.start()方法将任务提交。前面有见到以下代码是等待任务的提交,正好有对应上:
/** * TODO 注释: driver 中,初始化了一个 dagSchedudelr * 它里面又初始化了一个 eventThread 专门用来处理 JobSubmitted */ // Exposed for testing. private[spark] val eventThread = new Thread(name) { setDaemon(true) override def run(): Unit = { try { while (!stopped.get) { // TODO 注释:获取消息 // TODO 注释:一定要注意:当 sparkContext 还没有初始化好的时候,是不执行 sc.runJob 提交任务的。 // TODO 注释:当执行 sc.runJob(sc) 的时候,就会提交 Job 到这儿来。 val event = eventQueue.take() try { /** * TODO 注释:根据事件的类型,调用不同的 handleXXX 方法来进行处理。 * 当接收到任务提交的时候: event = JobSubmitted */ onReceive(event) } catch { case NonFatal(e) => try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } } }
在这里通过onReceive进行任务的提交,任务提交给了DAGScheduler
任务提交核心代码如下,包含了2个方面,Stage切分与Task分发与执行。在DAGScheduler.handleJobSubmitted(...)中有如下代码:
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
这个finalRDD就是rdd链条中的最后一个RDD,也就是触发sc.runJob()方法执行的RDD。必然是针对某个RDD调用了一个action算子才触发执行的,则该RDD就是finalRDD。
注意以下概念:
在createResultStage(...)方法中,做了以下事情:
/** * TODO Create a ResultStage associated with the provided jobId. * 进行Stage切分的详细方法实现 */ private def createResultStage(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) // TODO 注释:获得父stage,若没有shuffle则返回空List // TODO 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现 // 所有的父类stage都已经构建完成并返回给parents。 val parents = getOrCreateParentStages(rdd, jobId) // TODO 注释: finalStage=resultStage 的 stageID 这里返回的是最后一个stage的Id val id = nextStageId.getAndIncrement() // TODO 注释:创建当前最后的ResultStage // TODO 注释:parents 所有的 父 stage val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // TODO 注释:将 ResultStage 与 stageId 相关联, 保存在 map 中 // TODO 注释:stageIdToStage = new HashMap[Int, Stage] stageIdToStage(id) = stage // TODO 注释:更新该job中包含的stage updateJobIdStageIdMaps(jobId, stage) // TODO 注释:返回ResultStage stage }