一、YarnJobClusterEntrypoint
进入main方法
SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); Map<String, String> env = System.getenv(); final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key()); Preconditions.checkArgument( workingDirectory != null, "Working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key()); try { YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG); } catch (IOException e) { LOG.warn("Could not log YARN environment information.", e); } final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit( args, new DynamicParametersConfigurationParserFactory(), YarnJobClusterEntrypoint.class); final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env); YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration); //执行程序的入口 ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
clusterEntrypoint.startCluster();
securityContext.runSecured((Callable<Void>) () -> { runCluster(configuration, pluginManager); return null; });
synchronized (lock) { //初始化服务rpc相关 initializeServices(configuration, pluginManager); // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration); //创建ResourceManage,创建、启动Dispatcher,启动ResourceManage clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this); clusterComponent.getShutDownFuture().whenComplete( (ApplicationStatus applicationStatus, Throwable throwable) -> { if (throwable != null) { shutDownAsync( ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(throwable), false); } else { // This is the general shutdown path. If a separate more specific shutdown was // already triggered, this will do nothing shutDownAsync( applicationStatus, null, true); } }); }
创建ResourceManager、Dispatcher,并启动
clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this);
具体实现:创建并启动REST端点(WebMonitorEndpoint,即Web页面的入口)
webMonitorEndpoint = restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, executor, metricFetcher, highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler); log.debug("Starting Dispatcher REST endpoint."); webMonitorEndpoint.start();
ResourceManager的创建(启动在Dispatcher创建之后)
resourceManager = resourceManagerFactory.createResourceManager( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname, ioExecutor);
创建Dispatcher(通过DispatcherRunner)
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, new HaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices);
选举服务:每个组件都有自己的选举服务,最终都会调用contender的grantLeadership方法
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
具体实现:
最终执行的是下面这个lambda表达式:
previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
start的实现
runIfStateIs( State.CREATED, this::startInternal);
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), ThrowingJobGraphWriter.INSTANCE);
最终创建Dispatcher,并启动
final Dispatcher dispatcher; try { dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, recoveredJobs, (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } dispatcher.start();
最终通过RPC调用(底层由Akka负责组件间通信),触发onStart方法
rpcServer.start();