SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。本文为第十篇,主要是从业务角度进行梳理。看看DataServer如何与MetaServer交互。
目录
SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。
本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
本文为第十篇,主要是从业务角度进行梳理。看看DataServer如何与MetaServer交互。
首先我们要复习下MetaServer的重要性。
MetaServer元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就相当于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 作为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。
所以,如果想获取节点的变化,DataServer就必须重点研究如何与MetaServer交互。
居于Bolt协议,DataServer在与Meta Server的交互中,使用了推拉模型。
我们在这里重点分析其设计策略如下:
此模块目录结构如下,大致可以推论,
DefaultMetaServiceImpl 是 Meta Server 相关模块主体;
MetaServerConnectionFactory是连接管理;
ConnectionRefreshMetaTask 是定期循环task;
handler目录下是三个响应函数;
provideData 目录下是配置相关功能;
具体目录结构如下:
│ ├── metaserver │ │ ├── DefaultMetaServiceImpl.java │ │ ├── IMetaServerService.java │ │ ├── MetaServerConnectionFactory.java │ │ ├── handler │ │ │ ├── NotifyProvideDataChangeHandler.java │ │ │ ├── ServerChangeHandler.java │ │ │ └── StatusConfirmHandler.java │ │ ├── provideData │ │ │ ├── ProvideDataProcessor.java │ │ │ ├── ProvideDataProcessorManager.java │ │ │ └── processor │ │ │ └── DatumExpireProvideDataProcessor.java │ │ └── task │ │ └── ConnectionRefreshMetaTask.java
MetaServer相关组件如下:
这里有一个问题 :为什么 DataServerBootstrap 之中还有 startRaftClient,按说DataServer只用Http和Bolt就可以了。
原来是用 raft 协议来获取MetaServer集群中leader的地址等信息: raftClient.getLeader();
比如 renewNodeTask 时候会用到。
Raft相关启动是在startRaftClient,此函数的作用是:
具体代码是:
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }
前面提到了,当系统启动之后,会主动发送一个MetaServerChangeEvent,我们就看看其内容。
public class MetaServerChangeEvent implements Event { private Map<String, Set> ipMap; /** * constructor * @param ipMap */ public MetaServerChangeEvent(Map<String, Set> ipMap) { this.ipMap = ipMap; } public Map<String, Set> getIpMap() { return ipMap; } }
其运行状态如下:
event = {MetaServerChangeEvent@5991} ipMap = {HashMap@5678} size = 1 "DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007} size = 1
MetaServerChangeEvent有三种来源:启动主动获取,定期,推送。这三种具体如下:
我们先简述来源:
这就是上面提到的,启动时会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。
当 DataServer 节点初始化成功后,会启动任务自动去连接 MetaServer。即,该任务会往事件中心 EventCenter 注册一个 DataServerChangeEvent 事件,该事件注册后会被触发,之后将对新增节点计算 Hash 值,同时进行纳管分片。
具体启动时,会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }
堆栈如下
register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver) registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler) doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler) doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler) handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler) post:56, EventCenter (com.alipay.sofa.registry.server.data.event) startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap) start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap) start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap) doStart:173, DefaultLifecycleProcessor (org.springframework.context.support) access$200:50, DefaultLifecycleProcessor (org.springframework.context.support) start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support) startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support) onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support) finishRefresh:880, AbstractApplicationContext (org.springframework.context.support) refresh:546, AbstractApplicationContext (org.springframework.context.support) refresh:693, SpringApplication (org.springframework.boot) refreshContext:360, SpringApplication (org.springframework.boot) run:303, SpringApplication (org.springframework.boot) run:1118, SpringApplication (org.springframework.boot) run:1107, SpringApplication (org.springframework.boot) main:41, DataApplication (com.alipay.sofa.registry.server.data)
这部分是ConnectionRefreshMetaTask完成。ConnectionRefreshMetaTask 是定期 task,其在 Bean tasks 里面配置。
StartTaskEventHandler 会调用到 tasks,当收到 StartTaskEvent 之后,会启动 tasks里面的几个AbstractTask。
public class StartTaskEventHandler extends AbstractEventHandler{ @Resource(name = "tasks") private Listtasks; private ScheduledExecutorService executor = null; @Override public List<Class> interest() { return Lists.newArrayList(StartTaskEvent.class); } @Override public void doHandle(StartTaskEvent event) { if (executor == null || executor.isShutdown()) { getExecutor(); } for (AbstractTask task : tasks) { if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit()); } } } private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }
具体tasks如下:
@Bean(name = "tasks") public Listtasks() { Listlist = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask()); return list; }
ConnectionRefreshMetaTask 是定期task,会定期向EventCenter投放一个 MetaServerChangeEvent。
执行时候调用 metaServerService.getMetaServerMap();
返回一个MetaServerChangeEvent,并且添加到EventCenter之中。
public class ConnectionRefreshMetaTask extends AbstractTask { @Autowired private IMetaServerService metaServerService; @Autowired private EventCenter eventCenter; @Override public void handle() { eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); } }
ServerChangeHandler 是 metaClientHandler 的一部分,是MetaNodeExchanger 的响应函数。
ServerChangeHandler 继承了AbstractClientHandler,在interest之中,配置了会响应NodeChangeResult。
如果Meta有推送,ServerChangeHandler这里就有响应,这个会是 Meta Server 主动通知。
在ServerChangeHandler之中,拿到了NodeChangeResult之后,会判断变更节点类型,这里会根据 Note 类型不同,决定产生 DataServerChangeEvent 还是MetaServerChangeEvent。如果是NodeType.META,就发送消息给eventCenter,eventCenter.post(new MetaServerChangeEvent(map));
,这就是MetaServerChangeEvent的来源之一。
public class ServerChangeHandler extends AbstractClientHandler{ @Autowired private EventCenter eventCenter; @Autowired private DataServerConfig dataServerConfig; @Override public Object doHandle(Channel channel, NodeChangeResult request) { ExecutorFactory.getCommonExecutor().execute(() -> { if (request.getNodeType() == NodeType.DATA) { eventCenter.post(new DataServerChangeEvent(request.getNodes(), request.getDataCenterListVersions(), FromType.META_NOTIFY)); } else if (request.getNodeType() == NodeType.META) { Map<String, Map> metaNodesMap = request.getNodes(); if (metaNodesMap != null && !metaNodesMap.isEmpty()) { MapmetaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter()); if (metaNodeMap != null && !metaNodeMap.isEmpty()) { HashMap<String, Set> map = new HashMap<>(); map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet()); eventCenter.post(new MetaServerChangeEvent(map)); } } } }); return CommonResponse.buildSucce***esponse(); } @Override public Class interest() { return NodeChangeResult.class; } @Override public HandlerType getType() { return HandlerType.PROCESSER; } @Override protected Node.NodeType getConnectNodeType() { return Node.NodeType.DATA; } }
此时逻辑图如下,可以看到三种MetaServerChangeEvent消息来源,ServerChangeHandler也会提供DataServerChangeEvent:
+-------------------------------+ |[DataServerBootstrap] | MetaServerChangeEvent | | | +-------------------------+ | startRaftClient | | | | | | | | +-------------------------------+ | +-------------------------------+ | | [Timer] | | | | | +-------------+ | ConnectionRefreshMetaTask +------------------------------> | EventCenter | | | MetaServerChangeEvent | +-------+-----+ +-------------------------------+ | ^ +-------------------------------+ | | | [Push] | | | | | | | | +-------------------------+ | | | MetaServerChangeEvent | | ServerChangeHandler | | | +----------------------------------------+ +-------------------------------+ DataServerChangeEvent
MetaServerChangeEventHandler 用来响应 MetaServerChangeEvent 消息。因为其继承了AbstractEventHandler,所以 MetaServerChangeEventHandler 已经注册到了EventCenter之上。
注意,这里有一个再次转换DataServerChangeEvent的过程,即这里又会主动和MetaServer交互,如果返回消息是NodeChangeResult,就转换为DataServerChangeEvent。
这是因为Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变动,兄弟你再来拉取下"。
在处理时候,MetaServerChangeEventHandler会去与MetaServer交互,看看其有效性,如果有效,就注册。
逻辑如下:
DataNode(new URL(DataServerConfig.IP), dataServerConfig .getLocalDataCenter());
具体代码如下:
public class MetaServerChangeEventHandler extends AbstractEventHandler{ @Autowired private DataServerConfig dataServerConfig; @Autowired private IMetaServerService metaServerService; @Autowired private MetaNodeExchanger metaNodeExchanger; @Autowired private EventCenter eventCenter; @Autowired private MetaServerConnectionFactory metaServerConnectionFactory; @Override public List<Class> interest() { return Lists.newArrayList(MetaServerChangeEvent.class); } @Override public void doHandle(MetaServerChangeEvent event) { Map<String, Set> ipMap = event.getIpMap(); for (Entry<String, Set> ipEntry : ipMap.entrySet()) { String dataCenter = ipEntry.getKey(); Setips = ipEntry.getValue(); if (!CollectionUtils.isEmpty(ips)) { for (String ip : ips) { Connection connection = metaServerConnectionFactory.getConnection(dataCenter, ip); if (connection == null || !connection.isFine()) { registerMetaServer(dataCenter, ip); } } SetipSet = metaServerConnectionFactory.getIps(dataCenter); for (String ip : ipSet) { if (!ips.contains(ip)) { metaServerConnectionFactory.remove(dataCenter, ip); } } } else { //remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty removeDataCenter(dataCenter); } } //remove connections of dataCenter if the dataCenter not exist in ipMap SetdataCenters = metaServerConnectionFactory.getAllDataCenters(); for (String dataCenter : dataCenters) { if (!ipMap.containsKey(dataCenter)) { removeDataCenter(dataCenter); } } } private void registerMetaServer(String dataCenter, String ip) { PeerId leader = metaServerService.getLeader(); for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) { try { Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig .getMetaServerPort())); //connect all meta server if (channel != null && channel.isConnected()) { metaServerConnectionFactory.register(dataCenter, ip, ((BoltChannel) channel).getConnection()); } //register leader meta node if (ip.equals(leader.getIp())) { Object obj = null; try { obj = metaNodeExchanger.request(new Request() { @Override public Object getRequestBody() { return new DataNode(new URL(DataServerConfig.IP), dataServerConfig .getLocalDataCenter()); } @Override public URL getRequestUrl() { return new URL(ip, dataServerConfig.getMetaServerPort()); } }).getResult(); } if (obj instanceof NodeChangeResult) { NodeChangeResultresult = (NodeChangeResult) obj; MapversionMap = result.getDataCenterListVersions(); //send renew after first register dataNode Setset = new HashSet<>(); set.add(StartTaskTypeEnum.RENEW); eventCenter.post(new StartTaskEvent(set)); eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,DataServerChangeEvent.FromType.REGISTER_META)); break; } } } } }
此时逻辑图如下:
+-------------------------------+ |[DataServerBootstrap] | MetaServerChangeEvent | | | +-------------------------+ | startRaftClient | | | | | +---------------+ | | | | | +-------------------------------+ | | | +-------------------------------+ | | | | [Timer] | | v | | | | 1 +-------+-----+ | | ConnectionRefreshMetaTask +------------------------------> | EventCenter +----+ | | | MetaServerChangeEvent | +-------+-----+ | | +-------------------------------+ | ^ | | +-------------------------------+ | | | | | | | | | | | [Push] | | | | | | | | | | | | +-------------------------+ | | | | | MetaServerChangeEvent | | | | ServerChangeHandler | 2 | | | | +----------------------------------------+ | | +-------------------------------+ DataServerChangeEvent | | | | | | MetaServerChangeEvent | | 3 | | +----------------------------------------------------+ | | | v | +-----------------+--------------+ DataServerChangeEvent | | | 4 | | MetaServerChangeEventHandler +------------------------------------------+ | | +--------------------------------+
手机如下:
下面我们讲讲dataServer如何管理metaServer的连接。
我们知道,一次 tcp 请求大致分为三个步骤:建立连接、通信、关闭连接。每次建立新连接都会经历三次握手,中间包含三次网络传输,对于高并发的系统,这是一笔不小的负担;关闭连接同样如此。为了减少每次网络调用请求的开销,对连接进行管理、复用,可以极大的提高系统的性能。
为了提高通信效率,我们需要考虑复用连接,减少 TCP 三次握手的次数,因此需要有连接管理的机制。
关于连接管理,SOFARegistry有两个层次的连接管理,分别是 Connection 和 Node。
可以用socket(localIp,localPort, remoteIp,remotePort )代表一个连接,在Netty中用Channel来表示,在sofa-bolt使用Connection对象来抽象一个连接,一个连接在client跟server端各用一个connection对象表示。
有了Connection这个抽象之后,自然的需要提供接口来管理Connection, 这个接口就是ConnectionFactory。
不论是服务端还是客户端,其实本质都在做一件事情:创建 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中。
之后当有 ConnectionEvent 触发时(无论是 Netty 定义的事件被触发,还是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会通过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。
metaServerConnectionFactory 是用来存储所有 meta Sever Connection,这是Bolt的机制应用,需要维持一个长连接。
MetaServerChangeEvent 内容是:dataCenter,以及其下面的Data Server ip 列表。对应MetaServerConnectionFactory 的 MAP 是:
Map< dataCenter : Map>
具体定义如下:
public class MetaServerConnectionFactory { private final Map<String, Map> MAP = new ConcurrentHashMap<>(); /** * * @param dataCenter * @param ip * @param connection */ public void register(String dataCenter, String ip, Connection connection) { MapconnectionMap = MAP.get(dataCenter); if (connectionMap == null) { MapnewConnectionMap = new ConcurrentHashMap<>(); connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap); if (connectionMap == null) { connectionMap = newConnectionMap; } } connectionMap.put(ip, connection); } }
只是在 MetaServerChangeEventHandler . doHandle 函数中有添加操作,调用了metaServerConnectionFactory.register
。
所以在 doHandle 函数中,遍历Event所有的 meta Server IP,这里每一个ip对应一个 data Center。对于每一个ip做如下操作:
removeDataCenter(dataCenter);
其中使用了metaNodeExchanger去连接metaServer。具体代码如下:
private void registerMetaServer(String dataCenter, String ip) { PeerId leader = metaServerService.getLeader(); for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) { try { Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig .getMetaServerPort())); //connect all meta server if (channel != null && channel.isConnected()) { metaServerConnectionFactory.register(dataCenter, ip, ((BoltChannel) channel).getConnection()); } //其他操作 } }
MetaServerConnectionFactory在运行时如下:
metaServerConnectionFactory = {MetaServerConnectionFactory@5387} MAP = {ConcurrentHashMap@6154} size = 1 "DefaultDataCenter" -> {ConcurrentHashMap@6167} size = 1
dataServer和metaServer之间是推拉模型交互。
MetaNodeExchanger 是 bolt Exchange,把metaServer相关的网络操作集中在一起。无论是MetaServerChangeEventHandler还是DefaultMetaServiceImpl,都基于此与Meta Server交互。其中
connect 设置了响应函数metaClientHandlers
而 request 时候,如果失败了,则会 metaServerService.refreshLeader().getIp() 刷新地址,重新调用。
这里会测试MetaServer有效性 。
public class MetaNodeExchanger implements NodeExchanger { @Autowired private Exchange boltExchange; @Autowired private IMetaServerService metaServerService; @Autowired private DataServerConfig dataServerConfig; @Resource(name = "metaClientHandlers") private CollectionmetaClientHandlers; @Override public Response request(Request request) { Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE); try { final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(), dataServerConfig.getRpcTimeout()); return () -> result; } catch (Exception e) { //retry URL url = new URL(metaServerService.refreshLeader().getIp(), dataServerConfig.getMetaServerPort()); final Object result = client.sendSync(url, request.getRequestBody(), request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout()); return () -> result; } } public Channel connect(URL url) { Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE); if (client == null) { synchronized (this) { client = boltExchange.getClient(Exchange.META_SERVER_TYPE); if (client == null) { client = boltExchange.connect(Exchange.META_SERVER_TYPE, url, metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()])); } } } //try to connect data Channel channel = client.getChannel(url); if (channel == null) { synchronized (this) { channel = client.getChannel(url); if (channel == null) { channel = client.connect(url); } } } return channel; } }
MetaNodeExchanger响应Handler如下,这部分是推模型,前面已经提到了,serverChangeHandler会响应推送。
@Bean(name = "metaClientHandlers") public CollectionmetaClientHandlers() { Collectionlist = new ArrayList<>(); list.add(serverChangeHandler()); list.add(statusConfirmHandler()); list.add(notifyProvideDataChangeHandler()); return list; }
DefaultMetaServiceImpl是Meta Server相关服务的核心实现。
其中,raftClient是raft的入口,metaNodeExchanger 是bolt的入口。metaServerConnectionFactory 保存目前所有的 meta server bolt connection。
public class DefaultMetaServiceImpl implements IMetaServerService { @Autowired private DataServerConfig dataServerConfig; @Autowired private MetaNodeExchanger metaNodeExchanger; @Autowired private MetaServerConnectionFactory metaServerConnectionFactory; @Autowired private DataServerCache dataServerCache; private RaftClient raftClient; }
刷新是重要功能之一,用来获取raft leader。
@Override public PeerId getLeader() { if (raftClient == null) { startRaftClient(); } PeerId leader = raftClient.getLeader(); if (leader == null) { throw new RuntimeException( "[DefaultMetaServiceImpl] register MetaServer get no leader!"); } return leader; } @Override public PeerId refreshLeader() { if (raftClient == null) { startRaftClient(); } PeerId leader = raftClient.refreshLeader(); if (leader == null) { throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!"); } return leader; }
另外一个重要功能是重连。
getMetaServerMap完成了重连,getMetaServerMap 的作用:
GetNodesRequest(NodeType.META)
;GetNodesRequest(NodeType.META)
的结果 NodeChangeResult,构建一个 MetaServerChangeEvent,放入EventCenter。eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
具体代码如下:
@Override public Map<String, Set> getMetaServerMap() { HashMap<String, Set> map = new HashMap<>(); Setset = dataServerConfig.getMetaServerIpAddresses(); MapconnectionMap = metaServerConnectionFactory .getConnections(dataServerConfig.getLocalDataCenter()); Connection connection = null; try { if (connectionMap.isEmpty()) { Listlist = new ArrayList(set); Collections.shuffle(list); connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator() .next(), dataServerConfig.getMetaServerPort()))).getConnection(); } else { Listconnections = new ArrayList<>(connectionMap.values()); Collections.shuffle(connections); connection = connections.iterator().next(); if (!connection.isFine()) { connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection .getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection(); } } GetNodesRequest request = new GetNodesRequest(NodeType.META); final Connection finalConnection = connection; Object obj = metaNodeExchanger.request(new Request() { @Override public Object getRequestBody() { return request; } @Override public URL getRequestUrl() { return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort()); } }).getResult(); if (obj instanceof NodeChangeResult) { NodeChangeResultresult = (NodeChangeResult) obj; Map<String, Map> metaNodesMap = result.getNodes(); if (metaNodesMap != null && !metaNodesMap.isEmpty()) { MapmetaNodeMap = metaNodesMap.get(dataServerConfig .getLocalDataCenter()); if (metaNodeMap != null && !metaNodeMap.isEmpty()) { map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet()); } } } } return map; }
其中,具体获取MetaServer信息是在
@ConfigurationProperties(prefix = DataServerConfig.PRE_FIX) public class DataServerConfig { /** * Getter method for property metaServerIpAddress. * * @return property value of metaServerIpAddress */ public SetgetMetaServerIpAddresses() { if (metaIps != null && !metaIps.isEmpty()) { return metaIps; } metaIps = new HashSet<>(); if (commonConfig != null) { Map<String, Collection> metaMap = commonConfig.getMetaNode(); if (metaMap != null && !metaMap.isEmpty()) { String localDataCenter = commonConfig.getLocalDataCenter(); if (localDataCenter != null && !localDataCenter.isEmpty()) { Collectionmetas = metaMap.get(localDataCenter); if (metas != null && !metas.isEmpty()) { metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain).collect(Collectors.toSet()); } } } } return metaIps; } }
在文中我们可以看到,MetaServerChangeEvent也会转化为 DataServerChangeEvent,投放到EventCenter。
如前图的2,4两步。这是因为Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变动"。所以下一期我们介绍如何处理DataServerChangeEvent。
前图:
+-------------------------------+ |[DataServerBootstrap] | MetaServerChangeEvent | | | +-------------------------+ | startRaftClient | | | | | +---------------+ | | | | | +-------------------------------+ | | | +-------------------------------+ | | | | [Timer] | | v | | | | 1 +-------+-----+ | | ConnectionRefreshMetaTask +------------------------------> | EventCenter +----+ | | | MetaServerChangeEvent | +-------+-----+ | | +-------------------------------+ | ^ | | +-------------------------------+ | | | | | | | | | | | [Push] | | | | | | | | | | | | +-------------------------+ | | | | | MetaServerChangeEvent | | | | ServerChangeHandler | 2 | | | | +----------------------------------------+ | | +-------------------------------+ DataServerChangeEvent | | | | | | MetaServerChangeEvent | | 3 | | +----------------------------------------------------+ | | | v | +-----------------+--------------+ DataServerChangeEvent | | | 4 | | MetaServerChangeEventHandler +------------------------------------------+ | | +--------------------------------+
手机如下:
sofa-bolt学习