receive 方法的实现本质上是一组 case 分支,分别处理来自 Master、自身定时任务以及外部的不同类型消息,对应不同的处理场景。
各 case 分支一览:
  case msg: RegisterWorkerResponse
  case SendHeartbeat
  case WorkDirCleanup
  case MasterChanged
  case ReconnectWorker
  case LaunchExecutor
  case executorStateChanged: ExecutorStateChanged
  case KillExecutor(masterUrl, appId, execId)
  case LaunchDriver(driverId, driverDesc, resources_)
  case KillDriver(driverId)
  case driverStateChanged @ DriverStateChanged(driverId, state, exception)
  case ReregisterWithMaster
  case ApplicationFinished(id)
  case DecommissionWorker
  case WorkerSigPWRReceived
/**
 * Handles the master's reply to this worker's registration attempt.
 * Runs under `synchronized` because it mutates shared worker state
 * (`registered`, the cached master info) that other handlers also touch.
 */
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress, duplicate) =>
      // Prefer the configured master address over the RPC sender's address
      // when the configuration asks for it (e.g. behind a proxy/NAT).
      val preferredMasterAddress = if (preferConfiguredMasterAddress) {
        masterAddress.toSparkURL
      } else {
        masterRef.address.toSparkURL
      }
      if (duplicate) {
        logWarning(s"Duplicate registration at master $preferredMasterAddress")
      }
      logInfo(s"Successfully registered with master $preferredMasterAddress")
      registered = true
      changeMaster(masterRef, masterWebUiUrl, masterAddress) // update cached master info
      // Start the heartbeat loop: periodically send SendHeartbeat to self; the
      // receive loop's SendHeartbeat case then forwards a Heartbeat to the master.
      forwardMessageScheduler.scheduleAtFixedRate(
        () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
        0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        // Periodically send WorkDirCleanup to self so the WorkDirCleanup case can
        // purge work directories of applications that are no longer running.
        forwardMessageScheduler.scheduleAtFixedRate(
          () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
          CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Snapshot every executor currently tracked by this worker...
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // ...and report the worker's latest executor/driver state to the master.
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    case RegisterWorkerFailed(message) =>
      // Fatal only if we never managed to register with any master.
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
// SendHeartbeat case body: only heartbeat while a master connection is established.
if (connected) { sendToMaster(Heartbeat(workerId, self)) }

/**
 * Sends `message` to the currently known master endpoint, if any.
 * When no master connection has been established yet, the message is
 * dropped with a warning rather than buffered for later delivery.
 */
private def sendToMaster(message: Any): Unit = {
  master match {
    case Some(masterRef) => masterRef.send(message) // fire-and-forget via the master's RPC ref
    case None =>
      logWarning(
        s"Dropping $message because the connection to master has not yet been established")
  }
}
// WorkDirCleanup case body: asynchronously delete work-dir subdirectories that
// belong to applications/drivers no longer running on this worker.
// The set of IDs still alive: every executor's appId plus every driver's driverId.
val appIds = (executors.values.map(_.appId) ++ drivers.values.map(_.driverId)).toSet
try {
  // Run the filesystem scan/deletion off the RPC thread, on the dedicated
  // cleanup executor.
  val cleanupFuture: concurrent.Future[Unit] = concurrent.Future {
    val appDirs = workDir.listFiles()
    if (appDirs == null) {
      // BUGFIX: the message used to concatenate `appDirs`, which is known to be
      // null on this branch ("Failed to list files in null"); name the directory.
      throw new IOException("ERROR: Failed to list files in " + workDir)
    }
    appDirs.filter { dir =>
      // Candidate for deletion: a directory whose app/driver is gone AND that has
      // received no new files within the APP_DATA_RETENTION_SECONDS window.
      val appIdFromDir = dir.getName
      val isAppStillRunning = appIds.contains(appIdFromDir)
      dir.isDirectory && !isAppStillRunning &&
        !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
    }.foreach { dir =>
      logInfo(s"Removing directory: ${dir.getPath}")
      Utils.deleteRecursively(dir)
      // When the external shuffle service keeps a state DB, also notify it that
      // the application is gone so its shuffle metadata for the app is dropped
      // (this removes shuffle-service state, not arbitrary files).
      if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
          conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
        shuffleService.applicationRemoved(dir.getName)
      }
    }
  }(cleanupThreadExecutor)
  // A failed cleanup is logged but never crashes the worker.
  cleanupFuture.failed.foreach(e =>
    logError("App dir cleanup failed: " + e.getMessage, e)
  )(cleanupThreadExecutor)
} catch {
  // The cleanup pool may already be shut down during worker shutdown; best-effort.
  case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown =>
    logWarning("Failed to cleanup work dir as executor pool was shutdown")
}
// MasterChanged case body: a new master has taken over after failover.
// The worker only passively adopts the change — it updates its own cached
// master info and reports its state back; it initiates no election messaging.
logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
changeMaster(masterRef, masterWebUiUrl, masterRef.address)
// Snapshot every executor currently running on this worker (with resources)...
val executorResponses = executors.values.map { e =>
  WorkerExecutorStateResponse(new ExecutorDescription(
    e.appId, e.execId, e.cores, e.state), e.resources)
}
// ...and every driver it hosts...
val driverResponses = drivers.keys.map { id =>
  WorkerDriverStateResponse(id, drivers(id).resources)
}
// ...then report the full scheduler state so the new master can rebuild its
// view of this worker.
masterRef.send(WorkerSchedulerStateResponse(
  workerId, executorResponses.toList, driverResponses.toSeq))
// Re-run the master registration routine (registerWithMaster is documented
// earlier in this file). NOTE(review): presumably this is the body of the
// ReregisterWithMaster case — confirm against the surrounding receive block.
registerWithMaster()