SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。本文为第十二篇,上文我们简述了Data节点变化之后,在dataServer中是如何变化处理的,本文我们按照数据流程继续进行,讲讲SOFARegistry如何处理本机房Data节点变化。
目录
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第十二篇,上文我们简述了Data节点变化之后,在dataServer中是如何变化处理的,本文我们按照数据流程继续进行,讲讲SOFARegistry如何处理本机房Data节点变化。
DataServer 在 SOFARegistry 中,承担着核心的数据存储功能。数据按 dataInfoId 进行一致性 Hash 分片存储,支持多副本备份,保证数据高可用。这一层可随服务数据量的规模的增长而扩容。
如果 DataServer 宕机,MetaServer 能感知,并通知所有 DataServer 和 SessionServer,数据分片可 failover 到其他副本,同时 DataServer 集群内部会进行分片数据的迁移。
Data Center 代表的是本地机房,从目前来看:
阿里有异机房备份,应该就是Global部分,但是没有开源。
所以我们得重点剖析本地节点如何继续处理。
我们先要提前剧透下总体逻辑,DataServer彼此通知是围绕着数据版本号进行,即:
所以我们总结如下:在收到 meta server 的 data server change 消息之后,同一个Data Center 之中所有data server 会互相通知彼此升级版本号。
前文提到,在DataServerChangeEventHandler中,处理DataServerChangeEvent时,若当前节点是 DataCenter 节点,则触发 LocalDataServerChangeEvent 事件。
public class LocalDataServerChangeEvent implements Event { private MaplocalDataServerMap; private long localDataCenterversion; private SetnewJoined; private long version; }
LocalDataServerChangeEvent消息来源是DataServerChangeEventHandler。
MetaServer 会通过网络连接感知到新节点上线或者下线,所有的 DataServer 中运行着一个定时刷新连接的任务 ConnectionRefreshTask,该任务定时去轮询 MetaServer,获取数据节点的信息。需要注意的是,除了 DataServer 主动去 MetaServer 拉取节点信息外,MetaServer 也会主动发送 NodeChangeResult 请求到各个节点,通知节点信息发生变化,推拉获取信息的最终效果是一致的。
当轮询信息返回数据节点有变化时,会向 EventCenter 投递一个 DataServerChangeEvent 事件,在该事件的处理器中,如果判断出是当前机房节点信息有变化,则会投递新的事件 LocalDataServerChangeEvent,该事件的处理器 LocalDataServerChangeEventHandler 中会判断当前节点是否为新加入的节点,如果是新节点则会向其它节点发送 NotifyOnlineRequest 请求,如图所示:
在 DataServerChangeEventHandler 的 doHandle 函数中,会产生 LocalDataServerChangeEvent。
LocalDataServerChangeEventHandler是同机房数据节点变更事件处理器,或者说是同一集群数据同步器。
LocalDataServerChangeEvent事件的处理器 LocalDataServerChangeEventHandler 中会判断当前节点是否为新加入的节点,如果是新节点则会向其它节点发送 NotifyOnlineRequest 请求。所以是针对本 Data Center中新加入的 Data Server 进行处理。
LocalDataServerChangeEventHandler 之中关键是:
private BlockingQueueevents = new LinkedBlockingDeque<>(); private class LocalClusterDataSyncer implements Runnable
讲解如下:
即在LocalDataServerChangeEventHandler内部做了一个统一延迟异步处理。当从 EventCenter 中拿到 LocalDataServerChangeEvent 之后,会往 events 放入这个 event,然后内部的LocalClusterDataSyncer 会在随后异步执行。
在LocalClusterDataSyncer内部是:
LocalDataServerChangeEventHandler定义如下:
public class LocalDataServerChangeEventHandler extends AbstractEventHandler{ @Autowired private DataServerConfig dataServerConfig; @Autowired private LocalDataServerCleanHandler localDataServerCleanHandler; @Autowired private DataServerCache dataServerCache; @Autowired private DataNodeExchanger dataNodeExchanger; @Autowired private DataNodeStatus dataNodeStatus; @Autowired private DatumCache datumCache; @Autowired private DatumLeaseManager datumLeaseManager; private BlockingQueueevents = new LinkedBlockingDeque<>(); }
在doHandle函数中,会把最新的消息投放到 BlockingQueue 之中。
public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) { isChanged.set(true); // Better change to Listener pattern localDataServerCleanHandler.reset(); datumLeaseManager.reset(); events.offer(localDataServerChangeEvent); }
消费引擎在Bean启动之后,会通过afterPropertiesSet来启动。
@Override public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); start(); } public void start() { Executor executor = ExecutorFactory .newSingleThreadExecutor(LocalDataServerChangeEventHandler.class.getSimpleName()); executor.execute(new LocalClusterDataSyncer()); }
LocalClusterDataSyncer会执行具体业务消费消息。
在引擎之中,LocalClusterDataSyncer会持续消费。
private class LocalClusterDataSyncer implements Runnable { @Override public void run() { while (true) { try { LocalDataServerChangeEvent event = events.take(); //if size of events is greater than 0, not handle and continue, only handle the last one in the queue if (events.size() > 0) { continue; } long changeVersion = event.getVersion(); isChanged.set(false); if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) { //if local server is working, compare sync data notifyToFetch(event, changeVersion); } else { dataServerCache.checkAndUpdateStatus(changeVersion); //if local server is not working, notify others that i am newer notifyOnline(changeVersion); dataServerCache.updateItem(event.getLocalDataServerMap(), event.getLocalDataCenterversion(), dataServerConfig.getLocalDataCenter()); } } } }
【重点说明】
每个data server 都会从 meta server 接收到 DataServerChangeEvent,因为是本地Data Server的消息,所以都会转换为 LocalDataServerChangeEvent。
因为是每个 data server 都会接收到,所以新上线服务器会接收到,已经在线的服务器也会接收到。这是下面讲解的重点。
在新节点中,LocalDataServerChangeEvent事件的处理器 LocalDataServerChangeEventHandler 中会判断当前节点是否为新加入的节点,如果是新节点则会向其它节点发送 NotifyOnlineRequest 请求,如图所示:
图 DataServer 节点上线时新节点的逻辑
上图展示的是新加入节点收到节点变更消息的处理逻辑,如果是线上已经运行的节点收到节点变更的消息,前面的处理流程都相同,不同之处在于 LocalDataServerChangeEventHandler 中会根据 Hash 环计算出变更节点(扩容场景下,变更节点是新节点,缩容场景下,变更节点是下线节点在 Hash 环中的后继节点)所负责的数据分片范围和其备份节点。
新加入节点 会通过 NotifyOnlineRequest 告诉其他已经在线的节点,我是新的,你可以做相应配置。
notifyOnline 会从 DataServerNodeFactory 获取当前Local Data Center中所有的DataServerNode,然后逐一发送 NotifyOnlineRequest 通知:我上线了。
然后其他在线的Data Server 当收到通知,会开始与新节点交互。
/** * notify other dataservers that this server is online newly * * @param changeVersion */ private void notifyOnline(long changeVersion) { MapdataServerNodeMap = DataServerNodeFactory .getDataServerNodes(dataServerConfig.getLocalDataCenter()); for (EntryserverEntry : dataServerNodeMap.entrySet()) { while (true) { String ip = serverEntry.getKey(); DataServerNode dataServerNode = DataServerNodeFactory.getDataServerNode( dataServerConfig.getLocalDataCenter(), ip); try { final Connection connection = dataServerNode.getConnection(); CommonResponse response = (CommonResponse) dataNodeExchanger.request( new Request() { @Override public Object getRequestBody() { return new NotifyOnlineRequest(DataServerConfig.IP, changeVersion); } @Override public URL getRequestUrl() { return new URL(connection.getRemoteIP(), connection .getRemotePort()); } }).getResult(); } } } }
当前在线服务节点遍历自身内存中的数据项,过滤出属于变更节点的分片范围的数据项,然后向变更节点和其备份节点发送 NotifyFetchDatumRequest 请求, 变更节点和其备份节点收到该请求后,其处理器会向发送者同步数据(NotifyFetchDatumHandler.fetchDatum),如图所示。
注意,本图与上图左右的JVM放置位置相反。
图 DataServer 节点变更时已存节点的逻辑
就是说,在线服务节点 会通过 NotifyFetchDatumRequest 告诉新节点,我这里有你需要的数据,你过来取。
下面是几个重要函数的说明:
notify onlined newly dataservers to fetch datum,就是通知新节点,你主动过来拉取,同时也依据请求消息来更新自身。
notifyToFetch的具体功能是:
toBeSyncMap = getToBeSyncMap(consistentHash);
获取需要同步的map;getToBeSyncMap的作用是哪些ip需要同步哪些东西
;get map of datum to be synced。Entry<String, Map
,该entry 的key 是dataCenter;allVersionMap.put(dataCenter, versionMap);
versionMap.put(dataInfoId, datum.getVersion());
Entry
,其key是 dataInfoId;Map<String, Map
类型;getToBeSyncMap 的逻辑是找出需要通知的IP列表,以及每个ip需要同步哪些dataInfoId,具体如下:
哪些ip需要同步哪些东西
;private Map<String/*ip*/, Map<String/*datacenter*/, Map>> getToBeSyncMap(ConsistentHashconsistentHash) { Map<String, Map<String, Map>> toBeSyncMap = new HashMap<>(); Map<String, List> triadCache = new HashMap<>(); ConsistentHashconsistentHashOld = dataServerCache .calculateOldConsistentHash(dataServerConfig.getLocalDataCenter()); }
getNewJoined就是找出那些不在已经存储的Triad 之中,或者在其中但是不是working状态的。
public ListgetNewJoined(ListnewTriad, SetnotWorking) { Listlist = new ArrayList<>(); for (DataNode node : newTriad) { String ip = node.getIp(); if (!ipSetOfNode.contains(ip) || notWorking.contains(ip)) { list.add(node); } } return list; }
BackupTriad 的作用是:针对 dataInfoId,对应的备份DataNode列表。
public class BackupTriad { /** dataInfoId */ private String dataInfoId; /** * calculate current dataServer list Consistent hash to get dataInfoId belong node and backup node list * @see ConsistentHash#ConsistentHash(int, java.util.Collection) * @see com.alipay.sofa.registry.consistency.hash.ConsistentHash#getNUniqueNodesFor(java.lang.Object, int) */ private Listtriad; private SetipSetOfNode = new HashSet<>(); /** * constructor * @param dataInfoId * @param triad */ public BackupTriad(String dataInfoId, Listtriad) { this.dataInfoId = dataInfoId; this.triad = triad; for (DataNode node : triad) { ipSetOfNode.add(node.getIp()); } } }
运行时如下:
backupTriad = {BackupTriad@1400} "BackupTriad{dataInfoId='TestDataInfoId', ipSetOfNode=[192.168.0.2, 192.168.0.1, 192.168.0.3]}" dataInfoId = "TestDataInfoId" triad = {ArrayList@1399} size = 3 0 = {DataNode@1409} "DataNode{ip=192.168.0.1}" 1 = {DataNode@1410} "DataNode{ip=192.168.0.2}" 2 = {DataNode@1411} "DataNode{ip=192.168.0.3}" ipSetOfNode = {HashSet@1403} size = 3 0 = "192.168.0.2" 1 = "192.168.0.1" 2 = "192.168.0.3"
在上述代码中,会从LocalDataServerChangeEvent获取一个version,从而利用这个版本做后续处理,同时也会给dataServerCache设置版本号。
LocalDataServerChangeEvent event = events.take(); long changeVersion = event.getVersion(); if (LocalServerStatusEnum.WORKING == dataNodeStatus.getStatus()) { //if local server is working, compare sync data notifyToFetch(event, changeVersion); } else { dataServerCache.checkAndUpdateStatus(changeVersion); //if local server is not working, notify others that i am newer notifyOnline(changeVersion); }
现在我们就好奇,当Data Server有变更时候,这个版本是从哪里来的。让我们追根溯源。这是从后往前倒推的过程。
因为提到了dataServerCache设置版本号,所以我们要回溯到DataServerCache。可以看到,DataServerCache之中有两个相关变量:curVersion 和 DataServerChangeItem。
这就是从newDataServerChangeItem获取了对应data center的版本号,设置在DataServerCache。
具体DataServerCache中相关定义如下:
public class DataServerCache { /** new input dataServer list and version */ private volatile DataServerChangeItem newDataServerChangeItem = new DataServerChangeItem(); private AtomicLong curVersion = new AtomicLong(-1L); public Long getDataCenterNewVersion(String dataCenter) { synchronized (DataServerCache.class) { MapversionMap = newDataServerChangeItem.getVersionMap(); if (versionMap.containsKey(dataCenter)) { return versionMap.get(dataCenter); } else { return null; } } } }
在 DataServerCache中只有addStatus控制curVersion的赋值,而对外的接口中,只有 synced 和 addNotWorkingServer 调用 addStatus。
而 newDataServerChangeItem 是在compareAndSet这里设置。
public Map<String, Set> compareAndSet(DataServerChangeItem newItem, FromType fromType) { if (!changedMap.isEmpty()) { newDataServerChangeItem = newItem; } }
逻辑如下:
+-----------------------------+ |[DataServerCache] | | | compareAndSet +-------------> DataServerChangeItem | | | | curVersion | | ^ ^ | | | | | +-----------------------------+ | | synced +----------------------+ | | addNotWorkingServer+-------------------+
现在涉及到 DataServerCache 两个设计点:
现在推论,每一个数据中心 Data Center 有一个版本号用做其内部所有状态控制。其实,在DataServerChangeItem 的定义中的 versionMap 也能看出来,是根据版本号控制的。
DataServerChangeItem 定义如下:
public class DataServerChangeItem { /** datacenter -> Map*/ private Map<String, Map> serverMap; /** datacenter -> version */ private MapversionMap; }
从而知道:
现在问题变成,
我们通过研读源码可以知道,是从Meta Server获取,下面就跟踪下这个过程。
我们需要复习下这个流程。
Meta Server 会广播通知 所有data server 现在有 data server 更新,也可能是 DataServer主动定期看看MetaServer 是否有更新。
但是具体更新的内容,还是 data server 主动发送 GetNodesRequest 获取。
这里以主动更新为例,可以看到,DataServer 会通过 metaServerService.getDateServers 从 meta server 获取到DataServerChangeItem,从而构建 DataServerChangeEvent。
public class ConnectionRefreshTask extends AbstractTask { @Autowired private IMetaServerService metaServerService; @Autowired private EventCenter eventCenter; @Override public void handle() { DataServerChangeItem dataServerChangeItem = metaServerService.getDateServers(); if (dataServerChangeItem != null) { eventCenter .post(new DataServerChangeEvent(dataServerChangeItem, FromType.CONNECT_TASK)); } } }
在 DefaultMetaServiceImpl 中可以看到,DataServerChangeItem是从 Meta Server获取的NodeChangeResult 提取出来。
public class DefaultMetaServiceImpl implements IMetaServerService { @Override public DataServerChangeItem getDateServers() { MapconnectionMap = metaServerConnectionFactory .getConnections(dataServerConfig.getLocalDataCenter()); String leader = getLeader().getIp(); if (connectionMap.containsKey(leader)) { Connection connection = connectionMap.get(leader); if (connection.isFine()) { try { GetNodesRequest request = new GetNodesRequest(NodeType.DATA); Object obj = metaNodeExchanger.request(new Request() { @Override public Object getRequestBody() { return request; } @Override public URL getRequestUrl() { return new URL(connection.getRemoteIP(), connection.getRemotePort()); } }).getResult(); if (obj instanceof NodeChangeResult) { NodeChangeResultresult = (NodeChangeResult) obj; MapversionMap = result.getDataCenterListVersions(); versionMap.put(result.getLocalDataCenter(), result.getVersion()); return new DataServerChangeItem(result.getNodes(), versionMap); } } } } String newip = refreshLeader().getIp(); return null; } }
逻辑如下:
+ Data Server | | | +------------------+ | | NodeChangeResult | | +-------+----------+ | | +--------------------------+ | | |[DataServerCache] | | | | | | +---------------->compareAndSet------> DataServerChangeItem | | DataServerChangeItem | | | | curVersion | | | ^ ^ | | | | | | | +--------------------------+ | | | | synced +------------- | | | | addNotWorkingServer----------+ | | +
让我们来到 meta server之中。可以看到,之前 在DataStoreService put,remove等各个函数中,当data server有变化时候,会调用 dataNodeRepository 通过时间戳设置版本号。
dataNodeRepository.setVersion(currentTimeMillis);
当 meta server 接收到 GetNodesRequest 之后,会生成 NodeChangeResult。
DataStoreService 会调用 dataNodeRepository 获取版本号,从而在 NodeChangeResult之中设置。
public class DataStoreService implements StoreService{ @Override public NodeChangeResult getNodeChangeResult() { NodeChangeResult nodeChangeResult = new NodeChangeResult(NodeType.DATA); try { String localDataCenter = nodeConfig.getLocalDataCenter(); MapdataNodeRepositoryMap = dataRepositoryService .getNodeRepositories(); ConcurrentHashMap<String/*dataCenter*/, Map> pushNodes = new ConcurrentHashMap<>(); MapversionMap = new ConcurrentHashMap<>(); dataNodeRepositoryMap.forEach((dataCenter, dataNodeRepository) -> { //在这里会获取版本号 if (localDataCenter.equalsIgnoreCase(dataCenter)) { nodeChangeResult.setVersion(dataNodeRepository.getVersion()); } versionMap.put(dataCenter, dataNodeRepository.getVersion()); Map<String, RenewDecorate> dataMap = dataNodeRepository.getNodeMap(); MapnewMap = new ConcurrentHashMap<>(); dataMap.forEach((ip, dataNode) -> newMap.put(ip, dataNode.getRenewal())); pushNodes.put(dataCenter, newMap); }); nodeChangeResult.setNodes(pushNodes); nodeChangeResult.setDataCenterListVersions(versionMap); nodeChangeResult.setLocalDataCenter(localDataCenter); } //返回 return nodeChangeResult; } }
具体如下:
Meta Server + Data Server | | getNodeChangeResult +-----------------+ | +------------------+ +-------------------------> | NodeChangeResult| +------>+ NodeChangeResult | | +-----------------+ | +-------+----------+ | | | +--------------------------+ | | | |[DataServerCache] | +--------+--------+ | | | | |DataStoreService | +-------------------+ | +---------------->compareAndSet------> DataServerChangeItem | +-----------------+ getVersion | | DataServerChangeItem | | | | | curVersion | | | | ^ ^ | | | | | | | v | +--------------------------+ +----------------------+ +-+-----------------+ | | | | DataRepositoryService+-------------> |dataNodeRepository | | synced +------------- | +----------------------+ +-------------------+ | | setVersion(currentTimeMillis) | addNotWorkingServer----------+ | | +
手机上如图:
我们又回到 Data Server。
当Data Server接收到NodeChangeResult之后,会提取出DataServerChangeItem。
public class DefaultMetaServiceImpl implements IMetaServerService { @Override public DataServerChangeItem getDateServers() { ...... GetNodesRequest request = new GetNodesRequest(NodeType.DATA); Object obj = metaNodeExchanger.request(new Request() { ...... } }).getResult(); if (obj instanceof NodeChangeResult) { NodeChangeResultresult = (NodeChangeResult) obj; MapversionMap = result.getDataCenterListVersions(); versionMap.put(result.getLocalDataCenter(), result.getVersion()); return new DataServerChangeItem(result.getNodes(), versionMap); } } } } } }
然后会回到前面 "主动获取变化" 小节,发送DataServerChangeEvent,进而转化为LocalDataServerChangeEvent,就和我们的代码联系起来。
关于 DataServerCache . curVersion 和 newDataServerChangeItem 如何进一步处理,我们需要再研究。
DataServerChangeEventHandler 的 doHandle 函数中有使用:
for (Entry<String, Set> changeEntry : changedMap.entrySet()) { String dataCenter = changeEntry.getKey(); Setips = changeEntry.getValue(); Long newVersion = dataServerCache.getDataCenterNewVersion(dataCenter); }
调用的是dataServerCache的函数,可以看到是取出newDataServerChangeItem的版本号。
public Long getDataCenterNewVersion(String dataCenter) { synchronized (DataServerCache.class) { MapversionMap = newDataServerChangeItem.getVersionMap(); if (versionMap.containsKey(dataCenter)) { return versionMap.get(dataCenter); } else { return null; } } }
构建LocalDataServerChangeEvent时候,则把newDataServerChangeItem的版本作为本地版本号localDataCenterversion。
public LocalDataServerChangeEvent(MaplocalDataServerMap, SetnewJoined, long version, long localDataCenterversion) { this.localDataServerMap = localDataServerMap; this.newJoined = newJoined; this.version = version; this.localDataCenterversion = localDataCenterversion; }
dataServerCache会据此做相关更新。
dataServerCache.updateItem(dataServerMapIn, event.getLocalDataCenterversion(), dataServerConfig.getLocalDataCenter());
关于curVersion,则来到了 notifyToFetch 和 notifyOnline 后续如何处理。
前面我们只是讲解了如何发送版本号,即:
所以我们总结可以,在收到 meta server 的 data server change 消息之后,同一个Data Center 之中所有data server 会互相通知彼此升级版本号。
notifyOnline 会发送 NotifyOnlineRequest,而其他 Data Server 的 NotifyOnlineHandler 会做相应处理。
notifyToFetch 会发送 NotifyFetchDatumRequest,而其他 Data Server 的 notifyFetchDatumHandler 会做相应处理。
下面我们要看看接收版本号之后,DataServer的新节点与在线节点分别做了什么。
这是一个数据拉取请求,当该 Handler 被触发时,通知当前 DataServer 节点进行版本号对比,若请求中数据的版本号高于当前节点缓存中的版本号,则会进行数据同步操作,保证数据是最新的。
这是一个 DataServer 上线通知请求 Handler,当其他节点上线时,会触发该 Handler,从而当前节点在缓存中存储新增的节点信息。用于管理节点状态,究竟是 INITIAL 还是 WORKING 。
于是可以看到,在NotifyOnlineHandler和NotifyFetchDatumHandler之中,都会根据本地dataServerCache中存储的curVersion做判断是否需要继续处理。
public class NotifyOnlineHandler extends AbstractServerHandler{ @Autowired private DataServerCache dataServerCache; @Override public Object doHandle(Channel channel, NotifyOnlineRequest request) { long version = request.getVersion(); if (version >= dataServerCache.getCurVersion()) { dataServerCache.addNotWorkingServer(version, request.getIp()); } return CommonResponse.buildSucce***esponse(); } }
以及 NotifyFetchDatumHandler 之中会调用sycned。
public class NotifyFetchDatumHandler extends AbstractServerHandler{ private static final Logger LOGGER = LoggerFactory .getLogger(NotifyFetchDatumHandler.class); @Autowired private DataServerCache dataServerCache; @Autowired private DataServerConnectionFactory dataServerConnectionFactory; @Autowired private DataChangeEventCenter dataChangeEventCenter; @Autowired private Exchange boltExchange; @Autowired private DataServerConfig dataServerConfig; @Autowired private DatumCache datumCache; @Autowired private LocalDataServerCleanHandler localDataServerCleanHandler; @Override public Object doHandle(Channel channel, NotifyFetchDatumRequest request) { ParaCheckUtil.checkNotBlank(request.getIp(), "ip"); //receive other data NotifyFetchDatumRequest,must delay clean datum task until fetch all datum localDataServerCleanHandler.reset(); Map<String, Map> versionMap = request.getDataVersionMap(); long version = request.getChangeVersion(); String ip = request.getIp(); if (version >= dataServerCache.getCurVersion()) { if (versionMap.isEmpty()) { dataServerCache.synced(version, ip); } else { ExecutorFactory.getCommonExecutor().execute(() -> { for (Entry<String, Map> dataCenterEntry : versionMap.entrySet()) { String dataCenter = dataCenterEntry.getKey(); Mapmap = dataCenterEntry.getValue(); for (EntrydataInfoEntry : map.entrySet()) { String dataInfoId = dataInfoEntry.getKey(); Datum datum = datumCache.get(dataCenter, dataInfoId); if (datum != null) { long inVersion = dataInfoEntry.getValue(); long currentVersion = datum.getVersion(); if (currentVersion > inVersion) { continue; } else if (datum.getVersion() == dataInfoEntry.getValue()) { //if version same,maybe remove publisher all by LocalDataServerCleanHandler,so must fetch from other node if (!datum.getPubMap().isEmpty()) { continue; } } } fetchDatum(ip, dataCenter, dataInfoId); } } dataServerCache.synced(version, ip); }); } } return CommonResponse.buildSucce***esponse(); } }
于是,目前如下:
+ | Meta Server | Data Server | | getNodeChangeResult +-----------------+ | +------------------+ +-------------------------> | NodeChangeResult| +------>+ NodeChangeResult | | +-----------------+ | +-------+----------+ | | | +--------------------------+ | | | |[DataServerCache] | +--------+--------+ | | | | |DataStoreService | +-------------------+ | +---------------->compareAndSet+-----> DataServerChangeItem | +-----------------+ getVersion | | DataServerChangeItem | | | | | curVersion | | | | ^ ^ | | | | | | | v | +-----------------+---+----+ +----------------------+ +-+-----------------+ | | | ^ ^ | DataRepositoryService+-------------> |dataNodeRepository | | synced +------------------------------------+ | | | getCurVersion +----------------------+ +-------------------+ | | | | setVersion(currentTimeMillis) | addNotWorkingSer^er+---------------------------------+ | | | +-------------------------------------------+ | | | getCurVersion | | | | | +-------------+---------+ +---------------------+----+ | | NotifyOnlineHandler | | NotifyFetchDatumHandler | | +-------------+---------+ +---------------+----------+ | ^ In Exist Server ^ In New Server | | | | | | +-----------------------------------------------------------------------------+ | | | | +-------+------------+ +---------+-----------+ | New Data Server | | Exist Data Server | +--------------------+ +---------------------+
手机如下:
至此,版本号流程就完全梳理完毕。
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 - SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析
蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服开源通信框架 SOFABolt 协议框架解析
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析