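When a user starts a cluster with the session CLI command, the Flink cluster startup script first calls the main() method of a ClusterEntrypoint implementation, which starts and runs the corresponding type of cluster environment.
In other words, ClusterEntrypoint (abbreviated CE below) is the entry class for the whole cluster, and its concrete subclasses carry the main() method. In runtime management, all services are triggered and started through the CE class, which then creates and initializes the core components.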
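Let's first look at the inheritance hierarchy of the abstract CE class in the figure below.
As the figure shows, ClusterEntrypoint implementations fall into two categories:
standalone, which corresponds to local mode, and mesos and yarn, which correspond to cluster mode with different schedulers.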
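To make the idea of one abstract entry class with deployment-specific subclasses more concrete, here is a minimal sketch. It is not Flink code: SketchEntrypoint, StandaloneSketchEntrypoint, YarnSketchEntrypoint, deploymentName() and run() are hypothetical names invented for illustration. It only assumes that the base class fixes the startup sequence while each subclass supplies the part that differs per deployment.

```java
import java.util.ArrayList;
import java.util.List;

final class EntrypointHierarchySketch {

    /** Hypothetical base class: fixes the startup sequence, defers the deployment-specific part. */
    abstract static class SketchEntrypoint {
        final List<String> steps = new ArrayList<>();

        // Template method: the same sequence for every deployment type.
        final void startCluster() {
            steps.add("initialize shared services");
            steps.add("create components for " + deploymentName());
        }

        // The only piece each subclass has to provide in this sketch.
        abstract String deploymentName();
    }

    static final class StandaloneSketchEntrypoint extends SketchEntrypoint {
        @Override
        String deploymentName() {
            return "standalone";
        }
    }

    static final class YarnSketchEntrypoint extends SketchEntrypoint {
        @Override
        String deploymentName() {
            return "yarn";
        }
    }

    // Static helper that starts whichever entrypoint it is handed (illustrative only).
    static List<String> run(SketchEntrypoint entrypoint) {
        entrypoint.startCluster();
        return entrypoint.steps;
    }

    public static void main(String[] args) {
        System.out.println(run(new StandaloneSketchEntrypoint()));
        System.out.println(run(new YarnSketchEntrypoint()));
    }
}
```

The static helper never needs to know which subclass it received, which is why a single shared helper can serve every deployment type.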
Next, starting from the main() method in StandaloneSessionClusterEntrypoint, let's see how ClusterEntrypoint starts the cluster:
    public static void main(String[] args) {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        EntrypointClusterConfiguration entrypointClusterConfiguration = null;
        final CommandLineParser<EntrypointClusterConfiguration> commandLineParser =
            new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

        try {
            entrypointClusterConfiguration = commandLineParser.parse(args);
        } catch (FlinkParseException e) {
            LOG.error("Could not parse command line arguments {}.", args, e);
            commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
            System.exit(1);
        }

        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

        StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

        // after the configuration steps above, start the cluster through
        // runClusterEntrypoint of the abstract CE class
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
The last line shows that, after configuration loading and logging setup, main() calls the runClusterEntrypoint method of ClusterEntrypoint. Let's see what that method does.
    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            clusterEntrypoint.startCluster(); // ⭐ this line starts the cluster
        } catch (ClusterEntrypointException e) {
            LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }

        clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
            final int returnCode;

            if (throwable != null) {
                returnCode = RUNTIME_FAILURE_RETURN_CODE;
            } else {
                returnCode = applicationStatus.processExitCode();
            }

            LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
            System.exit(returnCode);
        });
    }
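The starred line calls startCluster() on the entrypoint to continue the startup. The method then registers a callback with clusterEntrypoint.getTerminationFuture().whenComplete(). The callback runs once the cluster terminates, derives the exit code from the final status (or from the failure), and exits the process.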
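The mapping from termination future to exit code can be tried out in isolation. The sketch below is a simplified stand-in, not Flink's implementation: SketchStatus, its exit codes, and the value of RUNTIME_FAILURE_RETURN_CODE are assumptions chosen for illustration, and it uses handle() instead of whenComplete() so that the computed code can be returned and inspected rather than passed to System.exit().

```java
import java.util.concurrent.CompletableFuture;

final class TerminationFutureSketch {

    /** Hypothetical stand-in for ApplicationStatus; the exit codes are illustrative. */
    enum SketchStatus {
        SUCCEEDED(0),
        FAILED(1);

        private final int exitCode;

        SketchStatus(int exitCode) {
            this.exitCode = exitCode;
        }

        int processExitCode() {
            return exitCode;
        }
    }

    // Assumed value for this sketch only.
    static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    // A failed future maps to the runtime failure code; otherwise the status decides.
    static CompletableFuture<Integer> exitCodeOf(CompletableFuture<SketchStatus> terminationFuture) {
        return terminationFuture.handle((status, throwable) ->
            throwable != null ? RUNTIME_FAILURE_RETURN_CODE : status.processExitCode());
    }

    public static void main(String[] args) {
        CompletableFuture<SketchStatus> termination = new CompletableFuture<>();
        CompletableFuture<Integer> exitCode = exitCodeOf(termination);

        // Nothing happens until the cluster "terminates" by completing the future.
        termination.complete(SketchStatus.SUCCEEDED);
        System.out.println("exit code: " + exitCode.join());
    }
}
```

Registering the callback does not block the calling thread. The code inside it only runs when some other part of the system completes the future.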
Now let's see what startCluster() does:
    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());

        try {
            configureFileSystems(configuration); // configure the file systems

            SecurityContext securityContext = installSecurityContext(configuration);

            securityContext.runSecured((Callable<Void>) () -> {
                runCluster(configuration); // ⭐ continue the startup inside the security context

                return null;
            });
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);

            try {
                // clean up any partial state
                shutDownAsync(
                    ApplicationStatus.FAILED,
                    ExceptionUtils.stringifyException(strippedThrowable),
                    false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                strippedThrowable.addSuppressed(e);
            }

            throw new ClusterEntrypointException(
                String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
                strippedThrowable);
        }
    }
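The starred call in startCluster() hands the remaining startup work to a security context as a Callable. The following sketch shows that shape only. SketchSecurityContext and RecordingContext are hypothetical types invented here, and the "enter"/"exit" events merely stand in for whatever a real context does around the action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class SecuredRunSketch {

    /** Hypothetical interface mirroring the runSecured(Callable) shape used above. */
    interface SketchSecurityContext {
        <T> T runSecured(Callable<T> action) throws Exception;
    }

    /** Illustrative context that records when it is entered and left. */
    static final class RecordingContext implements SketchSecurityContext {
        private final List<String> events;

        RecordingContext(List<String> events) {
            this.events = events;
        }

        @Override
        public <T> T runSecured(Callable<T> action) throws Exception {
            events.add("enter");
            try {
                return action.call(); // the wrapped work runs inside the context
            } finally {
                events.add("exit");
            }
        }
    }

    // The entrypoint passes the rest of the startup to the context and
    // wraps any failure in one unchecked exception.
    static String startCluster(SketchSecurityContext context, List<String> events) {
        try {
            return context.runSecured(() -> {
                events.add("runCluster");
                return "started";
            });
        } catch (Exception e) {
            throw new IllegalStateException("Failed to initialize the cluster entrypoint.", e);
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        System.out.println(startCluster(new RecordingContext(events), events));
        System.out.println(events);
    }
}
```

Because the work is passed in as a Callable, the context decides how and under which identity it runs, while the entrypoint code stays the same.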
Note the starred line: runCluster is executed inside the SecurityContext, through runSecured, rather than being called directly by ClusterEntrypoint. Next, runCluster:
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // ⭐ initialize the base services the runtime cluster needs,
            // such as HAServices and the common RPC service
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);

            // ⭐ create the cluster component clusterComponent,
            // ⭐ which contains the resourceManager, dispatcher and webMonitorEndpoint
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new AkkaQueryServiceRetriever(
                    metricQueryServiceActorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                this);

            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
This step initializes a number of services and then calls create on dispatcherResourceManagerComponentFactory to build and start the cluster components. Let's continue:
    @Override
    public DispatcherResourceManagerComponent<T> create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        LeaderRetrievalService dispatcherLeaderRetrievalService = null;
        LeaderRetrievalService resourceManagerRetrievalService = null;
        WebMonitorEndpoint<U> webMonitorEndpoint = null;
        ResourceManager<?> resourceManager = null;
        JobManagerMetricGroup jobManagerMetricGroup = null;
        T dispatcher = null;

        try {
            dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

            resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

            final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));

            final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");

            final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
            final MetricFetcher metricFetcher = updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                    configuration,
                    metricQueryServiceRetriever,
                    dispatcherGatewayRetriever,
                    executor);

            webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getWebMonitorLeaderElectionService(),
                fatalErrorHandler); // ⭐ create the webMonitorEndpoint

            log.debug("Starting Dispatcher REST endpoint.");
            webMonitorEndpoint.start(); // ⭐ start the webMonitorEndpoint

            final String hostname = getHostname(rpcService);

            jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                hostname,
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                metricRegistry,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                jobManagerMetricGroup); // ⭐ create the ResourceManager

            final HistoryServerArchivist historyServerArchivist =
                HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

            dispatcher = dispatcherFactory.createDispatcher(
                configuration,
                rpcService,
                highAvailabilityServices,
                resourceManagerGatewayRetriever,
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                archivedExecutionGraphStore,
                fatalErrorHandler,
                historyServerArchivist); // ⭐ create the dispatcher

            log.debug("Starting ResourceManager.");
            resourceManager.start(); // ⭐ start the ResourceManager
            resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

            log.debug("Starting Dispatcher.");
            dispatcher.start(); // ⭐ start the dispatcher
            dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

            return createDispatcherResourceManagerComponent(
                dispatcher,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint,
                jobManagerMetricGroup);

        } catch (Exception exception) {
            // clean up all started components
            if (dispatcherLeaderRetrievalService != null) {
                try {
                    dispatcherLeaderRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            if (resourceManagerRetrievalService != null) {
                try {
                    resourceManagerRetrievalService.stop();
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(e, exception);
                }
            }

            final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);

            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }

            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }

            if (dispatcher != null) {
                terminationFutures.add(dispatcher.closeAsync());
            }

            final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);

            try {
                terminationFuture.get();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (jobManagerMetricGroup != null) {
                jobManagerMetricGroup.close();
            }

            throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
        }
    }
The places where components are created and started are marked with ⭐.
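The create method above also shows a cleanup pattern worth isolating: every component reference starts as null, and if any step fails, only the components that were actually created are closed asynchronously before the error is rethrown. The sketch below reproduces that pattern with plain JDK classes. The Component class, its start(boolean) switch, and createAll() are hypothetical helpers for illustration, and CompletableFuture.allOf() stands in for FutureUtils.completeAll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ComponentStartupSketch {

    /** Hypothetical component with an asynchronous close, logging what happens to it. */
    static final class Component {
        private final String name;
        private final List<String> log;

        Component(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void start(boolean fail) throws Exception {
            if (fail) {
                throw new Exception(name + " failed to start");
            }
            log.add("start " + name);
        }

        CompletableFuture<Void> closeAsync() {
            log.add("close " + name);
            return CompletableFuture.completedFuture(null);
        }
    }

    // Creates and starts three components in the same order as the method above.
    // 'failing' names the component whose start should fail (illustrative switch).
    static void createAll(List<String> log, String failing) {
        Component webMonitorEndpoint = null;
        Component resourceManager = null;
        Component dispatcher = null;
        try {
            webMonitorEndpoint = new Component("webMonitorEndpoint", log);
            webMonitorEndpoint.start("webMonitorEndpoint".equals(failing));
            resourceManager = new Component("resourceManager", log);
            dispatcher = new Component("dispatcher", log);
            resourceManager.start("resourceManager".equals(failing));
            dispatcher.start("dispatcher".equals(failing));
        } catch (Exception exception) {
            // Close only what was created; null means the step was never reached.
            List<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
            if (webMonitorEndpoint != null) {
                terminationFutures.add(webMonitorEndpoint.closeAsync());
            }
            if (resourceManager != null) {
                terminationFutures.add(resourceManager.closeAsync());
            }
            if (dispatcher != null) {
                terminationFutures.add(dispatcher.closeAsync());
            }
            // Wait for every close to finish before reporting the failure.
            CompletableFuture.allOf(terminationFutures.toArray(new CompletableFuture<?>[0])).join();
            throw new IllegalStateException("Could not create the components.", exception);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try {
            createAll(log, "dispatcher");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " Log: " + log);
        }
    }
}
```

Keeping the references nullable until each component exists is what lets a single catch block handle a failure at any point in the sequence.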