大家好,我是小寒~
在 Flink 中提交作业到 Flink 集群后, Flink 集群是如何为作业分配资源,以及如何管理集群资源的呢?今天我们就来聊一聊 Flink 资源管理相关的内容。
2022 大数据学习路线图
原文链接
Flink 涉及的资源分为两级:集群资源和Flink自身资源。
集群资源管理的是硬件资源,包括 CPU、内存、GPU等,由资源管理框架(Yarn、K8s、Mesos)来管理,Flink 从资源管理框架中申请和释放资源。
Flink 从资源管理框架申请资源容器(Yarn 中的 Container),一个容器中运行一个TaskManager 进程。容器的资源对 Flink 来说也是比较粗粒度的。因为计算类型的不同,一个任务占用一个容器可能无法充分利用资源,所以单个容器会被多个Flink 的任务共享。
Flink 对申请到的资源进行切分,每一份叫作 TaskSlot。
从总体上来说,在资源管理中涉及了 JobMaster、ResourceManager、TaskManager 三种角色。JobMaster 是 Slot 资源的使用者,向 ResourceManager 申请资源,ResourceManager 负责分配资源和资源不足时申请资源,资源空闲时释放资源。TaskManager 是 Slot 资源的持有者,在其 Slot 清单中记录了 Slot 分配给了哪个作业的哪个 Task。
资源管理器在 Flink 中叫做 ResourceManager。Flink 同时支持不同的资源集群类型。ResourceManager 位于 Flink 和 资源管理集群(Yarn、K8s)之间,是 Flink 集群级资源管理的抽象,其主要作用如下。
在 FlinK 中内置了 4 种 ResourceManager,即 YarnResourceManager、KubernetesResourceManager(K8s)、StandaloneResourceManager、 MesosResourceManager,分别对应于不同的资源管理框架。
Slot 管理器在 Flink 中叫作 SlotManager,是 ResourceManager 的组件,从全局角度维护当前有多少个 TaskManager,每个 TaskManager 有多少个空闲的 Slot 和 Slot 等资源的使用情况。当 Flink 作业调度执行时,根据 Slot 分配策略为 Task 分配执行位置。
SlotManager 虽然是 ResourceManager 的组件,但是其逻辑是通用的,并不关心到底使用了哪种资源集群。面向不同的对象,SlotManager 提供不同的功能。
Flink 在决定 Task 运行在哪个 TaskManager 上时,会根据策略进行选择,选择 Slot 的时候有不同的选择策略。
选择策略从总体上分为两大类。
位置优先的选择策略 LocationPreferenceSlotSelectionStrategy
位置优先的策略分为两类:
已分配 Slot 优先的选择策略(PreviousAllocationSlotSelectionStrategy),如果当前没有空闲的已分配的Slot,则仍然会使用位置
优先的策略来分配和申请 Slot。
Slot 资源池在 Flink 中叫作 SlotPool,是 JobMaster 中记录当前作业从 TaskManager 获取的 Slot 的集合。JobMaster 的调度器首先从SlotPool 中获取 Slot 来调度任务,SlotPool 在没有足够的 Slot 资源执行作业的时候,首先会尝试从 ResourceManager 中获取资源。如果 ResourceManager 不可用、ResourceManager 拒绝资源请求 或 请求超时,资源申请失败,则作业启动失败。
JobMaster 申请到资源之后,会在本地持有 Slot,避免 ResourceManager 异常导致作业运行失败。对于批处理而言,持有资源 JobMaster 首先可以避免多次向 ResourceManager 申请资源,同时 ResourceManager 不可用也不会影响作业的继续执行,只有资源不足时才会导致作业执行失败。
当作业已经执行完毕或者作业完全启动且资源有剩余时,JobMaster 会将剩余资源交还给 ResourceManager 。
每一个 TaskManager 都是一个 java 进程,TaskManager 为每个 Task 分配一个线程。一个 TaskManager 中可能执行一个或者多个 Task。TaskManager 通过 Slot 来控制(一个 TaskManager 至少有一个 Slot)TaskManager 能够接收多少个 Task。
Slot 表示 TaskManager 拥有资源的一个固定大小的子集。假如 一个 TaskManager 有 3 个 Slot,那么它会将其管理的内存分成 3 份给各个 Slot,在没有 Slot 共享的情况下,并行度为 2 的作业部署之后,Slot Task 的分配关系如下图所示。
通过调整 Slot 的数量,用户可以定义 Task 之间如何相互隔离。如果一个 TaskManager 只有一个 Slot,意味着每个 Task 独立地运行在 JVM 中。而一个 TaskManager 有多个 Slot,则意味着更多的 Task 可以共享一个 JVM。在同一个 JVM 进程中的 Task 将共享 TCP 连接和心跳消息。Task 之间也可能共享数据集和数据结构,这样可以减少每个 Task 的负载。
虽然通过 Slot 对 TaskManager 的资源进行划分,在一定程度上能够提高集群的计算资源利用率,但是这种做法并没有考虑到不同 Task 的计算任务对资源需求的差异,计算任务有 IO 密集型、内存密集型、 CPU 密集型、GPU 密集型等不同的资源消耗类型,有时候还会是多种资源混合类型。
所以在 Slot 的基础上,Flink 设计了 Slot 共享机制。其中,SlotSharingManager 用在 Flink 作业的执行调度中,负责 Slot 的共享,不同的 Task 可以共享 Slot。
默认情况下,Flink 作业共享同一个 SlotSharingGroup,同一个作业中来自不同 JobVertex 的 Task 可以共享 Slot。使用 Slot 共享,可以在一个 Slot 中运行 Task 组成的流水线。共享 Slot 带来了如下优点。
资源分配简单
Flink 集群需要的 Slot 的数量和作业中的最高并行度一致,不需要计算一个程序总共包含多少个 Task。
资源利用率高
如果没有 Slot 共享,资源密集型的 Task (如长周期的窗口计算)跟非密集型的作业占用相同的资源,在整个 TaskManager 层面上,资源没有充分利用。如果共享 Slot ,可以充分利用 Slot 资源。如下图所示,将并行度从2提高到6,可以充分利用Slot资源,同时确保资源密集型的 Task 在TaskManager中公平分配。
Slot 共享管理器在 Flink 中叫作 SlotSharingManager,Slot 共享组在 Flink 中叫做 SlotSharingGroup。SlotSharingManager 对象管理资源共享和分配,一个 Slot 共享组 对应一个 Slot 共享管理器。两者在作业调度执行的时候发挥作用,部署 Task 之前,选择 Slot 确定 Task 发布到哪个TaskManager。
Flink 有两种共享组。
SlotSharingGroup
非强制性共享约束,根据组内的 JobVertext ID 查找是否已有可以共享的 Slot,只要确保相同 JobVertext ID 不能出现在一个共享的 Slot 内即可。
在符合资源要求的 Slot 中,找到没有相同 JobVertext ID 的 Slot,根据 Slot 选择策略选择一个 Slot 即可,如果没有符合条件的,则申请新的 Slot。
CoLocationGroup
CoLocationGroup 又叫作本地约束共享组,具有强制性的 Slot 共享限制。CoLocationGroup 用在迭代运算中,即在 IterativeStream 的 API 中调用。迭代运算中的 Task 必须共享同一个 TaskManager 的 Slot。CoLocationGroup 可以看成是 SlotSharingGroup 的特例。
此处需要注意, JobGraph 向 ExecutionGraph 的转换过程中,为每一个 ExecutionVertext 赋予了按照并行度编写的编号,相同编号的迭代计算 ExecutionVertext 会被放入本地共享约束组中,共享相同的 CoLocationConstraint 对象,在调度的时候,根据编号就能找到本组其他 Task 的 Slot 信息。
CoLocation 共享根据组内每个 ExecutionVertext 关联的 CoLocationConstraint 查找是否有相同 CoLocationConstraint 约束已分配 Slot 可用,在调度作业执行的时候,首先要找到本约束中其他 Task 部署的 TaskManager,如果没有则申请一个新的 Slot,如果有则共享该 TaskManager 上的 Slot。
单独 Slot 资源申请
该类型的 Slot 申请首先会从 JobMaster 的当前 SlotPool 中尝试获取资源,如果资源不足,则 SlotPool 向 ResourceManager 请求新的 Slot。
共享 Slot 资源申请
共享 Slot 在申请的时候,需要向 SlotSharingManager 请求资源,如果有 CoLocation 限制,则申请 CoLocation MultiTaskSlot,
否则申请一般的 MultiTaskSlot。