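SOFARegistry is a production-grade, real-time, highly available service registry open-sourced by Ant Financial. This series focuses on design and architecture: across several articles, we work backwards from multiple angles to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems. This is the second article, and it covers SOFARegistry's network encapsulation and operations.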
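Since some readers may not have read the earlier articles on MetaServer, let us first recall SOFARegistry's overall architecture.
Application server cluster. The Client layer is the application layer: each application depends on the registry's client jar and uses the registry's service publishing and subscription capabilities programmatically.
Session server cluster. As the name suggests, the Session layer is the session layer. It keeps long-lived connections with the application servers in the Client layer and receives their publish and subscribe requests. This layer only keeps the publish/subscribe relationships of each service in memory; the concrete service information is simply passed through between the Client layer and the Data layer. The Session layer is stateless and can be scaled out as the Client layer grows.
Data server cluster. The Data layer stores the service registration data of all applications in shards. Data is sharded by dataInfoId (the unique identifier of each piece of service data) using consistent hashing and kept in multiple replicas to ensure high availability. A key question is how, as data volume grows, the Data layer can scale out and in smoothly without affecting the business.
Meta server cluster. This cluster manages the server information of the Session cluster and the Data cluster. It acts as a service registry inside the SOFARegistry architecture: SOFARegistry as a whole serves the application layer, while the Meta cluster serves SOFARegistry's internal Session and Data clusters. The Meta layer detects changes of Session and Data nodes and notifies the other nodes in the cluster.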
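To make the Data layer's "consistent hashing by dataInfoId" concrete, here is a minimal consistent-hash ring. It is a sketch under stated assumptions: the MD5-based 64-bit hash, the number of virtual nodes and the node names are illustrative choices, not SOFARegistry's actual implementation, and replication is left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hash ring (illustrative only; not SOFARegistry's code).
final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) {
        this.virtualNodes = virtualNodes;
    }

    // Each physical node is placed on the ring several times to even out the load.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i), node); // only remove entries owned by this node
        }
    }

    // The owner is the first virtual node clockwise from the key's hash, wrapping around.
    String nodeFor(String dataInfoId) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no data nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(dataInfoId));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // First 8 bytes of the MD5 digest as a long (assumption: any well-mixed hash would do).
    private static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

The property that matters for smooth scaling is that removing a node only moves the dataInfoIds it owned. Every other key keeps its owner.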
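DataServer, SessionServer and MetaServer are all essentially network applications. Network encapsulation and operations are therefore a foundational module of the system. Let us look at the scenario in which they are used.
SOFARegistry is designed for a unitized (cell-based) deployment.
In a unitized deployment, a unit is a complete, scaled-down copy of the whole site. It is fully capable, because every application is deployed in it, but it does not hold all the data, because it only operates on a portion of it. A system that can be unitized is easy to deploy across multiple data centers: some units can be placed in one data center and the rest in others. A traffic dispatcher at the business entry point can then adjust the share of traffic that goes to each unit.
So what SOFARegistry has to solve is communication between nodes inside a private IDC network. High-throughput, highly concurrent communication, management of a large number of connections (the C10K problem), convenient upgrade mechanisms, compatibility guarantees, flexible use of thread-pool models, and careful exception handling and logging instrumentation all have to be addressed in the communication protocol and in the framework that implements it.
The servers also have some configuration needs, for which a simple HTTP protocol is sufficient.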
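In this intranet, unitized scenario, the issues that come to mind are as follows:
For such a high-performance, highly concurrent scenario in the Java ecosystem, non-blocking I/O multiplexing is the obvious choice, which in turn means building on Netty.
Alibaba uses the SOFABolt communication framework to implement node liveness detection over long-lived TCP connections and push-mode change notification, so that service online/offline notifications take effect within seconds.
sofa-bolt is a Netty-based network communication framework open-sourced by Ant Financial. It wraps common network programming problems in a thin layer on top of Netty, so that middleware developers can focus on the middleware product itself.
Its main features are:
SOFABolt can be seen as a set of Netty best practices, plus some additional optimizations.
We may analyze the SOFABolt framework in a dedicated series later. For now it is enough that SOFABolt meets our needs, so we introduce it only briefly and focus on how it is used and how the business logic is built on top of it.
Once SOFABolt is adopted, it largely solves the issues listed above, so the remaining issues we can think of are roughly as follows:
To give a spoiler: logically, Alibaba provides encapsulation at two levels.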
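From the connection perspective, Alibaba wraps netty.channel.Channel. From bottom to top, the layers are netty.channel.Channel, com.alipay.remoting.Connection and BoltChannel.
From the application perspective, Alibaba provides Server and Client encapsulation. From bottom to top, the layers are RpcServer / RpcClient, BoltServer / BoltClient, BoltExchange, and DataNodeExchanger / MetaNodeExchanger.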
The overall logic is roughly as follows:
DataNodeExchanger, MetaNodeExchanger
  -> BoltExchange [Map<String, Client> clients; ConcurrentHashMap<Integer, Server> serverMap]
       -> BoltClient [wraps remoting.rpc.RpcClient (ConnectionEventHandler, ConnectionFactory)]
            -> ChannelHandler, BoltChannel, CallbackHandler
       -> BoltServer [wraps remoting.rpc.RpcServer; Map<String, Channel> channels; List<ChannelHandler> channelHandlers]
            -> ChannelHandler, BoltChannel
BoltChannel [wraps com.alipay.remoting.Connection] -> netty.channel.Channel
SOFARegistry wraps the basic networking functionality and exposes it as an API. Below is the encapsulation module and its external interface, registry-remoting-api:
├── CallbackHandler.java
├── Channel.java
├── ChannelHandler.java
├── Client.java
├── Endpoint.java
├── RemotingException.java
├── Server.java
└── exchange
    ├── Exchange.java
    ├── NodeExchanger.java
    ├── RequestException.java
    └── message
        ├── Request.java
        └── Response.java
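The four key interfaces are Server, Client, Exchange and Channel. They are the most basic concepts of the network encapsulation.
Channel is a very common concept: it represents an open connection between an I/O source and a target. Let us first use Java's Channel as an example.
Java's Channel is defined in the java.nio.channels package and represents an open connection between an I/O source and a target. A Channel is similar to a traditional "stream", except that it cannot access data directly and can only interact with a Buffer.
A Channel efficiently transfers data between a byte buffer and the entity on the other side of the channel (usually a file or a socket). A channel is a conduit for accessing I/O services, and through it the operating system's I/O services can be used with minimal overhead. Incidentally, the buffer is the endpoint inside the channel where data is sent and received.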
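The Channel/Buffer relationship can be shown with a `java.nio.channels.Pipe`. The channel never hands out bytes directly: writing drains a `ByteBuffer` into the sink channel, and reading fills another `ByteBuffer` from the source channel. This is a minimal sketch; the class name `NioPipeDemo` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Data only moves Buffer -> Channel (write) and Channel -> Buffer (read).
final class NioPipeDemo {
    static String roundTrip(String text) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);          // the channel drains the buffer
                }
                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                while (in.hasRemaining()) {
                    source.read(in);          // the channel fills the buffer
                }
                in.flip();                    // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both pipe channels are in blocking mode by default, so this single-threaded round trip only works for small payloads that fit in the pipe's OS buffer.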
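The Channel definition in SOFARegistry shows that its basic functionality is mostly about connection attributes: addresses, connection state and key/value attributes.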
import java.net.InetSocketAddress;
import javax.ws.rs.client.WebTarget;

public interface Channel {
    InetSocketAddress getRemoteAddress();
    InetSocketAddress getLocalAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    WebTarget getWebTarget(); // used by the Jersey (HTTP) implementation
    void close();
}
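Per-connection attributes are the main state such a Channel carries. Below is a minimal in-memory sketch. `AttributeChannel` is a hypothetical trimmed copy of the interface without `getLocalAddress()` and `getWebTarget()`. Treating a `null` value as "remove" is a choice of this sketch, forced by `ConcurrentHashMap` rejecting null values; it is not documented SOFARegistry behaviour.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed copy of the Channel interface.
interface AttributeChannel {
    InetSocketAddress getRemoteAddress();
    boolean isConnected();
    Object getAttribute(String key);
    void setAttribute(String key, Object value);
    void close();
}

// In-memory implementation used only to illustrate per-connection attributes.
final class InMemoryChannel implements AttributeChannel {
    private final InetSocketAddress remoteAddress;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
    private volatile boolean connected = true;

    InMemoryChannel(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    @Override public InetSocketAddress getRemoteAddress() { return remoteAddress; }
    @Override public boolean isConnected() { return connected; }
    @Override public Object getAttribute(String key) { return attributes.get(key); }

    @Override
    public void setAttribute(String key, Object value) {
        // ConcurrentHashMap rejects null values, so null means "remove" in this sketch.
        if (value == null) {
            attributes.remove(key);
        } else {
            attributes.put(key, value);
        }
    }

    @Override
    public void close() {
        connected = false;
        attributes.clear();
    }
}
```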
Server is the server-side abstraction. As its definition shows, its basic functionality is sending messages over Channels.
public interface Server extends Endpoint {
    boolean isOpen();
    Collection<Channel> getChannels();
    Channel getChannel(InetSocketAddress remoteAddress);
    Channel getChannel(URL url);
    void close(Channel channel);
    int getChannelCount();
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final Channel channel, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}
Client is the client-side abstraction. Its basic functionality is likewise built around interacting over Channels.
public interface Client extends Endpoint {
    Channel getChannel(URL url);
    Channel connect(URL url);
    Object sendSync(final URL url, final Object message, final int timeoutMillis);
    Object sendSync(final Channel channel, final Object message, final int timeoutMillis);
    void sendCallback(final URL url, final Object message, CallbackHandler callbackHandler,
                      final int timeoutMillis);
}
Exchange is a further abstraction over Client / Server connections. It is responsible for connections between servers of the same type.
public interface Exchange<T> {

    String DATA_SERVER_TYPE = "dataServer";
    String META_SERVER_TYPE = "metaServer";

    /**
     * connect same type server, one server ip one connection
     * such as different server on data server, serverOne and serverTwo, different type server must match
     * different channelHandlers, so we must connect by serverType, and get Client instance by serverType
     * @param serverType
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, URL serverUrl, T... channelHandlers);

    /**
     * connect same type server, one server ip one connection
     * such as different server on data server, serverOne and serverTwo, different type server must match
     * different channelHandlers, so we must connect by serverType, and get Client instance by serverType
     * @param serverType
     * @param connNum connection number per serverUrl
     * @param serverUrl
     * @param channelHandlers
     */
    Client connect(String serverType, int connNum, URL serverUrl, T... channelHandlers);

    /**
     * bind server by server port in url parameter, one port must belong to one server type
     * @param url
     * @param channelHandlers
     */
    Server open(URL url, T... channelHandlers);

    Client getClient(String serverType);

    Server getServer(Integer port);
}
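When a connection is established, a series of handlers for different tasks (called ChannelHandlers) can be registered on it.
Some of these ChannelHandlers act as Listeners that handle connection events. Others act as Processors that handle specific events, such as service data changes or Subscriber registration.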
public interface ChannelHandler<T> {

    // the constant names are spelled this way in the source code
    enum HandlerType { LISENTER, PROCESSER }

    enum InvokeType { SYNC, ASYNC }

    /** on channel connected. */
    void connected(Channel channel) throws RemotingException;

    /** on channel disconnected. */
    void disconnected(Channel channel) throws RemotingException;

    /** on message received. */
    void received(Channel channel, T message) throws RemotingException;

    /** on message reply. */
    Object reply(Channel channel, T message) throws RemotingException;

    /** on exception caught. */
    void caught(Channel channel, T message, Throwable exception) throws RemotingException;

    HandlerType getType();

    /** return processor request class name */
    Class interest();

    /** Select Sync process by reply or Async process by received */
    default InvokeType getInvokeType() {
        return InvokeType.SYNC;
    }

    /** specify executor for processor handler */
    default Executor getExecutor() {
        return null;
    }
}
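`interest()` tells the framework which request class a processor handles. Incoming messages can then be routed by their type. The sketch below shows this dispatch idea with a hypothetical, trimmed `TypedHandler` (only `interest()` and `reply()`). It illustrates routing by request type in general and is not Bolt's actual lookup code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down handler: only the interest type and the reply logic.
interface TypedHandler<T> {
    Class<T> interest();
    Object reply(T message);
}

// Routes each incoming message to the handler whose interest() matches its class.
final class HandlerRegistry {
    private final Map<Class<?>, TypedHandler<?>> handlers = new ConcurrentHashMap<>();

    void register(TypedHandler<?> handler) {
        if (handlers.putIfAbsent(handler.interest(), handler) != null) {
            throw new IllegalStateException("duplicate handler for " + handler.interest().getName());
        }
    }

    Object dispatch(Object message) {
        TypedHandler<?> handler = handlers.get(message.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("no handler for " + message.getClass().getName());
        }
        return invoke(handler, message);
    }

    // Recovers the handler's type parameter; the cast is safe because the map key is interest().
    private static <T> Object invoke(TypedHandler<T> handler, Object message) {
        return handler.reply(handler.interest().cast(message));
    }
}
```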
So the basic external network interfaces are as follows:
[registry-remoting-api]
  Exchange -> Server, Client (connect/open take ChannelHandler... arguments)
  NodeExchanger -> Client
  Server, Client -> Channel, CallbackHandler
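Because SOFARegistry is built mainly on SOFABolt and there is no way around it, we first need a brief introduction to SOFABolt.
Bolt is based on Netty, so we start with Netty's Channel.
In the Netty framework, Channel is one of the core concepts and the main actor in Netty network communication. It communicates with the peer and handles registration, data operations and so on.
Netty wraps and enhances the JDK's native ServerSocketChannel into NioXXXChannel classes. Compared with the native JDK Channel, Netty's Channel adds the following components:
Depending on whether it is used on the server or the client side, Channels fall into two categories: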
NioServerSocketChannel
NioSocketChannel
Inbound and outbound identify the type of handler that a Context corresponds to. Netty divides events into Inbound and Outbound events, as illustrated in the class comment of ChannelPipeline:
[Diagram from the ChannelPipeline class comment] An I/O request issued via Channel or ChannelHandlerContext enters the ChannelPipeline at the top and travels down through Outbound Handler 1, 2, ..., M-1, M, each hop made with ChannelHandlerContext.OUT_EVT() [method call], until it reaches Socket.write(). Inbound data produced by Socket.read() in the Netty internal I/O threads (transport implementation) travels up through Inbound Handler 1, 2, ..., N-1, N, each hop made with ChannelHandlerContext.fireIN_EVT() [method call].
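The ordering rule in the ChannelPipeline diagram can be reproduced with a toy model. Handlers are appended in order; inbound events visit inbound handlers from head to tail, and outbound operations visit outbound handlers from tail to head. `ToyPipeline` is not Netty's API. The five-handler setup in the test mirrors the example in the ChannelPipeline Javadoc, where the inbound order is 1, 2, 5 and the outbound order is 5, 4, 3.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of ChannelPipeline ordering; it is not Netty's API.
final class ToyPipeline {
    enum Kind { INBOUND, OUTBOUND, DUPLEX }

    private final List<String> names = new ArrayList<>();
    private final List<Kind> kinds = new ArrayList<>();

    ToyPipeline addLast(String name, Kind kind) {
        names.add(name);
        kinds.add(kind);
        return this;
    }

    // Inbound events (e.g. data read from the socket) visit handlers from head to tail.
    List<String> inboundOrder() {
        List<String> order = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            if (kinds.get(i) != Kind.OUTBOUND) {
                order.add(names.get(i));
            }
        }
        return order;
    }

    // Outbound operations (e.g. write) visit handlers from tail to head.
    List<String> outboundOrder() {
        List<String> order = new ArrayList<>();
        for (int i = names.size() - 1; i >= 0; i--) {
            if (kinds.get(i) != Kind.INBOUND) {
                order.add(names.get(i));
            }
        }
        return order;
    }
}
```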
An abridged definition of Connection follows. Its main member is the Netty channel instance:
public class Connection {
    /** the underlying io.netty.channel.Channel */
    private Channel channel;
    private final ConcurrentHashMap<Integer, InvokeFuture> invokeFutureMap = new ConcurrentHashMap<Integer, InvokeFuture>(4);
    /** Attribute key for connection */
    public static final AttributeKey<Connection> CONNECTION = AttributeKey.valueOf("connection");
    /** Attribute key for heartbeat count */
    public static final AttributeKey<Integer> HEARTBEAT_COUNT = AttributeKey.valueOf("heartbeatCount");
    /** Attribute key for heartbeat switch for each connection */
    public static final AttributeKey<Boolean> HEARTBEAT_SWITCH = AttributeKey.valueOf("heartbeatSwitch");
    /** Attribute key for protocol */
    public static final AttributeKey<ProtocolCode> PROTOCOL = AttributeKey.valueOf("protocol");
    /** Attribute key for version */
    public static final AttributeKey<Byte> VERSION = AttributeKey.valueOf("version");
    private Url url;
    private final ConcurrentHashMap<Integer, String> id2PoolKey = new ConcurrentHashMap<Integer, String>(256);
    private Set<String> poolKeys = new ConcurrentHashSet<String>();
    private final ConcurrentHashMap<String, Object> attributes = new ConcurrentHashMap<String, Object>();
}
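Connection has many helper classes; some of them are listed below:
In addition, note the following points:
Whether on the server or the client side, both essentially do one thing: create a ConnectionEventHandler instance and add it to Netty's pipeline. The basic principle is as follows.
ConnectionEventHandler handles two kinds of events: events defined by Netty and events defined by SOFABolt.
Afterwards, whenever a ConnectionEvent fires (whether it is a Netty-defined event or a SOFABolt-defined event), ConnectionEventHandler notifies ConnectionEventListener through an asynchronous executor, and ConnectionEventListener dispatches the event to the concrete ConnectionEventProcessor implementations.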
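The handler, listener and processor chain described above can be sketched with JDK classes only. All class names below (`ConnEventType`, `ConnEventProcessor`, `ConnEventListener`, `ConnEventHandler`) are hypothetical simplified stand-ins, not SOFABolt's classes. The point is the hand-off: the handler, called on the I/O thread, only submits work to an executor, and the listener routes each event type to its registered processors.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-ins for SOFABolt's event classes.
enum ConnEventType { CONNECT, CLOSE, EXCEPTION }

interface ConnEventProcessor {
    void onEvent(String remoteAddress);
}

// Plays the ConnectionEventListener role: routes an event type to its processors.
final class ConnEventListener {
    private final Map<ConnEventType, List<ConnEventProcessor>> processors = new ConcurrentHashMap<>();

    void addProcessor(ConnEventType type, ConnEventProcessor processor) {
        processors.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(processor);
    }

    void onEvent(ConnEventType type, String remoteAddress) {
        for (ConnEventProcessor p : processors.getOrDefault(type, Collections.emptyList())) {
            p.onEvent(remoteAddress);
        }
    }
}

// Plays the ConnectionEventHandler role: it only hands the event to an executor,
// so slow processors never block the network I/O thread.
final class ConnEventHandler {
    private final ConnEventListener listener;
    private final ExecutorService executor;

    ConnEventHandler(ConnEventListener listener, ExecutorService executor) {
        this.listener = listener;
        this.executor = executor;
    }

    void fire(ConnEventType type, String remoteAddress) {
        executor.execute(() -> listener.onEvent(type, remoteAddress));
    }

    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```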
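RpcServer implements the basic mechanisms a server needs and can be used directly, for example:
In particular, requests, responses and heartbeats are processed along the following chains: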
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
The definition is as follows:
public class RpcServer extends AbstractRemotingServer {
    /** server bootstrap */
    private ServerBootstrap bootstrap;
    /** channelFuture */
    private ChannelFuture channelFuture;
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** user processors of rpc server */
    private ConcurrentHashMap<String, UserProcessor> userProcessors = new ConcurrentHashMap<String, UserProcessor>(4);
    /** boss event loop group, boss group should not be daemon, need shutdown manually */
    private final EventLoopGroup bossGroup = NettyEventLoopUtil.newEventLoopGroup(1,
        new NamedThreadFactory("Rpc-netty-server-boss", false));
    /** worker event loop group. Reuse I/O worker threads between rpc servers. */
    private static final EventLoopGroup workerGroup = NettyEventLoopUtil.newEventLoopGroup(
        Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("Rpc-netty-server-worker", true));
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection manager */
    private DefaultConnectionManager connectionManager;
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** rpc codec */
    private Codec codec = new RpcCodec();
}
The main mechanisms of RpcClient are as follows:
... which is the core of the entire Connection design.
The code is as follows:
public class RpcClient extends AbstractConfigurableInstance {
    private ConcurrentHashMap<String, UserProcessor> userProcessors = new ConcurrentHashMap<String, UserProcessor>();
    /** connection factory */
    private ConnectionFactory connectionFactory = new RpcConnectionFactory(userProcessors, this);
    /** connection event handler */
    private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(switches());
    /** reconnect manager */
    private ReconnectManager reconnectManager;
    /** connection event listener */
    private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
    /** address parser to get custom args */
    private RemotingAddressParser addressParser;
    /** connection select strategy */
    private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(switches());
    /** connection manager */
    private DefaultConnectionManager connectionManager = new DefaultConnectionManager(connectionSelectStrategy,
        connectionFactory, connectionEventHandler, connectionEventListener, switches());
    /** rpc remoting */
    protected RpcRemoting rpcRemoting;
    /** task scanner */
    private RpcTaskScanner taskScanner = new RpcTaskScanner();
    /** connection monitor */
    private DefaultConnectionMonitor connectionMonitor;
    /** connection monitor strategy */
    private ConnectionMonitorStrategy monitorStrategy;
}
The system implements the basic abstractions above on both Bolt and HTTP. The SOFABolt-based implementation is registry-remoting-bolt:
├── AsyncUserProcessorAdapter.java
├── BoltChannel.java
├── BoltChannelUtil.java
├── BoltClient.java
├── BoltServer.java
├── ConnectionEventAdapter.java
├── SyncUserProcessorAdapter.java
└── exchange
    └── BoltExchange.java
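BoltChannel mainly wraps com.alipay.remoting.Connection, and com.alipay.remoting.Connection in turn wraps io.netty.channel.Channel.
The Channel abstraction feels incomplete: it still exposes Connection, which is used to obtain the local IP and port and the remote IP and port.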
public class BoltChannel implements Channel {
    private Connection connection;
    private AsyncContext asyncContext;
    private BizContext bizContext;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
}
BoltServer wraps com.alipay.remoting.rpc.RpcServer.
During initialization it calls addConnectionEventProcessor, registerUserProcessor and similar methods to register the handlers with RpcServer.
It uses a ConcurrentHashMap to record all Channels connected to this Server, keyed by IP:port.
public class BoltServer implements Server {
    /** according to server port, can not be null */
    private final URL url;
    private final List<ChannelHandler> channelHandlers;
    /** bolt server */
    private RpcServer boltServer;
    /** started status */
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    /** connected channels, key is IP:port */
    private Map<String, Channel> channels = new ConcurrentHashMap<>();
    private AtomicBoolean initHandler = new AtomicBoolean(false);
}
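A server-side channel table keyed by `IP:port` and updated on connect and disconnect can be sketched as follows. `ChannelTable` is hypothetical and generic over the channel type. It uses `InetSocketAddress.getHostString()`, so an unresolved address works and no reverse DNS lookup happens. The excerpt's `sendSync` uses `getAddress().getHostAddress()` instead, which needs a resolved address. With IPv6 literals the `:` separator becomes ambiguous, which this sketch ignores.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a server-side channel table keyed by "ip:port".
final class ChannelTable<C> {
    private final Map<String, C> channels = new ConcurrentHashMap<>();

    static String key(InetSocketAddress address) {
        // getHostString() returns the literal host without a reverse DNS lookup.
        return address.getHostString() + ":" + address.getPort();
    }

    // Would be called from a "connected" event.
    void connected(InetSocketAddress remote, C channel) {
        channels.put(key(remote), channel);
    }

    // Would be called from a "disconnected" event.
    void disconnected(InetSocketAddress remote) {
        channels.remove(key(remote));
    }

    C get(InetSocketAddress remote) {
        return channels.get(key(remote));
    }

    int size() {
        return channels.size();
    }
}
```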
Its main methods are as follows; they mostly delegate to Bolt:
@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        // try/catch omitted in this excerpt
        Url boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(),
            channel.getRemoteAddress().getPort());
        return boltServer.invokeSync(boltUrl, message, timeoutMillis);
    }
    // handling of a null or disconnected channel omitted in this excerpt
}

@Override
public void sendCallback(Channel channel, Object message, CallbackHandler callbackHandler, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        // try/catch omitted in this excerpt
        Url boltUrl = new Url(channel.getRemoteAddress().getAddress().getHostAddress(),
            channel.getRemoteAddress().getPort());
        // adapt the registry-level CallbackHandler to Bolt's InvokeCallback
        boltServer.invokeWithCallback(boltUrl, message, new InvokeCallback() {
            @Override
            public void onResponse(Object result) {
                callbackHandler.onCallback(channel, result);
            }

            @Override
            public void onException(Throwable e) {
                callbackHandler.onException(channel, e);
            }

            @Override
            public Executor getExecutor() {
                return callbackHandler.getExecutor();
            }
        }, timeoutMillis);
        return;
    }
    // handling of a null or disconnected channel omitted in this excerpt
}
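`sendCallback` is an adapter. The transport-level callback knows nothing about registry channels, so the channel is captured in an anonymous class and passed along when the transport reports a result or an error. The sketch reproduces this pattern with hypothetical interfaces `RegistryCallback` and `TransportCallback`. The channel is simplified to a `String` id; these are not the real Bolt or SOFARegistry types.

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for the registry's CallbackHandler (channel simplified to a String id).
interface RegistryCallback {
    void onCallback(String channel, Object result);
    void onException(String channel, Throwable error);
    default Executor getExecutor() {
        return null; // no dedicated executor in this sketch
    }
}

// Hypothetical stand-in for a transport callback that knows nothing about channels.
interface TransportCallback {
    void onResponse(Object result);
    void onException(Throwable error);
    Executor getExecutor();
}

final class CallbackAdapters {
    private CallbackAdapters() {
    }

    // Captures the channel so transport-level events can be reported per channel.
    static TransportCallback adapt(String channel, RegistryCallback callback) {
        return new TransportCallback() {
            @Override
            public void onResponse(Object result) {
                callback.onCallback(channel, result);
            }

            @Override
            public void onException(Throwable error) {
                callback.onException(channel, error);
            }

            @Override
            public Executor getExecutor() {
                return callback.getExecutor();
            }
        };
    }
}
```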
BoltClient mainly wraps com.alipay.remoting.rpc.RpcClient.
During initialization it calls addConnectionEventProcessor, registerUserProcessor and similar methods to register the handlers with RpcClient.
public class BoltClient implements Client {
    private RpcClient rpcClient;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private int connectTimeout = 2000;
    private final int connNum;
}
The main methods are as follows:
@Override
public Channel connect(URL url) {
    // try/catch omitted in this excerpt
    Connection connection = getBoltConnection(rpcClient, url);
    BoltChannel channel = new BoltChannel();
    channel.setConnection(connection);
    return channel;
}

protected Connection getBoltConnection(RpcClient rpcClient, URL url) throws RemotingException {
    Url boltUrl = createBoltUrl(url);
    // try/catch omitted in this excerpt
    Connection connection = rpcClient.getConnection(boltUrl, connectTimeout);
    if (connection == null || !connection.isFine()) {
        if (connection != null) {
            connection.close();
        }
        // the rest of the failure handling is omitted in this excerpt
    }
    return connection;
}

@Override
public Object sendSync(URL url, Object message, int timeoutMillis) {
    return rpcClient.invokeSync(createBoltUrl(url), message, timeoutMillis);
}

@Override
public Object sendSync(Channel channel, Object message, int timeoutMillis) {
    if (channel != null && channel.isConnected()) {
        BoltChannel boltChannel = (BoltChannel) channel;
        return rpcClient.invokeSync(boltChannel.getConnection(), message, timeoutMillis);
    }
    // handling of a null or disconnected channel omitted in this excerpt
}

@Override
public void sendCallback(URL url, Object message, CallbackHandler callbackHandler, int timeoutMillis) {
    // try/catch omitted in this excerpt
    Connection connection = getBoltConnection(rpcClient, url);
    BoltChannel channel = new BoltChannel();
    channel.setConnection(connection);
    rpcClient.invokeWithCallback(connection, message, new InvokeCallback() {
        @Override
        public void onResponse(Object result) {
            callbackHandler.onCallback(channel, result);
        }

        @Override
        public void onException(Throwable e) {
            callbackHandler.onException(channel, e);
        }

        @Override
        public Executor getExecutor() {
            return callbackHandler.getExecutor();
        }
    }, timeoutMillis);
}
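The main job of BoltExchange is to maintain two concurrent maps, one holding all the Clients and one holding all the Servers.
This is the first level of connection management.
Map<String, Client> clients: keyed by server type, which is one of: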
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
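In other words, if a Data Server uses BoltExchange, it holds only two BoltClients internally: one is reused for all interactions with dataServers, and the other for all interactions with metaServers.
ConcurrentHashMap<Integer, Server> serverMap: keyed by the listening port.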
public class BoltExchange implements Exchange<ChannelHandler> {

    /** key: server type (dataServer / metaServer) */
    private Map<String, Client> clients = new ConcurrentHashMap<>();

    /** key: listening port */
    private ConcurrentHashMap<Integer, Server> serverMap = new ConcurrentHashMap<>();

    @Override
    public Client connect(String serverType, URL serverUrl, ChannelHandler... channelHandlers) {
        return this.connect(serverType, 1, serverUrl, channelHandlers);
    }

    @Override
    public Client connect(String serverType, int connNum, URL serverUrl, ChannelHandler... channelHandlers) {
        // one client per server type, created on first use and reused afterwards
        Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
        client.connect(serverUrl);
        return client;
    }

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    @Override
    public Client getClient(String serverType) {
        return clients.get(serverType);
    }

    @Override
    public Server getServer(Integer port) {
        return serverMap.get(port);
    }

    /**
     * add server into serverMap
     * @param server
     * @param url
     */
    public void setServer(Server server, URL url) {
        serverMap.putIfAbsent(url.getPort(), server);
    }

    private BoltClient newBoltClient(int connNum, ChannelHandler[] channelHandlers) {
        BoltClient boltClient = createBoltClient(connNum);
        boltClient.initHandlers(Arrays.asList(channelHandlers));
        return boltClient;
    }

    protected BoltClient createBoltClient(int connNum) {
        return new BoltClient(connNum);
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}
Internally, the matching Bolt Client is taken from boltExchange according to the server type:
String DATA_SERVER_TYPE = "dataServer";
String META_SERVER_TYPE = "metaServer";
A Client is put into the map as follows:
Client client = clients.computeIfAbsent(serverType, key -> newBoltClient(connNum, channelHandlers));
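The effect of `computeIfAbsent` here is "one client per server type, created lazily and then shared". The sketch below makes that observable with a hypothetical `FakeClient` placeholder for BoltClient. It also shows a consequence that follows from `computeIfAbsent` semantics: once a type is cached, a different `connNum` on a later call is ignored.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of BoltExchange-style client caching: one client per server type.
final class ClientCache {
    static final String DATA_SERVER_TYPE = "dataServer";
    static final String META_SERVER_TYPE = "metaServer";

    // Hypothetical placeholder for BoltClient.
    static final class FakeClient {
        final String serverType;
        final int connNum;

        FakeClient(String serverType, int connNum) {
            this.serverType = serverType;
            this.connNum = connNum;
        }
    }

    private final Map<String, FakeClient> clients = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    FakeClient connect(String serverType, int connNum) {
        // The factory runs only when the type is absent; later calls get the cached client.
        return clients.computeIfAbsent(serverType, type -> {
            created.incrementAndGet();
            return new FakeClient(type, connNum);
        });
    }

    FakeClient getClient(String serverType) {
        return clients.get(serverType);
    }

    int createdCount() {
        return created.get();
    }
}
```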
A Client is retrieved as follows:
Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
Once the corresponding Client has been obtained, it either gets a Channel for the given url or sends a request.
Channel channel = client.getChannel(url);
client.sendCallback(request.getRequestUrl()....;
At this point the overall logic is roughly as follows:
BoltExchange [Map<String, Client> clients; ConcurrentHashMap<Integer, Server> serverMap]
  -> BoltClient [wraps remoting.rpc.RpcClient (ConnectionEventHandler, ConnectionFactory)]
       -> ChannelHandler, BoltChannel, CallbackHandler
  -> BoltServer [wraps remoting.rpc.RpcServer; Map<String, Channel> channels; List<ChannelHandler> channelHandlers]
       -> ChannelHandler, BoltChannel
BoltChannel [wraps com.alipay.remoting.Connection] -> netty.channel.Channel
Below is the Jetty-based HTTP module, registry-remoting-http:
├── JerseyChannel.java
├── JerseyClient.java
├── JerseyJettyServer.java
├── exchange
│   └── JerseyExchange.java
└── jetty
    └── server
        ├── HttpChannelOverHttpCustom.java
        ├── HttpConnectionCustom.java
        └── HttpConnectionCustomFactory.java
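Because the HTTP service is not a main feature of SOFARegistry, it is skipped here.
The directory structure of the Data Server's remoting package gives a rough picture of how its functional modules are divided.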
├── remoting
│   ├── DataNodeExchanger.java
│   ├── MetaNodeExchanger.java
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   │   ├── DataSyncServerConnectionHandler.java
│   │   │   ├── FetchDataHandler.java
│   │   │   ├── NotifyDataSyncHandler.java
│   │   │   ├── NotifyFetchDatumHandler.java
│   │   │   ├── NotifyOnlineHandler.java
│   │   │   └── SyncDataHandler.java
│   │   └── task
│   │       ├── AbstractTask.java
│   │       ├── ConnectionRefreshTask.java
│   │       └── RenewNodeTask.java
│   ├── handler
│   │   ├── AbstractClientHandler.java
│   │   └── AbstractServerHandler.java
│   ├── metaserver
│   │   ├── handler
│   │   ├── provideData
│   │   └── task
│   └── sessionserver
│       ├── disconnect
│       ├── forward
│       └── handler
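Since the major functional modules are largely similar, we focus below on the dataserver directory and cover only the special parts of the metaserver and sessionserver directories.
The Data Server is relatively complex: it is both a server and a client, so separate components are built to abstract these two roles.
DataServerBootstrap#start launches a series of initialization services. In this method, several network services are started to provide external interfaces.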
public void start() {
    // try/catch omitted in this excerpt
    openDataServer();
    openDataSyncServer();
    openHttpServer();
    startRaftClient();
}
The role of each handler is shown in the figure.
DataServer and DataSyncServer are Bolt Servers, the bolt communication components used between nodes, where:
The code is as follows:
private void openDataServer() {
    // try/catch omitted in this excerpt
    if (serverForSessionStarted.compareAndSet(false, true)) {
        server = boltExchange.open(
            new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()),
            serverHandlers.toArray(new ChannelHandler[serverHandlers.size()]));
    }
}

private void openDataSyncServer() {
    // try/catch omitted in this excerpt
    if (serverForDataSyncStarted.compareAndSet(false, true)) {
        dataSyncServer = boltExchange.open(
            new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getSyncDataPort()),
            serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
    }
}
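Both `open...` methods guard start-up with `compareAndSet(false, true)`. Exactly one caller flips the flag and performs the open, and every other call, concurrent or repeated, becomes a no-op. A minimal sketch of the idiom follows. `StartGuard` is hypothetical, and the counter stands in for `boltExchange.open(...)`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the compareAndSet(false, true) start guard.
final class StartGuard {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger opened = new AtomicInteger();

    boolean open() {
        if (started.compareAndSet(false, true)) {
            opened.incrementAndGet(); // stand-in for boltExchange.open(...)
            return true;
        }
        return false; // already started by this or another thread
    }

    int openedCount() {
        return opened.get();
    }

    // Calls open() from several threads at once; exactly one call should win.
    static int race(StartGuard guard, int threads) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(guard::open);
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return guard.openedCount();
    }
}
```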
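The handlers of DataServer and DataSyncServer partly overlap (getDataHandler, publishDataProcessor and unPublishDataHandler appear in both), which suggests the developers may have been migrating functionality between them.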
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

@Bean(name = "serverHandlers")
public Collection<AbstractServerHandler> serverHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}
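Here we use DataSyncServer for a detailed walkthrough.
When DataSyncServer starts, it registers the following handlers to process bolt requests:
getDataHandler: mainly used to fetch data. When a request arrives, the handler uses the DataCenter and DataInfoId in the request to look up the corresponding data stored on the current DataServer node.
publishDataProcessor / unPublishDataHandler: when a data publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered respectively. The handler adds a data change event to dataChangeEventCenter so that the change is propagated asynchronously. On receiving the event, the event center puts it into a queue, and dataChangeEventCenter then processes the online/offline data asynchronously according to the event type.
Meanwhile, DataChangeHandler publishes the change through ChangeNotifier to notify other nodes to synchronize their data.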
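The "handler enqueues, event center processes asynchronously" flow can be sketched with a `BlockingQueue` and a single worker thread. Every name here (`ChangeEventCenter`, `ChangeType`, the poison-pill shutdown) is hypothetical. The processing step is a placeholder for updating local data and notifying other nodes; it does not reproduce SOFARegistry's actual dataChangeEventCenter.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: handlers enqueue change events, one worker processes them in order.
final class ChangeEventCenter {
    enum ChangeType { PUBLISH, UNPUBLISH }

    private static final class ChangeEvent {
        final ChangeType type;
        final String dataInfoId;

        ChangeEvent(ChangeType type, String dataInfoId) {
            this.type = type;
            this.dataInfoId = dataInfoId;
        }
    }

    private static final ChangeEvent STOP = new ChangeEvent(null, null); // poison pill

    private final BlockingQueue<ChangeEvent> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::drain, "data-change-worker");

    void start() {
        worker.start();
    }

    // Called from the handler thread: enqueue and return immediately.
    void onChange(ChangeType type, String dataInfoId) {
        queue.add(new ChangeEvent(type, dataInfoId));
    }

    private void drain() {
        try {
            while (true) {
                ChangeEvent event = queue.take();
                if (event == STOP) {
                    return;
                }
                // Placeholder for updating local data and notifying other nodes.
                processed.add(event.type + " " + event.dataInfoId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops the worker after it has drained earlier events, then returns what it processed.
    List<String> stopAndGet() {
        queue.add(STOP);
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```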
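notifyFetchDatumHandler: handles a data pull request. When triggered, it makes the current DataServer node compare version numbers; if the version in the request is higher than the one in the local cache, data synchronization is performed so that the data is up to date.
notifyOnlineHandler: handles DataServer online notifications. When another node comes online, this handler is triggered and the current node stores the new node's information in its cache. This is used to manage node state, i.e. whether a node is INITIAL or WORKING.
syncDataHandler: handles data synchronization between nodes. When triggered, it compares version numbers. If the data versions stored on the current DataServer include the version in the request, it returns all data whose version is greater than the requested one, so that the nodes can synchronize.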
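The version rule of syncDataHandler ("if the requested version is held locally, return everything newer") maps naturally onto a sorted map. This sketch uses `NavigableMap.tailMap(fromVersion, false)`. `VersionedChanges` is hypothetical. Returning `null` for an unknown version, to signal that the caller needs some other (for example full) synchronization, is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch of version-based incremental sync; names and the fallback are assumptions.
final class VersionedChanges {
    private final NavigableMap<Long, String> changes = new ConcurrentSkipListMap<>();

    void append(long version, String change) {
        changes.put(version, change);
    }

    /**
     * Returns every change strictly newer than fromVersion, in version order, or null
     * when this node does not hold fromVersion (the caller must then sync another way).
     */
    List<String> changesAfter(long fromVersion) {
        if (!changes.containsKey(fromVersion)) {
            return null;
        }
        return new ArrayList<>(changes.tailMap(fromVersion, false).values());
    }
}
```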
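dataSyncServerConnectionHandler: connection management. When another DataServer node connects to the current node, the handler's connected method is triggered and the connection is registered in a local cache. When another DataServer node disconnects, its disconnected method is triggered and the cache entry is removed. This way the current DataServer node always knows every DataServer node connected to it.
The call chain of dataSyncServer is as follows.
In DataServerBootstrap: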
private void openDataSyncServer() {
    // try/catch omitted in this excerpt
    if (serverForDataSyncStarted.compareAndSet(false, true)) {
        dataSyncServer = boltExchange.open(
            new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getSyncDataPort()),
            serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
    }
}
Then, in BoltExchange:
public class BoltExchange implements Exchange<ChannelHandler> {

    @Override
    public Server open(URL url, ChannelHandler... channelHandlers) {
        BoltServer server = createBoltServer(url, channelHandlers);
        setServer(server, url);
        server.startServer();
        return server;
    }

    protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
        return new BoltServer(url, Arrays.asList(channelHandlers));
    }
}
BoltServer starts as follows and registers the custom UserProcessors:
public class BoltServer implements Server {

    public BoltServer(URL url, List<ChannelHandler> channelHandlers) {
        this.channelHandlers = channelHandlers;
        this.url = url;
    }

    public void startServer() {
        if (isStarted.compareAndSet(false, true)) {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        }
    }

    private void initHandler() {
        if (initHandler.compareAndSet(false, true)) {
            boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
                new ConnectionEventAdapter(ConnectionEventType.CONNECT, getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
                new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(), this));
            boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
                new ConnectionEventAdapter(ConnectionEventType.EXCEPTION, getConnectionEventHandler(), this));
            registerUserProcessorHandler();
        }
    }

    // processors are split here into synchronous and asynchronous user processors
    private void registerUserProcessorHandler() {
        if (channelHandlers != null) {
            for (ChannelHandler channelHandler : channelHandlers) {
                if (HandlerType.PROCESSER.equals(channelHandler.getType())) {
                    if (InvokeType.SYNC.equals(channelHandler.getInvokeType())) {
                        boltServer.registerUserProcessor(new SyncUserProcessorAdapter(channelHandler));
                    } else {
                        boltServer.registerUserProcessor(new AsyncUserProcessorAdapter(channelHandler));
                    }
                }
            }
        }
    }
}
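The branching in `registerUserProcessorHandler()` sorts handlers into three groups. Processors become sync or async user processors depending on `getInvokeType()`. Non-processor handlers are skipped there, because connection events reach them through the `ConnectionEventAdapter`s registered just before. A hypothetical mini version that makes the sorting testable:

```java
import java.util.ArrayList;
import java.util.List;

// Mirrors the branching in registerUserProcessorHandler(); all names are hypothetical.
final class HandlerSorter {
    enum HandlerType { LISTENER, PROCESSOR }
    enum InvokeType { SYNC, ASYNC }

    static final class HandlerSpec {
        final String name;
        final HandlerType type;
        final InvokeType invokeType;

        HandlerSpec(String name, HandlerType type, InvokeType invokeType) {
            this.name = name;
            this.type = type;
            this.invokeType = invokeType;
        }
    }

    final List<String> syncProcessors = new ArrayList<>();
    final List<String> asyncProcessors = new ArrayList<>();
    final List<String> skipped = new ArrayList<>();

    void register(List<HandlerSpec> handlers) {
        for (HandlerSpec h : handlers) {
            if (h.type != HandlerType.PROCESSOR) {
                skipped.add(h.name);          // wired through connection-event adapters instead
            } else if (h.invokeType == InvokeType.SYNC) {
                syncProcessors.add(h.name);   // would be wrapped in a sync adapter
            } else {
                asyncProcessors.add(h.name);  // would be wrapped in an async adapter
            }
        }
    }
}
```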
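HttpServer is the HTTP communication component used for control, together with its configuration. It provides a set of REST endpoints for dashboard management, data queries and so on.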
private void openHttpServer() {
    // try/catch omitted in this excerpt
    if (httpServerStarted.compareAndSet(false, true)) {
        bindResourceConfig();
        httpServer = jerseyExchange.open(
            new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getHttpServerPort()),
            new ResourceConfig[] { jerseyResourceConfig });
    }
}
RaftClient is a Raft-based client. It uses the Raft protocol to obtain the meta server leader information.
private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}
The implementation in DefaultMetaServiceImpl is as follows:
@Override
public void startRaftClient() {
    // try/catch omitted in this excerpt
    if (clientStart.compareAndSet(false, true)) {
        String serverConf = getServerConfig();
        raftClient = new RaftClient(getGroup(), serverConf);
        raftClient.start();
    }
}
The resulting layout of the functional modules is roughly as follows:
DataServerBootstrap
├── DataSyncServer
│   ├── getDataHandler
│   ├── publishDataProcessor
│   ├── unPublishDataHandler
│   ├── notifyFetchDatumHandler
│   ├── notifyOnlineHandler
│   ├── syncDataHandler
│   └── dataSyncServerConnectionHandler
├── HttpServer
├── RaftClient
└── DataServer
    ├── getDataHandler
    ├── clientOffHandler
    ├── getDataVersionsHandler
    ├── publishDataProcessor
    ├── sessionServerRegisterHandler
    ├── unPublishDataHandler
    ├── dataServerConnectionHandler
    ├── renewDatumHandler
    └── datumSnapshotHandler
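Exchange is the abstraction over Client / Server connections and is responsible for connections between nodes.
In the Data Server, the exchangers are mainly DataNodeExchanger and MetaNodeExchanger, which:
wrap BoltExchange;
abstract over Bolt Client and Bolt Channel;
provide ready-to-use network APIs, so that scattered beans such as ForwardServiceImpl and GetSyncDataHandler can use DataNodeExchanger directly for network interaction.
From the external interfaces we can see:
There are two questions here:
Perhaps because there can be many Session Servers, there is no need to keep Bolt clients and servers for them; the Session side does, however, have a corresponding ConnectionFactory. ConnectionFactory is a lower-level encapsulation that is explained later.
Let us take DataNodeExchanger as an example.
It gathers in one place all the Data Server network operations that are not directly tied to the Server and Client concepts.
As can be seen, it mainly wraps boltExchange at a higher level.
The code is as follows: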
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Client;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.remoting.exchange.NodeExchanger;
import com.alipay.sofa.registry.remoting.exchange.message.Request;
import com.alipay.sofa.registry.remoting.exchange.message.Response;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.util.Collection;

public class DataNodeExchanger implements NodeExchanger {

    @Autowired
    private Exchange boltExchange;

    @Autowired
    private DataServerConfig dataServerConfig;

    @Resource(name = "dataClientHandlers")
    private Collection<AbstractClientHandler> dataClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (null != request.getCallBackHandler()) {
            // asynchronous: send with a callback and report success immediately
            client.sendCallback(request.getRequestUrl(), request.getRequestBody(),
                request.getCallBackHandler(),
                request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> Response.ResultStatus.SUCCESSFUL;
        } else {
            // synchronous: block until the peer replies
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        // double-checked creation of the shared dataServer client
        Client client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.DATA_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.DATA_SERVER_TYPE, url,
                        dataClientHandlers.toArray(new ChannelHandler[dataClientHandlers.size()]));
                }
            }
        }
        // double-checked creation of the channel to this url
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }
        return channel;
    }
}
The code above uses dataClientHandlers, the handler collection given to BoltClient. When a server pushes messages to this client, these two handlers process them:
```java
@Bean(name = "dataClientHandlers")
public Collection<AbstractClientHandler> dataClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(notifyDataSyncHandler());
    list.add(fetchDataHandler());
    return list;
}
```
At this point, the network components fit together as follows:
```
+-----------------------------------+
| ForwardServiceImpl                +--+
+-----------------------------------+  |
| GetSyncDataHandler                +--+
+-----------------------------------+  |     +-------------------+
| LocalDataServerChangeEventHandler +--+---> | DataNodeExchanger |
+-----------------------------------+  |     +---------+---------+
| DataServerChangeEventHandler      +--+               |
+-----------------------------------+                  v
                                               +-------+------+
+----------------+                             | BoltExchange |
| JerseyExchange |                             +-------+------+
+-------+--------+                                     |
        |                                  +-----------+-----------+
        v                                  v                       v
+-------+-----------+                +-----+------+          +-----+------+
| JerseyJettyServer |                | BoltServer |          | BoltServer |
+-------------------+                +------------+          +------------+
     httpServer                          server              dataSyncServer
```
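AbstractServerHandler and AbstractClientHandler implement com.alipay.sofa.registry.remoting.ChannelHandler.
This has to be explained together with SOFABolt.
Taking RpcServer as an example, SOFABolt uses two kinds of processors here:
- connection event processors (ConnectionEventProcessor), which react to connection events such as CONNECT, CLOSE, and EXCEPTION;
- user processors (UserProcessor), which handle business requests; an asynchronous processor returns its result through the `sendResponse` method.
The main code of the two handler base classes is as follows: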
```java
public abstract class AbstractServerHandler<T> implements ChannelHandler<T> {

    protected NodeType getConnectNodeType() {
        return NodeType.DATA;
    }

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            // Any failure becomes a failed response instead of propagating to SOFABolt
            return buildFailedResponse(e.getMessage());
        }
    }

    // logRequest, checkParam, doHandle, buildFailedResponse, ... omitted in this excerpt
}

public abstract class AbstractClientHandler<T> implements ChannelHandler<T> {

    @Override
    public Object reply(Channel channel, T request) {
        try {
            logRequest(channel, request);
            checkParam(request);
            return doHandle(channel, request);
        } catch (Exception e) {
            return buildFailedResponse(e.getMessage());
        }
    }
}
```
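Both base classes follow the template method pattern: `reply` fixes the order (log, check, handle) and turns any exception into a failed response, while subclasses supply only the variable steps. Here is a minimal sketch of that idea; `TemplateHandler`, `Result`, and `UpperCaseHandler` are hypothetical types, and the `Channel` parameter is dropped for brevity.

```java
// Hypothetical response type; SOFARegistry uses its own response classes.
class Result {
    final boolean success;
    final String message;

    Result(boolean success, String message) {
        this.success = success;
        this.message = message;
    }
}

// Skeleton of the reply flow: subclasses fill in checkParam and doHandle only.
abstract class TemplateHandler<T> {
    final Object reply(T request) {
        try {
            checkParam(request);
            return doHandle(request);
        } catch (Exception e) {
            // Errors never escape to the transport layer; they become a failed response
            return new Result(false, e.getMessage());
        }
    }

    abstract void checkParam(T request);

    abstract Object doHandle(T request);
}

// Example subclass: rejects empty input and upper-cases everything else.
class UpperCaseHandler extends TemplateHandler<String> {
    @Override
    void checkParam(String request) {
        if (request == null || request.isEmpty()) {
            throw new IllegalArgumentException("empty request");
        }
    }

    @Override
    Object doHandle(String request) {
        return new Result(true, request.toUpperCase());
    }
}
```

Because `reply` is `final`, every derived handler gets the same validation and error handling for free, which is the main benefit of putting this logic in a shared base class.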
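Concrete handlers are derived from these base classes.
Note that ChannelHandlers come in two types, defined by the enum below, which correspond to the listener and the userProcessor of RpcServer respectively.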
```java
enum HandlerType {
    LISENTER,
    PROCESSER
}
```
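Taking serverSyncHandlers as an example, only dataSyncServerConnectionHandler is a Listener; all the others are Processors.
This is reasonable: a single listener is enough to react to connection events, while each request type needs its own processor.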
```java
@Bean(name = "serverSyncHandlers")
public Collection<AbstractServerHandler> serverSyncHandlers() {
    Collection<AbstractServerHandler> list = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler()); // the only Listener; all others are Processors
    return list;
}
```
To summarize:
```
                           +--> getDataHandler
                           |
                           +--> publishDataProcessor
                           |
                           +--> unPublishDataHandler
+--------------------+     |
| serverSyncHandlers +-----+--> notifyFetchDatumHandler
+--------------------+     |
                           +--> notifyOnlineHandler
                           |
                           +--> syncDataHandler
                           |
                           +--> dataSyncServerConnectionHandler (Listener)
```
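The split above can be sketched as follows: handlers tagged as listeners are collected for connection events, and every processor is indexed by the request type it handles. `Kind`, `Handler`, `HandlerSplit`, and the two request classes are hypothetical stand-ins for SOFARegistry's `HandlerType` and handler beans, used only to illustrate the partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical counterpart of HandlerType (LISENTER / PROCESSER in the source).
enum Kind { LISTENER, PROCESSOR }

// Hypothetical request types a processor can be interested in.
class GetDataRequest { }

class SyncDataRequest { }

class Handler {
    final String name;
    final Kind kind;
    final Class<?> interest; // request type for processors; null for listeners

    Handler(String name, Kind kind, Class<?> interest) {
        this.name = name;
        this.kind = kind;
        this.interest = interest;
    }
}

// Splits one handler collection into connection listeners and per-request processors.
class HandlerSplit {
    final List<Handler> listeners = new ArrayList<>();
    final Map<Class<?>, Handler> processors = new HashMap<>();

    HandlerSplit(List<Handler> handlers) {
        for (Handler h : handlers) {
            if (h.kind == Kind.LISTENER) {
                listeners.add(h);
            } else {
                processors.put(h.interest, h);
            }
        }
    }
}
```

Keeping both kinds in one collection lets the bean definition stay a flat list, while the server side still gets the two separate views it needs.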
At startup, serverSyncHandlers is used to start the data-sync BoltServer:
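```java
private void openDataSyncServer() {
    try {
        // compareAndSet guarantees the server is opened at most once
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getSyncDataPort()),
                serverSyncHandlers.toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    } catch (Exception e) {
        // The catch block is elided in the excerpt; a minimal version: reset the flag and rethrow
        serverForDataSyncStarted.set(false);
        throw new IllegalStateException(e);
    }
}
```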
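`serverForDataSyncStarted.compareAndSet(false, true)` makes opening the server idempotent: only the first caller proceeds. A small sketch of this start-once guard, using a hypothetical `StartOnce` class; resetting the flag on failure so that a later call can retry is an assumption of this sketch, not something the excerpt shows.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical start-once guard built on AtomicBoolean.compareAndSet.
class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger successfulStarts = new AtomicInteger();

    /** Returns true only for the call that actually ran startAction. */
    boolean start(Runnable startAction) {
        if (!started.compareAndSet(false, true)) {
            return false; // already started (or being started) by another caller
        }
        try {
            startAction.run();
            successfulStarts.incrementAndGet();
            return true;
        } catch (RuntimeException e) {
            started.set(false); // assumption: allow a retry after a failed start
            throw e;
        }
    }

    int successfulStarts() {
        return successfulStarts.get();
    }
}
```

Unlike a plain `boolean` check followed by an assignment, `compareAndSet` performs the test and the update as one atomic step, so two threads can never both see `false`.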
BoltExchange contains:
```java
@Override
public Server open(URL url, ChannelHandler... channelHandlers) {
    BoltServer server = createBoltServer(url, channelHandlers);
    setServer(server, url);
    server.startServer();
    return server;
}

protected BoltServer createBoltServer(URL url, ChannelHandler[] channelHandlers) {
    return new BoltServer(url, Arrays.asList(channelHandlers));
}
```
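`open` follows a create, register, start sequence. A minimal sketch of that sequence, assuming hypothetical `MiniExchange` and `MiniServer` classes and assuming servers are keyed by port (the excerpt calls `setServer(server, url)` without showing its body):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical server: remembers its port and handler names, tracks whether it started.
class MiniServer {
    final int port;
    final List<String> handlers;
    private volatile boolean started;

    MiniServer(int port, List<String> handlers) {
        this.port = port;
        this.handlers = Collections.unmodifiableList(handlers);
    }

    void start() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }
}

// Hypothetical exchange: create the server, register it, then start it.
class MiniExchange {
    private final ConcurrentHashMap<Integer, MiniServer> serverMap = new ConcurrentHashMap<>();

    MiniServer open(int port, String... handlers) {
        MiniServer server = new MiniServer(port, Arrays.asList(handlers)); // create
        serverMap.put(port, server);                                        // register (assumed key: port)
        server.start();                                                     // start
        return server;
    }

    MiniServer getServer(int port) {
        return serverMap.get(port);
    }
}
```

Registering before starting means that other components can already look the server up by the time it accepts connections.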
BoltServer contains the following code, which mainly sets up the handlers:
```java
public void startServer() {
    if (isStarted.compareAndSet(false, true)) {
        try {
            boltServer = new RpcServer(url.getPort(), true);
            initHandler();
            boltServer.start();
        } catch (Exception e) {
            // The catch block is elided in the excerpt; a minimal version: reset the flag and rethrow
            isStarted.set(false);
            throw new IllegalStateException(e);
        }
    }
}

private void initHandler() {
    if (initHandler.compareAndSet(false, true)) {
        // One adapter per connection event type, all delegating to the same connection event handler
        boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT,
            new ConnectionEventAdapter(ConnectionEventType.CONNECT, getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.CLOSE,
            new ConnectionEventAdapter(ConnectionEventType.CLOSE, getConnectionEventHandler(), this));
        boltServer.addConnectionEventProcessor(ConnectionEventType.EXCEPTION,
            new ConnectionEventAdapter(ConnectionEventType.EXCEPTION, getConnectionEventHandler(), this));
        // Register the Processor-type handlers as SOFABolt user processors
        registerUserProcessorHandler();
    }
}
```
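`initHandler` registers one adapter per connection event type, and all three delegate to the same connection event handler. A self-contained sketch of that fan-out, using hypothetical `ConnEvent`, `ConnListener`, `EventAdapter`, and `EventDispatcher` types; SOFABolt's `ConnectionEventType` and `ConnectionEventProcessor` and SOFARegistry's `ConnectionEventAdapter` have richer signatures than shown here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

enum ConnEvent { CONNECT, CLOSE, EXCEPTION }

// Hypothetical business-side handler: one object reacts to all connection events.
class ConnListener {
    final List<String> log = new ArrayList<>();

    void onConnect(String remote) { log.add("connect " + remote); }

    void onClose(String remote) { log.add("close " + remote); }

    void onException(String remote) { log.add("exception " + remote); }
}

// One adapter per event type, translating the event into a call on the shared listener.
class EventAdapter {
    private final ConnEvent type;
    private final ConnListener listener;

    EventAdapter(ConnEvent type, ConnListener listener) {
        this.type = type;
        this.listener = listener;
    }

    void onEvent(String remote) {
        switch (type) {
            case CONNECT:   listener.onConnect(remote); break;
            case CLOSE:     listener.onClose(remote); break;
            case EXCEPTION: listener.onException(remote); break;
        }
    }
}

// Keeps a list of adapters per event type and fires them in registration order.
class EventDispatcher {
    private final Map<ConnEvent, List<EventAdapter>> processors = new EnumMap<>(ConnEvent.class);

    void addProcessor(ConnEvent type, EventAdapter adapter) {
        processors.computeIfAbsent(type, k -> new ArrayList<>()).add(adapter);
    }

    void fire(ConnEvent type, String remote) {
        for (EventAdapter a : processors.getOrDefault(type, Collections.emptyList())) {
            a.onEvent(remote);
        }
    }
}
```

The adapters keep the transport-facing interface (one processor per event type) separate from the business-facing interface (one handler with a method per event).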
In the end, these calls reach RpcServer, which registers the connection event processors and the user-defined processors:
```java
/**
 * Add processor to process connection event.
 */
public void addConnectionEventProcessor(ConnectionEventType type,
                                        ConnectionEventProcessor processor) {
    this.connectionEventListener.addConnectionEventProcessor(type, processor);
}

/**
 * Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for server side.
 */
@Override
public void registerUserProcessor(UserProcessor<?> processor) {
    UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
```
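On the user-processor side, the registry is indexed by each processor's interest, that is, the class name of the request type it handles, and a second processor for the same interest is refused. A simplified sketch of such a registry and of dispatching a request by its class name; `Processor`, `LengthProcessor`, and `ProcessorRegistry` are hypothetical and much smaller than SOFABolt's `UserProcessor` and `UserProcessorRegisterHelper`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical processor contract: declares which request class it handles.
interface Processor {
    String interest();

    Object handle(Object request);
}

// Example processor for String requests: returns the string's length.
class LengthProcessor implements Processor {
    @Override
    public String interest() {
        return String.class.getName();
    }

    @Override
    public Object handle(Object request) {
        return ((String) request).length();
    }
}

class ProcessorRegistry {
    private final ConcurrentHashMap<String, Processor> processors = new ConcurrentHashMap<>();

    void register(Processor p) {
        String key = p.interest();
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("interest must not be blank");
        }
        // putIfAbsent makes the duplicate check and the insert one atomic step
        if (processors.putIfAbsent(key, p) != null) {
            throw new IllegalStateException("processor already registered for " + key);
        }
    }

    Object dispatch(Object request) {
        Processor p = processors.get(request.getClass().getName());
        if (p == null) {
            throw new IllegalStateException("no processor for " + request.getClass().getName());
        }
        return p.handle(request);
    }
}
```

This is why each Processor-type handler in serverSyncHandlers must handle a distinct request type: the request class is the lookup key.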
At this point, the mapping onto SOFABolt looks like this:
```
[RpcServer]
  |
  +-- connectionEventListener
  |     +--> CONNECT    --> DataSyncServerConnectionHandler
  |     +--> CLOSE      --> DataSyncServerConnectionHandler
  |     +--> EXCEPTION  --> DataSyncServerConnectionHandler
  |
  +-- connectionEventHandler
  |     +--> CONNECT    --> DataSyncServerConnectionHandler
  |     +--> CLOSE      --> DataSyncServerConnectionHandler
  |     +--> EXCEPTION  --> DataSyncServerConnectionHandler
  |
  +-- userProcessors
        +--> GetDataRequest           --> getDataHandler
        +--> PublishDataRequest       --> publishDataProcessor
        +--> UnPublishDataRequest     --> unPublishDataHandler
        +--> NotifyFetchDatumRequest  --> notifyFetchDatumHandler
        +--> NotifyOnlineRequest      --> notifyOnlineHandler
        +--> SyncDataRequest          --> syncDataHandler
```
This concludes our rough walk-through of SOFARegistry's network wrapping and operations.
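Logically, Alibaba provides two layers of wrapping:
From the connection perspective, Alibaba wraps netty.channel.Channel; from the bottom up, the layers are netty.channel.Channel, SOFABolt's com.alipay.remoting.Connection, and BoltChannel.
From the application perspective, Alibaba wraps the Server and Client layers; from the bottom up, the layers are SOFABolt's RpcServer / RpcClient, BoltServer / BoltClient, BoltExchange, and node exchangers such as DataNodeExchanger and MetaNodeExchanger.
The overall logic is roughly as follows: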
```
+---------------------+          +---------------------+
|  DataNodeExchanger  |          |  MetaNodeExchanger  |
+----------+----------+          +----------+----------+
           |                                |
           +----------------+---------------+
                            |
                            v
        +-------------------+--------------------+
        |              BoltExchange              |
        |  Map<String, Client>                   |
        |  ConcurrentHashMap<Integer, Server>    |
        +----------+-------------------+---------+
                   |                   |
                   v                   v
+------------------+-----------+    +--+-----------------------------------+
|          BoltClient          |    |              BoltServer              |
| +--------------------------+ |    | +----------------------------------+ |
| |  remoting.rpc.RpcClient  | |    | |      remoting.rpc.RpcServer      | |
| |  ConnectionEventHandler  | |    | +----------------------------------+ |
| |  ConnectionFactory       | |    | Map<String, Channel> channels        |
| +--------------------------+ |    | List<ChannelHandler> channelHandlers |
+---+----------+---------------+    +-------+------------------+-----------+
    |          |                            |                  |
    |          v                            |                  |
    |  +-------+--------+                   |                  |
    |  | ChannelHandler |<------------------+                  |
    |  +----------------+                                      |
    v                                                          v
+---+-------------+                           +----------------+---------------+
| CallbackHandler |                           |          BoltChannel           |
+-----------------+                           | com.alipay.remoting.Connection |
                                              +----------------+---------------+
                                                               |
                                                               v
                                                   +-----------+-----------+
                                                   | netty.channel.Channel |
                                                   +-----------------------+
```
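Alibaba's wrapping here is very fine-grained. Because SOFARegistry is a fairly complex system, wrapping its networking concepts and functions this way is well justified. In day-to-day development you may not need such fine-grained wrapping; you can borrow Alibaba's approach and keep or trim layers as your needs dictate.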