SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.

This series focuses on analyzing its design and architecture: across several articles and from several angles, we work backwards from the code to summarize how DataServer, and SOFARegistry as a whole, are implemented, so that readers can learn how Alibaba approaches such designs.

This is the thirteenth article in the series; it looks at a service going online from the SessionServer's point of view.
This article mainly walks through the business flow, and along the way sorts out the logic, design, and patterns involved. Because the registration process touches many modules, we focus only on the Session Server's part of that process.

The going-online / offline process refers to a service performing a normal registration (Publisher#register) or de-registration (Publisher#unregister) through code; it does not cover the case where a service goes offline unexpectedly, for example because of a crash.

In a typical "service addressing for RPC calls" scenario, the service provider publishes a service through the following two steps:

Correspondingly, the service consumer invokes the service through the following steps:
In SOFARegistry, when a Client registers or subscribes to data, it performs consistent hashing on the dataInfoId to work out which DataServer it should talk to, and then establishes a long-lived connection with that DataServer.

Since each Client usually registers and subscribes to quite a few dataInfoIds, we can expect every Client to hold connections to several DataServers. The problem with this architecture is that the number of connections carried by each DataServer grows with the number of Clients; in the extreme case every Client has to connect to every DataServer, so scaling out DataServers does not linearly spread the Client connections.

Therefore it is important to put a dedicated connection-proxy layer in front of the data-sharding layer (DataServer), and that is exactly why SOFARegistry has the SessionServer layer. As the number of Clients grows, the single-node connection bottleneck can be solved simply by scaling out SessionServers.
Because SessionServer is a middle layer, it looks fairly simple on the surface: it just accepts requests and forwards them.

In practice, however, how to split and decouple modules, both logically and physically, is a question every large system has to answer.

Below we mainly look at the registration part of Alibaba's solution.
The going-online / offline process refers to a service performing a normal registration (publisher.register) or de-registration (publisher.unregister) through code, not an offline caused by unexpected events such as a crash. The figure above roughly shows how service data flows internally during one service registration.

Due to space limitations, this article covers only the first two points.
The diagram below shows the code path of a Publisher registration.

This process, too, follows the Handler - Task & Strategy - Listener approach; the internal task handling is basically the same as in the subscription process.

PublisherRegistration is the Client-side entry point; the key code for publishing data is as follows:
```java
// Build the publisher registration
PublisherRegistration registration = new PublisherRegistration("com.alipay.test.demo.service:1.0@DEFAULT");
registration.setGroup("TEST_GROUP");
registration.setAppName("TEST_APP");

// Register the registration with the client and publish the data
Publisher publisher = registryClient.register(registration, "10.10.1.1:12200?xx=yy");

// To overwrite the previously published data, republish through the publisher handle
publisher.republish("10.10.1.1:12200?xx=zz");
```
The key to publishing data is constructing a PublisherRegistration, which has three attributes:
| Attribute | Type | Description |
| --- | --- | --- |
| dataId | String | Data ID; the publisher and subscriber must use the same value. The unique data identifier is composed of dataId + group + instanceId. |
| group | String | Data group; the publisher and subscriber must use the same value. The unique data identifier is composed of dataId + group + instanceId. Defaults to DEFAULT_GROUP. |
| appName | String | Application name (appName). |
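As the table notes, dataId + group + instanceId together form the unique data identifier (the dataInfoId that is later used for hashing). The following is a purely illustrative sketch of such a composition; the exact delimiter and ordering used inside SOFARegistry may differ:

```java
// Illustrative only: how a unique data identifier could be composed from the
// three attributes. The delimiter below is hypothetical.
public final class DataIdUtil {
    private static final String DELIMITER = "#@#"; // hypothetical delimiter

    public static String toDataInfoId(String dataId, String instanceId, String group) {
        return dataId + DELIMITER + instanceId + DELIMITER + group;
    }

    public static void main(String[] args) {
        System.out.println(
            toDataInfoId("com.alipay.test.demo.service:1.0@DEFAULT", "DEFAULT_INSTANCE_ID", "TEST_GROUP"));
    }
}
```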
The flow now arrives at the Session Server.

The Beans are a good place to start.
@Bean(name = "serverHandlers") public CollectionserverHandlers() { Collectionlist = new ArrayList<>(); list.add(publisherHandler()); list.add(subscriberHandler()); list.add(watcherHandler()); list.add(clientNodeConnectionHandler()); list.add(cancelAddre***equestHandler()); list.add(syncConfigHandler()); return list; }
serverHandlers is the set of response handlers for the Bolt Server.
```java
@Bean
@ConditionalOnMissingBean(name = "sessionRegistry")
public Registry sessionRegistry() {
    return new SessionRegistry();
}
```
From the Bean point of view, the current logic is as shown below; a Strategy is already introduced here as one level of decoupling:
```
Beans

  Bolt Server (in openSessionServer)
    serverHandlers
      +------------------+       +---------------------------------+
      | PublisherHandler | ----> | DefaultPublisherHandlerStrategy |
      | watcherHandler   |       +---------------------------------+
      | ......           |                        |
      +------------------+                        v
                                         +-----------------+
                                         | SessionRegistry |
                                         +-----------------+
```
The service publisher and the Session Server are normally located in the same data center, which is in line with the unitized (cell-based) deployment practiced by Alibaba and others.

PublisherHandler is the Session Server's interface towards the Client; it is one of the Bolt Server's response handlers.
```java
public class PublisherHandler extends AbstractServerHandler {

    @Autowired
    private ExecutorManager          executorManager;

    @Autowired
    private PublisherHandlerStrategy publisherHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) throws RemotingException {
        RegisterResponse result = new RegisterResponse();
        PublisherRegister publisherRegister = (PublisherRegister) message;
        publisherHandlerStrategy.handlePublisherRegister(channel, publisherRegister, result);
        return result;
    }
}
```
The logic is shown in the diagram below:
```
 Publisher Scope   |   Session Server Scope
                   |
                   |   Bolt Server (in openSessionServer) / serverHandlers
+--------+    1: PublisherRegister    +------------------+
| Client | --------+----------------> | PublisherHandler |
+--------+         |                  | watcherHandler   |
                   |                  | ......           |
                   |                  +------------------+
```
Overall, the processing here follows the Handler - Task & Strategy - Listener approach.
What is the Strategy Pattern?

In software development we often run into this situation: a feature can be implemented with several different algorithms or strategies, and we choose among them depending on the environment or conditions. If we abstract these algorithms or strategies behind a single interface, with a different implementation class for each one, the client code can switch algorithms or strategies dynamically simply by injecting a different implementation object. This yields better extensibility and maintainability, and it is exactly the Strategy Pattern.

Definition of the Strategy Pattern

Strategy Pattern: define a family of algorithms, encapsulate each one, and make them interchangeable; the pattern lets the algorithm vary independently from the clients that use it.

Put simply: define a series of algorithms, encapsulate each one, and allow them to substitute for each other, so that a change of algorithm does not affect the clients that use it. It belongs to the behavioral patterns.

In the Strategy Pattern, a class's behavior, or its algorithm, can be changed at runtime; this type of design pattern is a behavioral pattern.

In the Strategy Pattern we create objects that represent the various strategies, plus a context object whose behavior changes with the strategy object it holds; the strategy object changes the algorithm that the context executes.
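A minimal, self-contained Java sketch of the pattern; the names here are illustrative and not taken from SOFARegistry:

```java
// Illustrative Strategy Pattern sketch; all names are hypothetical.
interface CompressionStrategy {
    byte[] compress(byte[] data);
}

class GzipStrategy implements CompressionStrategy {
    public byte[] compress(byte[] data) {
        // placeholder: real gzip logic would go here
        return data;
    }
}

class NoopStrategy implements CompressionStrategy {
    public byte[] compress(byte[] data) {
        return data;
    }
}

// The context delegates to whichever strategy was injected,
// so the algorithm can be swapped without touching the caller.
class Transport {
    private final CompressionStrategy strategy;

    Transport(CompressionStrategy strategy) {
        this.strategy = strategy;
    }

    byte[] send(byte[] payload) {
        return strategy.compress(payload);
    }
}
```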
Looking at the directory structure, there are many Strategy interfaces and implementations; presumably Ant wants to be able to plug in different strategies for different situations, and some of them are simply extension points reserved for the future.
```
com/alipay/sofa/registry/server/session/strategy
.
├── DataChangeRequestHandlerStrategy.java
├── PublisherHandlerStrategy.java
├── ReceivedConfigDataPushTaskStrategy.java
├── ReceivedDataMultiPushTaskStrategy.java
├── SessionRegistryStrategy.java
├── SubscriberHandlerStrategy.java
├── SubscriberMultiFetchTaskStrategy.java
├── SubscriberRegisterFetchTaskStrategy.java
├── SyncConfigHandlerStrategy.java
├── TaskMergeProcessorStrategy.java
├── WatcherHandlerStrategy.java
└── impl
    ├── DefaultDataChangeRequestHandlerStrategy.java
    ├── DefaultPublisherHandlerStrategy.java
    ├── DefaultPushTaskMergeProcessor.java
    ├── DefaultReceivedConfigDataPushTaskStrategy.java
    ├── DefaultReceivedDataMultiPushTaskStrategy.java
    ├── DefaultSessionRegistryStrategy.java
    ├── DefaultSubscriberHandlerStrategy.java
    ├── DefaultSubscriberMultiFetchTaskStrategy.java
    ├── DefaultSubscriberRegisterFetchTaskStrategy.java
    ├── DefaultSyncConfigHandlerStrategy.java
    └── DefaultWatcherHandlerStrategy.java
```
Judging from the current code, this strategy only does setup, classification, and forwarding: it fills in the Publisher's default information and then calls register or unRegister depending on the event type.
```java
public class DefaultPublisherHandlerStrategy implements PublisherHandlerStrategy {

    @Autowired
    private Registry sessionRegistry;

    @Override
    public void handlePublisherRegister(Channel channel, PublisherRegister publisherRegister,
                                        RegisterResponse registerResponse) {
        try {
            String ip = channel.getRemoteAddress().getAddress().getHostAddress();
            int port = channel.getRemoteAddress().getPort();
            publisherRegister.setIp(ip);
            publisherRegister.setPort(port);

            if (StringUtils.isBlank(publisherRegister.getZone())) {
                publisherRegister.setZone(ValueConstants.DEFAULT_ZONE);
            }

            if (StringUtils.isBlank(publisherRegister.getInstanceId())) {
                publisherRegister.setInstanceId(DEFAULT_INSTANCE_ID);
            }

            Publisher publisher = PublisherConverter.convert(publisherRegister);
            publisher.setProcessId(ip + ":" + port);
            publisher.setSourceAddress(new URL(channel.getRemoteAddress()));

            if (EventTypeConstants.REGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.register(publisher);
            } else if (EventTypeConstants.UNREGISTER.equals(publisherRegister.getEventType())) {
                sessionRegistry.unRegister(publisher);
            }

            registerResponse.setSuccess(true);
            registerResponse.setVersion(publisher.getVersion());
            registerResponse.setRegistId(publisherRegister.getRegistId());
            registerResponse.setMessage("Publisher register success!");
        } catch (Exception e) {
            // the catch block is elided in the original excerpt; mark the
            // response as failed here so the snippet is complete
            registerResponse.setSuccess(false);
            registerResponse.setMessage("Publisher register failed!");
        }
    }
}
```
The logic is shown in the diagram below:
```
 Publisher Scope   |   Session Server Scope
                   |
                   |   Bolt Server (in openSessionServer) / serverHandlers
+--------+    1: PublisherRegister    +------------------+
| Client | --------+----------------> | PublisherHandler |
+--------+         |                  +--------+---------+
                   |                           | 2: handlePublisherRegister
                   |                           v
                   |         +---------------------------------+
                   |         | DefaultPublisherHandlerStrategy |
                   |         |   EventType == REGISTER         |
                   |         +---------------------------------+
```
In the code above, the strategy eventually calls sessionRegistry.register(publisher), i.e. the registration proper.

SessionRegistry's member variables alone show that this is where the Session Server's core logic lives; it effectively plays the role of the business engine.
It mainly provides operations such as:

- register(StoreData storeData)
- cancel(List<String> connectIds)
- remove(List<String> subscriberRegisterIds)
- unRegister(StoreData storeData)
- ......
Its member variables are as follows:
```java
public class SessionRegistry implements Registry {

    /** store subscribers */
    @Autowired
    private Interests                 sessionInterests;

    /** store watchers */
    @Autowired
    private Watchers                  sessionWatchers;

    /** store publishers */
    @Autowired
    private DataStore                 sessionDataStore;

    /** transfer data to DataNode */
    @Autowired
    private DataNodeService           dataNodeService;

    /** trigger task com.alipay.sofa.registry.server.meta.listener process */
    @Autowired
    private TaskListenerManager       taskListenerManager;

    /** calculate data node url */
    @Autowired
    private NodeManager               dataNodeManager;

    @Autowired
    private SessionServerConfig       sessionServerConfig;

    @Autowired
    private Exchange                  boltExchange;

    @Autowired
    private SessionRegistryStrategy   sessionRegistryStrategy;

    @Autowired
    private WrapperInterceptorManager wrapperInterceptorManager;

    @Autowired
    private DataIdMatchStrategy       dataIdMatchStrategy;

    @Autowired
    private RenewService              renewService;

    @Autowired
    private WriteDataAcceptor         writeDataAcceptor;

    private volatile boolean          enableDataRenewSnapshot = true;
}
```
The register function builds a WriteDataRequest and then calls writeDataAcceptor.accept to finish the processing.
```java
@Override
public void register(StoreData storeData) {

    WrapperInvocation<StoreData, Boolean> wrapperInvocation = new WrapperInvocation<>(
        new Wrapper<StoreData, Boolean>() {
            @Override
            public Boolean call() {

                switch (storeData.getDataType()) {
                    case PUBLISHER:
                        Publisher publisher = (Publisher) storeData;

                        sessionDataStore.add(publisher);

                        // All write operations to DataServer (pub/unPub/clientoff/renew/snapshot)
                        // are handed over to WriteDataAcceptor
                        writeDataAcceptor.accept(new WriteDataRequest() {
                            @Override
                            public Object getRequestBody() {
                                return publisher;
                            }

                            @Override
                            public WriteDataRequestType getRequestType() {
                                return WriteDataRequestType.PUBLISHER;
                            }

                            @Override
                            public String getConnectId() {
                                return publisher.getSourceAddress().getAddressString();
                            }

                            @Override
                            public String getDataServerIP() {
                                Node dataNode = dataNodeManager.getNode(publisher.getDataInfoId());
                                return dataNode.getNodeUrl().getIpAddress();
                            }
                        });

                        sessionRegistryStrategy.afterPublisherRegister(publisher);
                        break;
                    case SUBSCRIBER:
                        Subscriber subscriber = (Subscriber) storeData;
                        sessionInterests.add(subscriber);
                        sessionRegistryStrategy.afterSubscriberRegister(subscriber);
                        break;
                    case WATCHER:
                        Watcher watcher = (Watcher) storeData;
                        sessionWatchers.add(watcher);
                        sessionRegistryStrategy.afterWatcherRegister(watcher);
                        break;
                    default:
                        break;
                }
                return null;
            }

            @Override
            public Supplier<StoreData> getParameterSupplier() {
                return () -> storeData;
            }
        }, wrapperInterceptorManager);

    try {
        wrapperInvocation.proceed();
    } catch (Exception e) {
        throw new RuntimeException("Proceed register error!", e);
    }
}
```
The logic so far is shown below:
```
 Publisher Scope   |   Session Server Scope
                   |
                   |   Bolt Server (in openSessionServer) / serverHandlers
+--------+    1: PublisherRegister    +------------------+
| Client | --------+----------------> | PublisherHandler |
+--------+         |                  +--------+---------+
                   |                           | 2: handlePublisherRegister
                   |                           v
                   |         +----------------+----------------+
                   |         | DefaultPublisherHandlerStrategy |
                   |         |   EventType == REGISTER         |
                   |         +----------------+----------------+
                   |                           | 3: register
                   |                           v
                   |      +-----------------------------------------+
                   |      | SessionRegistry                         |
                   |      |   storeData.getDataType() == PUBLISHER  |
                   |      +-----------------------------------------+
```
Here yet another strategy appears. It currently has only one implementation as well; presumably it is meant to be swappable in the future, and for now the hook is simply left empty.

Once again we can see the intent to decouple everywhere.
```java
public class DefaultSessionRegistryStrategy implements SessionRegistryStrategy {
    @Override
    public void afterPublisherRegister(Publisher publisher) {
    }
}
```
Earlier, the registration flow contained:
```java
sessionDataStore.add(publisher);
```
This is the Session's data-storage module and another core part of the system: it stores every Publisher registered on this Session.
```java
public class SessionDataStore implements DataStore {

    /**
     * publisher store
     */
    private Map<String/*dataInfoId*/, Map<String/*registerId*/, Publisher>> registry     = new ConcurrentHashMap<>();

    /*** index */
    private Map<String/*connectId*/, Map<String/*registerId*/, Publisher>>  connectIndex = new ConcurrentHashMap<>();
}
```
Two kinds of indexes are kept: one keyed by dataInfoId and one keyed by connectId.

When storing, the incoming Publisher is compared with the existing one along two dimensions: version number first, then registration timestamp.
```java
@Override
public void add(Publisher publisher) {
    Publisher.internPublisher(publisher);
    write.lock();
    try {
        Map<String, Publisher> publishers = registry.get(publisher.getDataInfoId());

        if (publishers == null) {
            ConcurrentHashMap<String, Publisher> newmap = new ConcurrentHashMap<>();
            publishers = registry.putIfAbsent(publisher.getDataInfoId(), newmap);
            if (publishers == null) {
                publishers = newmap;
            }
        }

        Publisher existingPublisher = publishers.get(publisher.getRegisterId());

        if (existingPublisher != null) {
            if (existingPublisher.getVersion() != null) {
                long oldVersion = existingPublisher.getVersion();
                Long newVersion = publisher.getVersion();
                if (newVersion == null) {
                    return;
                } else if (oldVersion > newVersion) {
                    return;
                } else if (oldVersion == newVersion) {
                    Long newTime = publisher.getRegisterTimestamp();
                    long oldTime = existingPublisher.getRegisterTimestamp();
                    if (newTime == null) {
                        return;
                    }
                    if (oldTime > newTime) {
                        return;
                    }
                }
            }
        }

        publishers.put(publisher.getRegisterId(), publisher);
        addToConnectIndex(publisher);
    } finally {
        write.unlock();
    }
}
```
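add() also maintains the connectId index via addToConnectIndex, which is not shown above. A minimal sketch of what such an index update could look like, assuming the connectId is derived from the publisher's source address (the real implementation may differ):

```java
// Hypothetical sketch of maintaining the connectId index; the field and method
// names mirror the excerpt above, the body itself is an assumption.
private void addToConnectIndex(Publisher publisher) {
    String connectId = publisher.getSourceAddress().getAddressString();
    Map<String, Publisher> publisherMap =
        connectIndex.computeIfAbsent(connectId, k -> new ConcurrentHashMap<>());
    publisherMap.put(publisher.getRegisterId(), publisher);
}
```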
After the SessionServer has stored the data locally, the next step is to notify the Data Server.

The purpose of the following work is to forward the received registration to the DataServer, but for the sake of decoupling a fair amount of machinery sits in between.

WriteDataAcceptorImpl is responsible for handling the actual Publisher writes. The first step is to funnel all write requests through one place.
All write requests are managed through a single private Map keyed by connectId whose values are WriteDataProcessor instances, so the write requests of different connections are handled by different processors.

The code is as follows:
```java
public class WriteDataAcceptorImpl implements WriteDataAcceptor {

    @Autowired
    private TaskListenerManager             taskListenerManager;

    @Autowired
    private SessionServerConfig             sessionServerConfig;

    @Autowired
    private RenewService                    renewService;

    /**
     * acceptor for all write data request
     * key:   connectId
     * value: writeRequest processor
     */
    private Map<String, WriteDataProcessor> writeDataProcessors = new ConcurrentHashMap<>();

    public void accept(WriteDataRequest request) {
        String connectId = request.getConnectId();
        WriteDataProcessor writeDataProcessor = writeDataProcessors.computeIfAbsent(connectId,
            key -> new WriteDataProcessor(connectId, taskListenerManager, sessionServerConfig, renewService));
        writeDataProcessor.process(request);
    }

    public void remove(String connectId) {
        writeDataProcessors.remove(connectId);
    }
}
```
The logic so far is shown below:
```
 Publisher Scope   |   Session Server Scope
                   |
                   |   Bolt Server (in openSessionServer) / serverHandlers
+--------+    1: PublisherRegister    +------------------+
| Client | --------+----------------> | PublisherHandler |
+--------+         |                  +--------+---------+
                   |                           | 2: handlePublisherRegister
                   |                           v
                   |         +----------------+----------------+
                   |         | DefaultPublisherHandlerStrategy |
                   |         |   EventType == REGISTER         |
                   |         +----------------+----------------+
                   |                           | 3: register
                   |                           v
                   |      +--------------------+--------------------+
                   |      | SessionRegistry                         |
                   |      |   storeData.getDataType() == PUBLISHER  |
                   |      +--------------------+--------------------+
                   |                           | WriteDataRequest
                   |                           v
                   |      +-----------------------------------------+
                   |      | WriteDataAcceptorImpl                   |
                   |      |   Map<connectId, WriteDataProcessor>    |
                   |      +-----------------------------------------+
```
Now that all requests come through one entry point, the writes of each individual connection have to be processed.

The key data structure here is a per-connection queue: each connection's write requests are put into a queue, which acts as an internal buffer so they can be sent to the DataServer in a unified way.
```java
ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue
```
Each request type is handled differently.

For our example the handling is:
```java
case PUBLISHER: {
    doPublishAsync(request);
}
```
In the end a TaskType.PUBLISH_DATA_TASK event is sent to the taskListenerManager; the event is handled by PublishDataTaskListener, which uses a PublishDataTask to do the work.

This is another listener-based decoupling, which we look at next.
```java
private void doPublishAsync(WriteDataRequest request) {
    sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
}

private void sendEvent(Object eventObj, TaskType taskType) {
    TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
    taskListenerManager.sendTaskEvent(taskEvent);
}
```
The full code is as follows:
```java
public class WriteDataProcessor {

    private final TaskListenerManager               taskListenerManager;
    private final SessionServerConfig               sessionServerConfig;
    private final RenewService                      renewService;
    private final String                            connectId;

    private Map<String, AtomicLong>                 lastUpdateTimestampMap = new ConcurrentHashMap<>();
    private AtomicBoolean                           writeDataLock          = new AtomicBoolean(false);
    private ConcurrentLinkedQueue<WriteDataRequest> acceptorQueue          = new ConcurrentLinkedQueue<>();
    private AtomicInteger                           acceptorQueueSize      = new AtomicInteger(0);

    public void process(WriteDataRequest request) {
        // record the last update time by pub/unpub
        if (isWriteRequest(request)) {
            refreshUpdateTime(request.getDataServerIP());
        }

        if (request.getRequestType() == WriteDataRequestType.DATUM_SNAPSHOT) {
            // snapshot has high priority, so handle directly
            doHandle(request);
        } else {
            // If locked, insert into the queue;
            // otherwise, try emptying the queue (to avoid residue) before processing the request.
            if (writeDataLock.get()) {
                addQueue(request);
            } else {
                flushQueue();
                doHandle(request);
            }
        }
    }

    private void doHandle(WriteDataRequest request) {
        switch (request.getRequestType()) {
            case PUBLISHER: {
                doPublishAsync(request);
            }
                break;
            case UN_PUBLISHER: {
                doUnPublishAsync(request);
            }
                break;
            case CLIENT_OFF: {
                doClientOffAsync(request);
            }
                break;
            case RENEW_DATUM: {
                if (renewAndSnapshotInSilence(request.getDataServerIP())) {
                    return;
                }
                doRenewAsync(request);
            }
                break;
            case DATUM_SNAPSHOT: {
                if (renewAndSnapshotInSilenceAndRefreshUpdateTime(request.getDataServerIP())) {
                    return;
                }
                halt();
                try {
                    doSnapshotAsync(request);
                } finally {
                    resume();
                }
            }
                break;
            default:
                break;
        }
    }

    private void doPublishAsync(WriteDataRequest request) {
        sendEvent(request.getRequestBody(), TaskType.PUBLISH_DATA_TASK);
    }

    private void sendEvent(Object eventObj, TaskType taskType) {
        TaskEvent taskEvent = new TaskEvent(eventObj, taskType);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}
```
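The helpers addQueue / flushQueue / halt / resume are not included in the excerpt above. A minimal sketch of how this buffer-and-lock mechanism could work, written against the fields shown above; the real implementation may differ, for example in how queue-size limits and logging are handled:

```java
// Hypothetical sketch of the queue helpers used by process().
private void halt() {
    // take the lock so subsequent requests are buffered in acceptorQueue
    writeDataLock.set(true);
}

private void resume() {
    // release the lock and drain whatever was buffered while locked
    writeDataLock.set(false);
    flushQueue();
}

private void addQueue(WriteDataRequest request) {
    acceptorQueue.add(request);
    acceptorQueueSize.incrementAndGet();
}

private void flushQueue() {
    WriteDataRequest buffered;
    while ((buffered = acceptorQueue.poll()) != null) {
        acceptorQueueSize.decrementAndGet();
        doHandle(buffered);
    }
}
```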
As shown below:
```
 Publisher Scope   |   Session Server Scope
                   |
                   |   Bolt Server (in openSessionServer) / serverHandlers
+--------+    1: PublisherRegister    +------------------+
| Client | --------+----------------> | PublisherHandler |
+--------+         |                  +--------+---------+
                   |                           | 2: handlePublisherRegister
                   |                           v
                   |         +----------------+----------------+
                   |         | DefaultPublisherHandlerStrategy |
                   |         |   EventType == REGISTER         |
                   |         +----------------+----------------+
                   |                           | 3: register
                   |                           v
                   |      +--------------------+--------------------+
                   |      | SessionRegistry                         |
                   |      |   storeData.getDataType() == PUBLISHER  |
                   |      |   4: sessionDataStore.add(publisher)    |
                   |      +--------------------+--------------------+
                   |                           | WriteDataRequest
                   |                           v
                   |      +--------------------+--------------------+
                   |      | WriteDataAcceptorImpl                   |
                   |      |   Map<connectId, WriteDataProcessor>    |
                   |      +--------------------+--------------------+
                   |                           | 5: process
                   |                           v
                   |      +--------------------+--------------------+
                   |      | WriteDataProcessor                      |
                   |      |   ConcurrentLinkedQueue acceptorQueue   |
                   |      +--------------------+--------------------+
                   |                           | 6: PUBLISH_DATA_TASK
                   |                           v
                   |      +-----------------------------------------+
                   |      | PublishDataTaskListener                 |
                   |      |   PublishDataTask                       |
                   |      +-----------------------------------------+
```
Everything so far has been one logical chain; here another decoupling step takes place.

DefaultTaskListenerManager is the decoupling mechanism: listeners are registered with it, and when sendTaskEvent is called it iterates over the listeners registered for that task type and invokes each one.
```java
public class DefaultTaskListenerManager implements TaskListenerManager {

    private Multimap<TaskType, TaskListener> taskListeners = ArrayListMultimap.create();

    @Override
    public Multimap<TaskType, TaskListener> getTaskListeners() {
        return taskListeners;
    }

    @Override
    public void addTaskListener(TaskListener taskListener) {
        taskListeners.put(taskListener.support(), taskListener);
    }

    @Override
    public void sendTaskEvent(TaskEvent taskEvent) {
        Collection<TaskListener> taskListeners = this.taskListeners.get(taskEvent.getTaskType());
        for (TaskListener taskListener : taskListeners) {
            taskListener.handleEvent(taskEvent);
        }
    }
}
```
PublishDataTaskListener is the matching handler: its support() method declares that it handles PUBLISH_DATA_TASK, and that is all that is needed to complete the decoupling.
```java
public class PublishDataTaskListener implements TaskListener {

    @Autowired
    private DataNodeService dataNodeService;

    @Autowired
    private TaskProcessor   dataNodeSingleTaskProcessor;

    @Autowired
    private ExecutorManager executorManager;

    @Override
    public TaskType support() {
        return TaskType.PUBLISH_DATA_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        SessionTask publishDataTask = new PublishDataTask(dataNodeService);
        publishDataTask.setTaskEvent(event);
        executorManager.getPublishDataExecutor().execute(
            () -> dataNodeSingleTaskProcessor.process(publishDataTask));
    }
}
```
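Putting the two classes together, the wiring boils down to the following; how the listeners are actually collected and registered at startup is Spring configuration not shown here, so treat this as an illustrative sketch:

```java
// Illustrative wiring sketch (not the actual Spring configuration):
// register the listener once, then every sendTaskEvent with a matching
// TaskType is routed to PublishDataTaskListener.handleEvent(...).
void wireAndDispatch(PublishDataTaskListener publishDataTaskListener, Publisher publisher) {
    TaskListenerManager manager = new DefaultTaskListenerManager();
    manager.addTaskListener(publishDataTaskListener);   // support() == PUBLISH_DATA_TASK

    // this is what WriteDataProcessor.sendEvent() ultimately does:
    manager.sendTaskEvent(new TaskEvent(publisher, TaskType.PUBLISH_DATA_TASK));
}
```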
Having found the Listener, we see that it kicks off the task that does the real work with the code below; the machinery behind it deserves a closer look.
```java
executorManager.getPublishDataExecutor().execute(
    () -> dataNodeSingleTaskProcessor.process(publishDataTask));
```
ExecutorManager starts and shuts down the thread pools in a unified way; publishDataExecutor is one of them.

The relevant parts of ExecutorManager are excerpted below:
```java
public class ExecutorManager {

    private final ScheduledThreadPoolExecutor scheduler;

    private final ThreadPoolExecutor          publishDataExecutor;

    // holds the named executors; the other executors are omitted from this excerpt
    private Map<String, ThreadPoolExecutor>   reportExecutors       = new ConcurrentHashMap<>();

    private static final String               PUBLISH_DATA_EXECUTOR = "PublishDataExecutor";

    public ExecutorManager(SessionServerConfig sessionServerConfig) {
        publishDataExecutor = reportExecutors.computeIfAbsent(PUBLISH_DATA_EXECUTOR,
            k -> new SessionThreadPoolExecutor(PUBLISH_DATA_EXECUTOR,
                sessionServerConfig.getPublishDataExecutorMinPoolSize(),
                sessionServerConfig.getPublishDataExecutorMaxPoolSize(),
                sessionServerConfig.getPublishDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(sessionServerConfig.getPublishDataExecutorQueueSize()),
                new NamedThreadFactory("PublishData-executor", true)));
    }

    public ThreadPoolExecutor getPublishDataExecutor() {
        return publishDataExecutor;
    }
}
```
The ExecutorManager bean is defined as follows:
```java
@Bean
public ExecutorManager executorManager(SessionServerConfig sessionServerConfig) {
    return new ExecutorManager(sessionServerConfig);
}
```
The Processor defines how a task is processed: it wraps the task's execution and classifies the outcome.
```java
public class DataNodeSingleTaskProcessor implements TaskProcessor<SessionTask> {

    @Override
    public ProcessingResult process(SessionTask task) {
        try {
            task.execute();
            return ProcessingResult.Success;
        } catch (Throwable throwable) {
            if (task instanceof Retryable) {
                Retryable retryAbleTask = (Retryable) task;
                if (retryAbleTask.checkRetryTimes()) {
                    return ProcessingResult.TransientError;
                }
            }
            return ProcessingResult.PermanentError;
        }
    }

    @Override
    public ProcessingResult process(List<SessionTask> tasks) {
        return null;
    }
}
```
In PublishDataTask's execute method, dataNodeService.register(publisher) performs the registration.
```java
public class PublishDataTask extends AbstractSessionTask {

    private final DataNodeService dataNodeService;

    private Publisher             publisher;

    public PublishDataTask(DataNodeService dataNodeService) {
        this.dataNodeService = dataNodeService;
    }

    @Override
    public void execute() {
        dataNodeService.register(publisher);
    }

    @Override
    public void setTaskEvent(TaskEvent taskEvent) {
        // taskId created from the event
        if (taskEvent.getTaskId() != null) {
            setTaskId(taskEvent.getTaskId());
        }

        Object obj = taskEvent.getEventObj();
        if (obj instanceof Publisher) {
            this.publisher = (Publisher) obj;
        }
    }
}
```
Concretely:
```
+---------------------------------------------------+
| DefaultTaskListenerManager                         |
|                                                    |
|   Multimap<TaskType, TaskListener> taskListeners   |
+-------------------------+-------------------------+
                          | PUBLISH_DATA_TASK
                          v
             +--------------------------+
             | PublishDataTaskListener  |
             +------------+-------------+
                          | setTaskEvent
                          v
                +-----------------+
                | PublishDataTask |
                +-----------------+
```
After this listener-based decoupling, PublishDataTask calls dataNodeService.register(publisher), so the next step is to forward the service information to the Data Server.

That means finding the corresponding DataServer through a consistent hash on dataInfoId and sending the PublisherRegister to it.

This is done by DataNodeServiceImpl's register function, which forwards the request to the Data Server.
```java
public class DataNodeServiceImpl implements DataNodeService {

    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;

    @Autowired
    private SessionServerConfig   sessionServerConfig;

    private AsyncHashedWheelTimer asyncHashedWheelTimer;
}
```
As we can see, it builds a PublishDataRequest and sends it to the Data Server through the Bolt Client.
```java
@Override
public void register(final Publisher publisher) {
    String bizName = "PublishData";
    Request<PublishDataRequest> request = buildPublishDataRequest(publisher);
    try {
        sendRequest(bizName, request);
    } catch (RequestException e) {
        doRetryAsync(bizName, request, e, sessionServerConfig.getPublishDataTaskRetryTimes(),
            sessionServerConfig.getPublishDataTaskRetryFirstDelay(),
            sessionServerConfig.getPublishDataTaskRetryIncrementDelay());
    }
}

private CommonResponse sendRequest(String bizName, Request request) throws RequestException {
    Response response = dataNodeExchanger.request(request);
    Object result = response.getResult();
    CommonResponse commonResponse = (CommonResponse) result;
    return commonResponse;
}
```
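doRetryAsync itself is not shown in the excerpt. Conceptually it re-schedules the failed request with an increasing delay; the following is a hedged simplification that uses a plain java.util.concurrent ScheduledExecutorService instead of the AsyncHashedWheelTimer that the real class injects, with a sketch-specific method name so it is not mistaken for the actual implementation:

```java
// Hypothetical simplification of the retry scheduling in DataNodeServiceImpl.
private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor();

private void doRetryAsyncSketch(String bizName, Request request, int retriesLeft,
                                long firstDelayMs, long incrementDelayMs) {
    if (retriesLeft <= 0) {
        return; // give up after the configured number of retries
    }
    retryScheduler.schedule(() -> {
        try {
            sendRequest(bizName, request);
        } catch (RequestException e) {
            // back off a little more on each failed attempt
            doRetryAsyncSketch(bizName, request, retriesLeft - 1,
                firstDelayMs + incrementDelayMs, incrementDelayMs);
        }
    }, firstDelayMs, TimeUnit.MILLISECONDS);
}
```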
As shown below:
```
+---------------------------------------------------+
| DefaultTaskListenerManager                         |
|                                                    |
|   Multimap<TaskType, TaskListener> taskListeners   |
+-------------------------+-------------------------+
                          | PUBLISH_DATA_TASK
                          v
             +--------------------------+
             | PublishDataTaskListener  |
             +------------+-------------+
                          | setTaskEvent
                          v
                +-----------------+
                | PublishDataTask |
                +--------+--------+
                         | register
                         v
              +---------------------+
              | DataNodeServiceImpl |
              +----------+----------+
                         | PublishDataRequest
                         v
              +---------------------+                        +-------------+
              | DataNodeExchanger   | --Client.sendSync----> | Data Server |
              +---------------------+   PublishDataRequest   +-------------+
```
How does it know which Data Server to send to? In DataNodeExchanger we have:
```java
@Override
public Response request(Request request) throws RequestException {
    Response response;
    URL url = request.getRequestUrl();
    try {
        Client sessionClient = getClient(url);
        final Object result = sessionClient.sendSync(url, request.getRequestBody(),
            request.getTimeout() != null ? request.getTimeout()
                : sessionServerConfig.getDataNodeExchangeTimeOut());
        response = () -> result;
    } catch (Exception e) {
        // the catch block is elided in the original excerpt; wrap and rethrow
        // so the snippet is complete
        throw new RequestException("DataNode exchange request error!", e);
    }
    return response;
}
```
So we go back to DataNodeServiceImpl:
```java
private Request<PublishDataRequest> buildPublishDataRequest(Publisher publisher) {
    return new Request<PublishDataRequest>() {

        private AtomicInteger retryTimes = new AtomicInteger();

        @Override
        public PublishDataRequest getRequestBody() {
            PublishDataRequest publishDataRequest = new PublishDataRequest();
            publishDataRequest.setPublisher(publisher);
            publishDataRequest.setSessionServerProcessId(SessionProcessIdGenerator
                .getSessionProcessId());
            return publishDataRequest;
        }

        @Override
        public URL getRequestUrl() {
            return getUrl(publisher.getDataInfoId());
        }

        @Override
        public AtomicInteger getRetryTimes() {
            return retryTimes;
        }
    };
}

private URL getUrl(String dataInfoId) {
    Node dataNode = dataNodeManager.getNode(dataInfoId);
    // the data node pushed by meta has no port, so use the configured data server port
    String dataIp = dataNode.getNodeUrl().getIpAddress();
    return new URL(dataIp, sessionServerConfig.getDataServerPort());
}
```
And DataNodeManager contains:
```java
@Override
public DataNode getNode(String dataInfoId) {
    DataNode dataNode = consistentHash.getNodeFor(dataInfoId);
    return dataNode;
}
```
So the dataInfoId is hashed, the corresponding DataNode is looked up in DataNodeManager via the consistent hash ring, and its URL tells us where to send the request.
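For readers unfamiliar with how a getNodeFor lookup might work, here is a minimal consistent-hash-ring sketch; the node type, hash function and virtual-node count are illustrative assumptions, not SOFARegistry's actual ConsistentHash implementation:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring sketch: map each node to several points on a
// ring, then route a dataInfoId to the first node clockwise from its hash.
public class SimpleConsistentHash<T> {

    private final SortedMap<Integer, T> ring = new TreeMap<>();
    private final int virtualNodes;

    public SimpleConsistentHash(int virtualNodes, Iterable<T> nodes) {
        this.virtualNodes = virtualNodes;
        for (T node : nodes) {
            addNode(node);
        }
    }

    public void addNode(T node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node.toString() + "#" + i), node);
        }
    }

    public T getNodeFor(String dataInfoId) {
        if (ring.isEmpty()) {
            return null;
        }
        int h = hash(dataInfoId);
        // first ring position at or after the key's hash, wrapping around to the start
        SortedMap<Integer, T> tail = ring.tailMap(h);
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    private int hash(String key) {
        // illustrative hash; a real implementation would use a better-distributed function
        return key.hashCode() & 0x7fffffff;
    }
}
```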
The earlier diagram therefore expands to:
```
+---------------------------------------------------+
| DefaultTaskListenerManager                         |
|                                                    |
|   Multimap<TaskType, TaskListener> taskListeners   |
+-------------------------+-------------------------+
                          | 1: PUBLISH_DATA_TASK
                          v
             +--------------------------+
             | PublishDataTaskListener  |
             +------------+-------------+
                          | 2: setTaskEvent
                          v
                +-----------------+
                | PublishDataTask |
                +--------+--------+
                         | 3: register
                         v
              +---------------------+  4: getNode(dataInfoId)   +-----------------+
              | DataNodeServiceImpl | ------------------------> | DataNodeManager |
              |                     | <------------------------ |  consistentHash |
              +----------+----------+  5/6: DataNode url        +-----------------+
                         | 7: PublishDataRequest
                         v
              +---------------------+   8: Client.sendSync      +-------------+
              | DataNodeExchanger   | ------------------------> | Data Server |
              +---------------------+      PublishDataRequest   +-------------+
```
To recap: this is how service data flows internally during one service registration.

Due to space limitations, this article covers only the first two points; follow-up articles will cover the rest.