NodeManager部分
NodeManager.nodeManagerShutdownHook ->NodeStatusUpdaterImpl.serviceStop() (检测isNMUnderSupervisionWithRecoveryEnabled == false, 判断有没有设置recovery,设置了就直接返回) ->NodeStatusUpdaterImpl.unRegisterNM() ->ResourceTrackerPBServiceImpl.unRegisterNodeManager()
ResourceManager部分
ResourceTrackerPBServiceImpl.unRegisterNodeManager() ->ResourceTrackerService.unRegisterNodeManager()
Resourcemanager部分
NMLivelinessMonitor.expire() ->RMNodeImpl.handle() ->stateMachine.doTransition() ->StatusUpdateWhenHealthyTransition.transition()->RMNodeImpl.reportNodeUnusable() ->CapacityScheduler.handle(NODE_REMOVED) ->CapacityScheduler.removeNode() ->触发container关闭
在设置recovery后,硬盘上的container任务中间数据不会在nodemanger退出后进行清理。
nodemanager进程退出后,contianer进程继续执行,container中不分配新的任务后由applicationMaster调度退出,此时如果有中间数据会写入中间数据文件。
中间文件被reducer拉取时依赖nodemanager进程的tez ShuffleHandler的http服务,如果在container完成后nodemanager还是没有启动成功,reducer端就会拉取失败,这时有两个位置触发下一步的变化:
生产者
ApplicationMaster部分
AMRMClientAsyncImpl.HeartbeatThread.run() ->AMRMClientImpl.allocate() ->ApplicationMasterProtocolPBClientImpl.allocate() ->ApplicationMasterProtocolPBServiceImpl.allocate()
ResourceManager部分
ApplicationMasterProtocolPBServiceImpl.allocate() ->ApplicationMasterService.allocate()
消费者
ApplicationMaster部分
AMRMClientAsyncImpl.CallbackHandlerThread.run() ->YarnTaskSchedulerService.onNodesUpdated() ->TaskSchedulerContextImpl.nodesUpdated() ->TaskSchedulerManager.nodesUpdated() ->EventHandler.handle(new AMNodeEventStateChanged()) ->AMContainerImpl.handle() ->AMContainerImpl.stateMachine
VertexImpl:4705行
this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT); this.taskRescheduleHigherPriority = conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY, TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT); this.taskRescheduleRelaxedLocality = conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY, TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT); this.maxAllowedOutputFailures = conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT); this.maxAllowedOutputFailuresFraction = conf.getDouble(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT); this.maxAllowedTimeForTaskReadErrorSec = conf.getInt( TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);