本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
RDD DAG 构建了基于数据流之上的操作算子流,即 RDD 的各个分区的数据总共会经过哪些 Transformation 和 Action 这两种类型的一系列操作的调度运行,从而 RDD 先被 Transformation 操作转换为新的 RDD ,然后被 Action 操作将结果反馈到 Driver Program 或存储到外部存储系统上。
可以结合我的这篇博客来理解——DAG对大数据处理有什么好处?
上面提到的一系列操作的调度运行其实是 DAG 提交给 DAGScheduler 来解析完成的。
DAGScheduler 是面向 Stage 的高层级的调度器。
DAGScheduler 把 DAG 拆分成很多的 Tasks ,每组的 Tasks 都是一个 Sage ,解析时是以 Shuffle 为边界反向解析构建 Stage ,每当遇到 Shuffle ,就会产生新的 Stage ,然后以一个个 TaskSet (每个 Stage 封装一个 TaskSet )的形式提交给底层调度器 TaskScheduler 。
DAGScheduler 需要记录哪些 RDD 被存入磁盘等物化动作,同时要寻求 Task 的最优化调度,如在 Stage 内部数据的本地性等。
DAGScheduler 还需要监视因为 Shuffle 跨节点输出可能导致的失败,如果发现这个 Stage 失败,可能就要重新提交该 Stage 。
/** * 实现面向 stage 调度的高层次调度层。 * * 它为每个作业计算 stage 的 DAG,追踪那些被具象化的 rdd 和 stage 输出,并找到最小化的调度方式来运行作业。 * * 然后,它将 stage 作为 TaskSets 提交给在集群上的 TaskSchedulerImpl 来运行。 * * TaskSets 包含完全独立的任务,这些任务可以根据集群中已有的数据(例如,来自前一个 stage 的 map 输出文件)立即运行,但如果这些数据不可用,TaskSets 可能会失败。 * * Spark 的 stage 是通过在shuffle边界打破 RDD 的 graph 来创建的。 * * 具有“窄”依赖关系的 RDD 操作,如 map() 和 filter() ,在每个 stage 都通过 pipeline 连接到一组任务中,但是具有 Shuffle 依赖关系的操作需要多个 stage * * (一个 stage 用于写一组 map 输出文件,另一个 stage 用于在 Shuffle 之后读取这些文件)。 * * 最后,每个 stage 将只对其他 stage 具有 Shuffle 依赖关系,并且可以在其中计算多个操作。 * * 这些操作的实际 pipelining 操作发生在各种 RDD 的 RDD.compute() 函数中。 * * DAGScheduler 除了提供 stage 的 DAG 外,还根据当前缓存状态确定运行每个任务的首选位置,并将这些位置传递给低层次的调度器 TaskScheduler。 * * 此外,它还处理由于 Shuffle 输出文件丢失而导致的失败问题,在这种情况下,可能需要重新提交旧 stage。 * * stage 内部的不是由 Shuffle 文件丢失引起的失败问题,由 TaskScheduler 处理,它将在取消整个 stage 之前每个任务重试几次。 * * 在浏览此代码时,有几个关键概念: * * 1. Jobs(由 ActiveJob 表示)是提交给调度程序的高层次工作项。 * * 例如,当用户调用像 count() 这样的操作时,job 将通过 submitJob 提交。 * * 每个 job 可能需要执行多个 stage 来构建中间数据。 * * 2. Stage 是在 job 中计算中间结果的任务集,其中每个 Task 在相同 RDD 的分区上计算相同的函数。 * * stage 在 Shuffle 边界处分开,这引入了一个屏障(在这里我们必须等待前一 stage 完成以获取输出)。 * * 有两种类型的阶段:ResultStage(执行操作的最后阶段)和 ShuffleMapStage(为 shuffle 写入 map 输出文件)。 * * 如果多个 job 重用相同的 RDD,则 stage 通常在多个 job 之间共享。 * * 3. Task 是单独的工作单元,每个工作单元被发送到一台机器上。 * * 4. Cache tracking:DAGScheduler 会找出缓存了哪些 rdd 以避免重新计算它们,并且同样会记住哪些 shuffle map stage 已经生成了输出文件以避免重新跑一次 shuffle 的 map 端。 * * 5. Preferred locations:DAGScheduler 还基于底层 rdd 的首选位置,缓存 或者 Shuffle 数据的位置,计算在 stage 中运行每个 Task 的位置。 * * 6. Cleanup:当依赖于数据结构的正在运行的 job 完成时,所有数据结构都被清除,以防止长时间运行的应用程序中出现内存泄漏。 * * 要从失败中恢复,同一阶段可能需要多次运行,这称为“attempts”。 * * 如果 TaskScheduler 报告某个 Task 由于前一 stage 的 map 输出文件丢失而失败,则 DAGScheduler 将重新提交丢失的 stage。 * * 这是通过 FetchFailed 的 CompletionEvent 或 ExecutorLost 事件检测到的。 * * DAGScheduler 将等待一小段时间以查看其他节点或 Task 是否失败,然后重新提交 TaskSets 用于计算丢失 Task 的所有丢失的 stage 。 * * 作为此过程的一部分,我们可能还必须为以前清理 Stage 对象的旧(已完成)Stage 创建 Stage 对象。 * * 由于来自旧 stage 尝试的任务可能仍在运行,因此必须小心应对正确 stage 对象中接收的任何事件。 * * 对这个类进行修改或审核时应当注意: * * - 所有的数据结构都应该在涉及它们的 job 结束时清除,以避免在长时间运行的程序中状态的无限累积。 * * - 添加新数据结构时,请更新 `DAGSchedulerSuite.assertDataStructuresEmpty` 以包含新结构。这将有助于捕捉内存泄漏。 * * @param sc Spark 上下文对象 * @param taskScheduler Spark 底层针对 Task 的调度器 * @param listenerBus 事件 bus,基于 reactor 思想 * @param mapOutputTracker 用来追踪 map 输出结果的,Shuffle 时会用到 * @param blockManagerMaster master 的全局块管理器 * @param env Spark 环境对象 * @param clock 由“System”API报告的操作系统的实际时间钟 */ private[spark] class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv, clock: Clock = new SystemClock()) extends Logging