调度引擎是关键的基础设施,不但是定时执行任务,更是大规模分布式任务引擎,分布式并行处理平台,管理计算节点集群,提供高吞吐的可伸缩的数据处理能力。
公司日益增长的业务,对调度引擎高吞吐,高并发的要求也快速增长,需构建一个无中心,管理大集群,健壮分片容错的任务调度任务平台,支撑业务发展。
同时,分布式调度引擎也是 datax,可观测-性能指标组件(sentinel dashboard)分布式改造的核心技术
芋道源码作业调度中间件 Elastic-Job 源码解析合集_芋道源码-CSDN博客
elastic-job - 文集 - 简书
无中心/有中心分布式 有中心分布式设置中心节点负责集群协调和元数据保存等工作,例如 xxl-job 的 admin/executor, dolphin-scheduler master-worker 都是有中心分布式设计;真正无中心设计很少,大部分是节点平等,都可以通过选举成为主节点,也就是,任何一个节点都可以成为中心
脑裂 无中心分布式设计,当网络出现问题,节点分割成多个集群,集群间因不能通讯而不能达到状态一致,通常解决方案是集群节点数奇数,节点数少于总数的集群中一半停止工作
分片/容错 分片是调度平台很重要的特性,调度处理大规模数据,需要分片执行,分片执行带来新的问题,分片失败,平台回收分片,转移到其他节点执行
服务可分为功能服务和核心服务,其中核心服务支撑功能服务的服务,功能任务有任务注册,任务执行,失效转移等,是调度平台的”业务”功能
znode 设计是 zookeeper 分布式协调的灵魂,zk 的临时 znode,持久 znode,watch 机制
本文分析的核心,服务如何使用 znode,znode 间组合使用
注册中心属于生态一部分,但注册中心主要用在 znode 存储服务,合起来一起看
JobNodeStorage/CoordinatorRegistryCenter 两个类名字有点名不副实,CoordinatorRegistryCenter 才是 znode 节点存取;JobNodeStorage 更多是 znode”应用”:
executeInLeader 主节点选举
addDataListener 注册 znode 监听
executeInTransaction znode 操作事务 TransactionExecutionCallback
AbstracJobListener znode 事件监听器基类
LeaderExecutionCallback 主节点选举后回调
TransactionExecutionCallback znode 事务操作
总体来说,监听服务分两大类,quartz 和分布式(zk)
本文只关注 zk 部分,elastic-job 自身也只实现 zk 部分,quartz 部分留接口给用户,其他的核心服务都依赖监听服务,捕获 znode 事件,执行相应服务逻辑,因此监听服务非常关键服务,是其他服务分析的入口
ListenerManager 监听管理器的集中管理
AbstractListenerManager/ AbstractJobListener 其他服务继承,实现服务的监听管理接入的监听器实现
RegistryCenterConnectionStateListener 连接状态的监听器,连接断开暂停作业,重连后,重新初始化作业,清除分片,重启作业
Ø 谁使用:几乎所有核心服务
Ø 依赖服务:
- znode 存储服务/注册中心 znode 节点存取,监听器注册
分片是分布式任务不可或缺的特性,任务分片并行执行,极大提高处理能力,分片伴随弹性计算,节点发现,容错等能力,是分布式调度的体现
elastic-job 实现了静态分片和分片容错,当作业满足分片条件时,设置需要分片标记,等到作业执行时,判断有该标记后执行作业分配
elastic-job 的分片是先设置需要分片标记,等到需要的时候再实际分片,哪里设置分片?
ListenServersChangedJobListener 监听 server znode 和 instance znode,对于一个作业,server 可有对应多个 instance,因此 server 的上下线,同时接收到 server 和 instance 事件
server 节点是持久 znode,所以服务事件只有上线和 server enabled/disabled 变更
ShardingTotalCountChangedJobListener 监听作业配置分片数变更
另外,诊断服务也定时设置分片标记
下面分析分片,分片就是分配任务到各个在线运行实例节点
分片入口方法 ShardingService.shardingIfNecessary,回顾一下作业执行,
分片在 getShardingContexgts 完成
优先处理失效分片,如果没有,调用 ShardingService.shardingIfNecessary 正常作业分片,最终分片封装到 ShardingContexts
接下来分片代码
分片在主节点完成
LeaderService#isLeaderUntilBlock/blockUntilShardingCompleted 若非主节点等待分片完成
waitingOtherShardingItemCompleted 等待当前执行中的分片完成,依赖 monitor execution 配置
*分片运行依赖 monitorExecution 配置,也就是等不等待完成主要是看监控不监控,感觉主次没搞好;这里也反应了分片要严格按照配置要求,像失效转移分片改变需要另行处理
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "") 标记分片处理中,
写入/leading/sharding/processing
*processing znode 只有在非主节点等待分片完成是用到,由于 necessary 必定存在,只为监控使用
resetShardingInfo
重写分片 znode,去掉/sharding/{itemNum}/instance 分片分配,剪掉之前可能多出的分片 znode
JobShardingStrategy spi 载入,有多种策略,属于基础(infra)包,细节就不分析
最后事务写入分片分配使用 PersistShardingInfoTransactionExecutionCallback
分片分配给那个作业运行实例
清理 znode,需要分片标记和分片处理中标记
Ø 谁使用:
- 作业执行 作业分片,获取分片信息
- 诊断服务 设置分片标记,处理分布式环境下,离线作业运行实例未处理分片
Ø 依赖服务:
- znode 存储服务/注册中心 znode 节点存取,监听器注册
- 选主服务 分片需主节点执行