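Using Flink's HA feature to manage the lifecycle of the components inside the JobManager effectively prevents the situation where a job cannot be recovered because the JobManager process has failed.
This post walks through how Flink HA is implemented, in four parts: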
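1. How HA is implemented on top of ZooKeeper + Hadoop
2. Overview of the HA interfaces
3. The ZooKeeper-based implementations of the HA interfaces
4. Hands-on: add a new component and let HA manage its lifecycle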
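/leaderlatch: the nodes under the leaderlatch directory are used for leader election (related classes: ZooKeeperLeaderElectionService, LeaderContender)
/leader: components watch the nodes under leader to follow leader changes in real time (related classes: ZooKeeperLeaderRetrievalService, LeaderRetrievalListener)
/checkpoints: records the checkpoints available to each job; from these entries the path of the checkpoint metadata under the Hadoop HA directory can be resolved (related class: ZooKeeperCompletedCheckpointStore)
/checkpoint-counter: the checkpoint counter (related class: ZooKeeperCheckpointIDCounter)
/running_job_registry: the running jobs and their status (related class: ZooKeeperRunningJobsRegistry)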
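Flink uses ZooKeeper not only to elect leaders and to notify other components of the latest leader information in real time, but also to store JobManager and job information, so that nothing is lost when a new JobManager comes up.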
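For example, the ResourceManager contends for leadership on the node /leaderlatch/resource_manager_lock. After winning the election it writes its leader information to the node /leader/resource_manager_lock. The other components watching /leader/resource_manager_lock then immediately connect to the ResourceManager using the new address and session ID.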
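To make the node layout concrete, here is a minimal sketch of how the two per-component paths could be derived. The class `HaPaths`, its method names and the root `/flink/default/` are hypothetical and exist only for illustration; only the directory names `leaderlatch` and `leader` and the node name `resource_manager_lock` come from the description above.

```java
/** Hypothetical helper that mirrors the ZooKeeper layout described above. */
final class HaPaths {
    private final String root;

    HaPaths(String root) {
        // Strip a trailing slash so that joined paths never contain "//".
        this.root = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
    }

    /** Node on which a component contends for leadership. */
    String latchPath(String component) {
        return root + "/leaderlatch/" + component;
    }

    /** Node to which the elected leader publishes its address and session ID. */
    String leaderPath(String component) {
        return root + "/leader/" + component;
    }

    public static void main(String[] args) {
        HaPaths paths = new HaPaths("/flink/default/"); // assumed root, for illustration only
        System.out.println(paths.latchPath("resource_manager_lock"));
        System.out.println(paths.leaderPath("resource_manager_lock"));
    }
}
```

Contenders only ever touch the latch path, while listeners only ever watch the leader path, which is why the two directories are kept separate.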
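On the Hadoop side, Flink backs the BlobServer with a Hadoop-based blob store (related class: FileSystemBlobStore).
The checkpoint metadata files are stored on Hadoop under the HA path (related class: FileSystemStateStorageHelper).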
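The split between ZooKeeper and Hadoop described above (small pointers in ZooKeeper, the actual metadata files under the HA directory) can be sketched as follows. This is a simplified illustration and not Flink's code: `PointerStore` is a hypothetical class, a `ConcurrentHashMap` stands in for the ZooKeeper nodes, a local directory stands in for the Hadoop HA path, and the file prefix is made up.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: the payload goes to a file system, only a pointer goes to the node store. */
final class PointerStore {
    // Stand-in for ZooKeeper nodes: node path -> file path of the real payload.
    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    // Stand-in for the HA directory on Hadoop.
    private final Path haDir;

    PointerStore(Path haDir) {
        this.haDir = haDir;
    }

    /** Writes the payload to a new file under the HA directory, then records its path under the node. */
    void put(String node, byte[] payload) throws IOException {
        Path file = Files.createTempFile(haDir, "completedCheckpoint", ".bin");
        Files.write(file, payload);
        nodes.put(node, file.toString());
    }

    /** Resolves the pointer stored under the node and reads the payload back. */
    byte[] get(String node) throws IOException {
        String pointer = nodes.get(node);
        if (pointer == null) {
            throw new IOException("No pointer stored under " + node);
        }
        return Files.readAllBytes(Paths.get(pointer));
    }
}
```

Keeping only a path in the coordination store keeps the nodes small, while the potentially large metadata lives on a file system designed for bulk data.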
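HighAvailabilityServices: gives access to the LeaderRetrievalService and LeaderElectionService of every component, and keeps track of the JobManager state that has to be persisted, such as completed checkpoints, the JobGraph, the BlobStore and the job scheduling status.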
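LeaderElectionService: the service interface responsible for electing a leader.
Methods:
    // Starts the election service. Usually called once the RPC endpoint
    // has been initialized; the election begins at this point.
    void start(LeaderContender contender) throws Exception;

    // Stops the election service when the component's lifecycle ends.
    void stop() throws Exception;

    // Confirmation issued by the component after it has been elected leader;
    // it also writes the leader information back. In the ZooKeeper-based HA,
    // for example, the information is written to the node under the leader directory.
    void confirmLeaderSessionID(UUID leaderSessionID);

    // Returns whether the given session ID currently holds leadership.
    boolean hasLeadership(@Nonnull UUID leaderSessionId);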
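LeaderContender: the interface for taking part in an election. Every Flink component that needs HA implements it, for example the ResourceManager, the Dispatcher, the WebMonitorEndpoint and the JobManager of each job. A contender starts competing for leadership through LeaderElectionService#start(LeaderContender).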
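LeaderRetrievalService: the service that receives leader changes in real time. When the leader information changes, it calls notifyLeaderAddress on the LeaderRetrievalListener to pass on the new leader's information (address and session ID).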
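LeaderRetrievalListener: implement this interface to follow leader information in real time. Listening starts by calling LeaderRetrievalService#start(LeaderRetrievalListener listener) on the retrieval service of the component in question.
    // Notifies the listener that the leader information has changed.
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    // Handles errors raised by the leader retrieval service.
    void handleError(Exception exception);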
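The interplay of the four interfaces above can be sketched in a single process. This is a toy model under stated assumptions, not Flink's implementation: `Contender`, `RetrievalListener` and `InMemoryLeaderService` are hypothetical, simplified stand-ins, the first contender to start always wins, and the address is made up. What it does show is the order of the calls: start, grantLeadership, confirmLeaderSessionID, then notifyLeaderAddress.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Simplified stand-in for LeaderContender (not Flink's real interface). */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
    String getAddress();
}

/** Simplified stand-in for LeaderRetrievalListener. */
interface RetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

/** Single-process stand-in for an election service plus its matching retrieval service. */
final class InMemoryLeaderService {
    private final List<RetrievalListener> listeners = new ArrayList<>();
    private Contender leader;
    private UUID issuedSessionId;
    private UUID confirmedSessionId;

    /** Election side: in this toy model the first contender to start wins. */
    synchronized void start(Contender contender) {
        if (leader == null) {
            leader = contender;
            issuedSessionId = UUID.randomUUID();
            contender.grantLeadership(issuedSessionId);
        }
    }

    /** Election side: only after the leader confirms is its address published. */
    synchronized void confirmLeaderSessionID(UUID sessionId) {
        if (sessionId.equals(issuedSessionId)) {
            confirmedSessionId = sessionId;
            for (RetrievalListener listener : listeners) {
                listener.notifyLeaderAddress(leader.getAddress(), sessionId);
            }
        }
    }

    /** Returns whether the given session ID is the one most recently issued. */
    synchronized boolean hasLeadership(UUID sessionId) {
        return sessionId.equals(issuedSessionId);
    }

    /** Retrieval side: registers a listener and replays the current leader if one is confirmed. */
    synchronized void startRetrieval(RetrievalListener listener) {
        listeners.add(listener);
        if (confirmedSessionId != null) {
            listener.notifyLeaderAddress(leader.getAddress(), confirmedSessionId);
        }
    }

    public static void main(String[] args) {
        InMemoryLeaderService service = new InMemoryLeaderService();
        // A listener, e.g. a component that needs to talk to the leader.
        service.startRetrieval((address, sessionId) ->
                System.out.println("new leader " + address + " with session " + sessionId));
        // A contender that confirms as soon as leadership is granted.
        service.start(new Contender() {
            @Override
            public void grantLeadership(UUID leaderSessionId) {
                service.confirmLeaderSessionID(leaderSessionId);
            }

            @Override
            public String getAddress() {
                return "rpc://host-a/resourcemanager"; // made-up address
            }
        });
    }
}
```

Separating "granted" from "confirmed" gives the new leader time to set itself up before any other component is told to connect to it.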
ZooKeeperHaServices (implements the HighAvailabilityServices interface from step 2): through ZooKeeperHaServices you can obtain the ZooKeeperLeaderElectionService and ZooKeeperLeaderRetrievalService of every component.
    @Override
    public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
    }
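ZooKeeperLeaderElectionService (implements the LeaderElectionService interface from step 2) is the service responsible for electing Flink components. It implements the following three Curator interfaces:
LeaderLatchListener: watches the component's node under leaderlatch to determine whether the current instance has acquired or lost leadership
NodeCacheListener: watches the instance's node under leader; if that node changes while the current instance is the leader, the instance rewrites its own connection information to the node
UnhandledErrorListener: detects errors in the communication with ZooKeeper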
Different implementations of the LeaderContender interface correspond to different contenders. Take the ResourceManager as an example: once it has verified that it was elected leader and has cleared its old state, it sets a new fencing token on its FencedRpcEndpoint (to prevent split brain).
    private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
        if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
            final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

            log.info("ResourceManager {} was granted leadership with fencing token {}",
                getAddress(), newResourceManagerId);

            // clear the state if we've been the leader before
            if (getFencingToken() != null) {
                clearStateInternal();
            }

            setFencingToken(newResourceManagerId);

            startServicesOnLeadership();

            return prepareLeadershipAsync().thenApply(ignored -> true);
        } else {
            return CompletableFuture.completedFuture(false);
        }
    }
It then starts the heartbeat services towards the TaskManagers and JobManagers, as well as the SlotManager.
    protected void startServicesOnLeadership() {
        startHeartbeatServices();
        slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
    }
Finally, it confirms to the ResourceManager's ZooKeeperLeaderElectionService that the new ResourceManager has successfully become the leader.
    @Override
    public void grantLeadership(final UUID newLeaderSessionID) {
        final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
            .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());

        final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
            (acceptLeadership) -> {
                if (acceptLeadership) {
                    // confirming the leader session ID might be blocking,
                    leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
                }
            },
            getRpcService().getExecutor());

        confirmationFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
                if (throwable != null) {
                    onFatalError(ExceptionUtils.stripCompletionException(throwable));
                }
            });
    }
One point worth mentioning: the main job of confirmLeaderSessionID is to write the connection information to the node under leader.
    @Override
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                "Confirm leader session ID {} for leader {}.",
                leaderSessionID,
                leaderContender.getAddress());
        }

        Preconditions.checkNotNull(leaderSessionID);

        if (leaderLatch.hasLeadership()) {
            // check if this is an old confirmation call
            synchronized (lock) {
                if (running) {
                    if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                        confirmedLeaderSessionID = leaderSessionID;
                        writeLeaderInformation(confirmedLeaderSessionID);
                    }
                } else {
                    LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
                        "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
                }
            }
        } else {
            LOG.warn("The leader session ID {} was confirmed even though the " +
                "corresponding JobManager was not elected as the leader.", leaderSessionID);
        }
    }
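The role of the fencing token set in tryAcceptLeadership can be illustrated with a minimal sketch. `FencedCounter` is a hypothetical class and not part of Flink; it only shows the idea of rejecting any call that carries the token of a previous leadership, which is how fencing keeps a stale caller from acting on a new leader. The exception type used here is an arbitrary choice for the sketch.

```java
import java.util.UUID;

/** Hypothetical endpoint that only serves calls carrying the current fencing token. */
final class FencedCounter {
    private UUID fencingToken; // null while this instance is not the leader
    private int value;

    /** Called when leadership is granted (new token) or revoked (null). */
    synchronized void setFencingToken(UUID token) {
        this.fencingToken = token;
    }

    /** Applies the increment only if the caller's token matches the current one. */
    synchronized int increment(UUID callerToken) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            throw new IllegalStateException(
                    "Fencing token mismatch: expected " + fencingToken + " but got " + callerToken);
        }
        return ++value;
    }

    public static void main(String[] args) {
        FencedCounter endpoint = new FencedCounter();
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();

        endpoint.setFencingToken(first);
        endpoint.increment(first);      // accepted: token matches

        endpoint.setFencingToken(second); // a new leadership epoch begins
        try {
            endpoint.increment(first);  // a caller still holding the old token
        } catch (IllegalStateException e) {
            System.out.println("rejected stale call: " + e.getMessage());
        }
    }
}
```

Because every call is checked against the current token, a component that has not yet learned about the leader change cannot modify state that belongs to the new leadership.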
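ZooKeeperLeaderRetrievalService (implements the LeaderRetrievalService interface from step 2) is the service that watches for changes of a Flink leader. It implements two Curator interfaces:
UnhandledErrorListener: detects errors in the communication with ZooKeeper
NodeCacheListener: watches the instance's node under leader; when it changes, the other instances that hold a LeaderRetrievalListener implementation are notified so that they reconnect to the new leader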
For example, the JobMaster's ResourceManagerLeaderListener (which implements LeaderRetrievalListener) connects to the new ResourceManager whenever it is notified of a leader change.
    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(final Exception exception) {
            handleJobMasterError(new Exception("Fatal error in the ResourceManager leader service", exception));
        }
    }
With the HA module, adding a new component and letting HA manage its lifecycle is quite straightforward. As an example, let us add a RescaleCoordinator component.
In ZooKeeperHaServices, add the HA services for the RescaleCoordinator. As with the other components, all that is needed is a different leader path.
    private static final String RESCALE_SERVICE_LEADER_PATH = "xxxxx";

    @Override
    public LeaderElectionService getAutoRescaleLeaderElectionService() {
        return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
    }

    @Override
    public LeaderRetrievalService getAutoRescaleLeaderRetriever() {
        return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESCALE_SERVICE_LEADER_PATH);
    }
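Implement the AutoRescaleCoodinator component. It extends FencedRpcEndpoint (the RPC endpoint, which will be covered in detail in a later post on Flink's RPC implementation) and implements LeaderContender (described above) and AutoRescaleGateway (the declaration of the RPC interface).
    public class AutoRescaleCoodinator extends FencedRpcEndpoint<AutoRescaleCoodinatorId>
            implements LeaderContender, AutoRescaleGateway {

        // Excerpt (the rest of the class is omitted): obtain the election
        // service for this component from the HA services.
        autoRescaleLeaderElectionService =
            highAvailabilityServices.getAutoRescaleLeaderElectionService();
    }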
Once the AutoRescaleCoodinator has started successfully, it joins the election.
    @Override
    protected void onStart() throws Exception {
        autoRescaleLeaderElectionService.start(this);
    }
After being notified that it was elected leader, it initializes its services, sets the fencing token, and finally confirms to the LeaderElectionService that it has taken leadership.
    @Override
    public void grantLeadership(UUID leaderSessionID) {
        logger.info("autorescale coodinator {} grant leadership", leaderSessionID);
        if (autoRescaleLeaderElectionService.hasLeadership(leaderSessionID)) {
            try {
                if (configuration.getBoolean(RescaleOptions.RESCALE_ENABLE)) {
                    initAutoRescaleCoordinatorService();
                } else {
                    // Only manually triggered rescaling is available.
                    logger.info("Automatic rescaling is not enabled for this job");
                }
                setFencingToken(new AutoRescaleCoodinatorId(leaderSessionID));
                autoRescaleLeaderElectionService.confirmLeaderSessionID(leaderSessionID);
            } catch (Exception exception) {
                if (schedulerUtil.isRunning()) {
                    schedulerUtil.close();
                }
                this.handleError(new RuntimeException("AutoRescaleCoodinator leader election failed", exception));
            }
        }
    }
In revokeLeadership, stop the services embedded in the AutoRescaleCoodinator.
    @Override
    public void revokeLeadership() {
        schedulerUtil.close();
        runAsyncWithoutFencing(
            () -> {
                log.info("AutoRescaleCoordinator {} was revoked leadership.", getAddress());
                setFencingToken(null);
            });
    }
Create an RpcGatewayRetriever object (it implements the LeaderRetrievalListener interface).
    // Obtain the AutoRescaleLeaderRetriever from ZooKeeperHaServices.
    autoRescaleLeaderRetrieverService = highAvailabilityServices.getAutoRescaleLeaderRetriever();

    // Create the RpcGatewayRetriever for the RescaleCoodinator.
    RpcGatewayRetriever<AutoRescaleCoodinatorId, AutoRescaleGateway> rescaleCoGtwRetriever =
        new RpcGatewayRetriever<>(
            rpcService,
            AutoRescaleGateway.class,
            AutoRescaleCoodinatorId::fromUuid,
            10,
            Time.milliseconds(50L));

    // Watch ZooKeeper in real time for changes of the RescaleCoodinator's leader information.
    autoRescaleLeaderRetrieverService.start(rescaleCoGtwRetriever);
Finally, the Gateway interface can be obtained through rescaleCoodinatorRetriever (an RpcGatewayRetriever like the one created in the previous step) and used to communicate with the RescaleCoodinator.
    @Override
    public CompletableFuture<String> callOnlineRescale(RescaleState rescaleState) {
        return rescaleCoodinatorRetriever.getFuture().thenCompose(
            autoRescaleGateway -> autoRescaleGateway.doRescale(rescaleState)
        );
    }
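The reason callers compose on getFuture() rather than caching a gateway can be seen in a small sketch. `SimpleGatewayRetriever` is a hypothetical, much simplified stand-in for a gateway retriever: it uses the leader address itself as the "gateway", whereas a real retriever would first connect to that address over RPC. The host names are made up.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical retriever: holds a future that completes once a leader is announced. */
final class SimpleGatewayRetriever {
    private CompletableFuture<String> leaderFuture = new CompletableFuture<>();

    /** Called by the retrieval service whenever the leader node changes. */
    synchronized void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (leaderFuture.isDone()) {
            // A leader was already known: start a fresh future for the next leader.
            leaderFuture = new CompletableFuture<>();
        }
        if (leaderAddress != null) {
            leaderFuture.complete(leaderAddress);
        }
    }

    /** Callers compose on this future instead of caching a concrete address. */
    synchronized CompletableFuture<String> getFuture() {
        return leaderFuture;
    }

    public static void main(String[] args) {
        SimpleGatewayRetriever retriever = new SimpleGatewayRetriever();

        // Issued before any leader is known; it runs once a leader is announced.
        CompletableFuture<String> call =
                retriever.getFuture().thenApply(address -> "rescale sent to " + address);

        retriever.notifyLeaderAddress("host-a", UUID.randomUUID());
        System.out.println(call.join());

        // After a failover, new calls are routed to the new leader.
        retriever.notifyLeaderAddress("host-b", UUID.randomUUID());
        System.out.println(retriever.getFuture().join());
    }
}
```

A call made while no leader is known simply waits on the future, and calls made after a failover pick up the new leader without the caller having to track leader changes itself.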