C/C++教程

Spark源码解析(七)Action算子解析

本文主要是介绍Spark源码解析(七)Action算子解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1.任务提交分析

        这里以org.apache.spark.examples.SparkPi为例。当执行reduce(_+_)方法时,其底层调用了sc.runJob方法。核心代码如下:

/**
  *   注释:(rdd, func, partitions, callSite, resultHandler, properties)
  *   1、应用程序调用 action 算子
  *   2、sparkContext.runJob()
  *   3、dagScheduler.runJob()
  *   4、TaskScheduler.submitTasks(new TaskSet())
  *   5、SchedulerBackEnd.driverEndpoint 提交任务
  */
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

        其中runJob方法中执行的核心代码:

/**
 * TODO  注释: 提交任务
 *   参数解析:
 *   1、rdd:要在其上运行任务的参数RDD目标RDD
 *   2、func:在RDD的每个分区上运行的函数
 *   3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
 *   4、callSite:在用户程序中调用此作业的位置
 *   5、resultHandler:回调函数,以将每个分区结果传递给Xxx
 *   6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
 *
 *   rdd1.xx1().xx2().xx3().xx4() 这里的 rdd = rdd1.xx1().xx2().xx3()
 *   
 */
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

        其内部核心代码为:

 /**
  * TODO 注释:
  *   第一步:封装一个JobWaiter对象;
  *   第二步:将 JobWaiter 对象赋值给 JobSubmitted 的 listener 属性,
  *          并将 JobSubmitted(DAGSchedulerEvent事件)对象传递给 eventProcessLoop 事件循环处理器。
  *          eventProcessLoop 内部事件消息处理线程将会接收 JobSubmitted 事件,
  *          并调用dagScheduler.handleJobSubmitted(...) 方法来处理事件;
  *   第三步:返回 JobWaiter 对象。
  */
 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
 /**
  * TODO 注释:这是提交任务运行
  *   eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
  *   这个组件主要负责:任务的提交执行
  *   把 JobSubmitted 这个消息,放入了 eventQueue 队列中
  */
 eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
 // TODO 注释: 返回结果对象的引用
 waiter

        通过eventProcessLoop.post(...)将任务放入了eventQueue队列中,有通过eventProcessLoop.start()方法将任务提交。前面有见到以下代码是等待任务的提交,正好有对应上:

/**
 * TODO 注释: driver 中,初始化了一个  dagSchedudelr
 *   它里面又初始化了一个 eventThread 专门用来处理  JobSubmitted
 */
// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
	setDaemon(true)
	
	override def run(): Unit = {
		try {
			while (!stopped.get) {
				// TODO 注释:获取消息
				// TODO 注释:一定要注意:当 sparkContext 还没有初始化好的时候,是不执行 sc.runJob 提交任务的。
				// TODO 注释:当执行 sc.runJob(sc) 的时候,就会提交 Job 到这儿来。
				val event = eventQueue.take()
				try {
					/**
					 * TODO 注释:根据事件的类型,调用不同的 handleXXX 方法来进行处理。
					 *   当接收到任务提交的时候: event = JobSubmitted
					 */
					onReceive(event)
				} catch {
					case NonFatal(e) => try {
						onError(e)
					} catch {
						case NonFatal(e) => logError("Unexpected error in " + name, e)
					}
				}
			}
		} catch {
			case ie: InterruptedException => // exit even if eventQueue is not empty
			case NonFatal(e) => logError("Unexpected error in " + name, e)
		}
	}
}

        在这里通过onReceive进行任务的提交,任务提交给了DAGScheduler

2.Stage切分与提交

        任务提交核心代码如下,包含了2个方面,Stage切分与Task分发与执行。在DAGScheduler.handleJobSubmitted(...)中有如下代码:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

        这个finalRDD就是rdd链条中的最后一个RDD,也就是触发sc.runJob()方法执行的RDD。必然是针对某个RDD调用了一个action算子才触发执行的,则该RDD就是finalRDD。

        注意以下概念:

  • ShuffleMapStage  +  ResultStage
  • ShuffleMapTask + ResultTask
  • ShuffleDependency + NarrowDependency

        在createResultStage(...)方法中,做了以下事情:

/**
 * TODO Create a ResultStage associated with the provided jobId.
 *  进行Stage切分的详细方法实现
 */
private def createResultStage(rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int,
	callSite: CallSite): ResultStage = {

	checkBarrierStageWithDynamicAllocation(rdd)
	checkBarrierStageWithNumSlots(rdd)
	checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)

	// TODO 注释:获得父stage,若没有shuffle则返回空List
	// TODO 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
	// 所有的父类stage都已经构建完成并返回给parents。
	val parents = getOrCreateParentStages(rdd, jobId)

	// TODO 注释: finalStage=resultStage 的 stageID  这里返回的是最后一个stage的Id
	val id = nextStageId.getAndIncrement()

	// TODO 注释:创建当前最后的ResultStage
	// TODO 注释:parents 所有的 父 stage
	val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)

	// TODO 注释:将 ResultStage 与 stageId 相关联, 保存在 map 中
	// TODO 注释:stageIdToStage = new HashMap[Int, Stage]
	stageIdToStage(id) = stage

	// TODO 注释:更新该job中包含的stage
	updateJobIdStageIdMaps(jobId, stage)

	// TODO 注释:返回ResultStage
	stage
}

这篇关于Spark源码解析(七)Action算子解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!