传输模块的初始化主要是在节点启动时的构造函数中完成的。
节点启动时,主要在Node的构造函数中进行通信模块的初始化。
/**
 * Node constructor (excerpt; each "..." marks code that the article elided).
 *
 * The parts shown here build the ActionModule and the NetworkModule from the
 * plugins returned by pluginsService.filterPlugins(...), obtain the Transport from
 * the NetworkModule, build the TransportService and SearchTransportService on top
 * of it, obtain the HttpServerTransport when HTTP is enabled, and finally call
 * actionModule.initRestHandlers(...) to set up the REST handlers.
 */
protected Node(
        final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    logger = LogManager.getLogger(Node.class);
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
        ... ... ...
        // Pass the list of ActionPlugin plugins to the ActionModule constructor.
        // Per the article, ActionModule registers and binds the request/handler classes
        // for both TCP and HTTP (not visible in this excerpt).
        ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
            settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
            threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
        ...
        final RestController restController = actionModule.getRestController();
        // Pass the list of NetworkPlugin plugins to the NetworkModule constructor;
        // the RestController is handed over as the last argument.
        final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
            threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
            networkService, restController);
        ... ... ...
        // Obtain the Transport from the network module's supplier.
        final Transport transport = networkModule.getTransportSupplier().get();
        // Task headers: every header contributed by the ActionPlugins plus Task.X_OPAQUE_ID.
        Set<String> taskHeaders = Stream.concat(
            pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
            Stream.of(Task.X_OPAQUE_ID)
        ).collect(Collectors.toSet());
        // Build the TransportService on top of the network module's Transport.
        final TransportService transportService = newTransportService(settings, transport, threadPool,
            networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
        final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
        // Build the SearchTransportService on top of the TransportService.
        final SearchTransportService searchTransportService = new SearchTransportService(transportService,
            SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
        final Consumer<Binder> httpBind;
        final HttpServerTransport httpServerTransport;
        // Obtain the HttpServerTransport from the network module. HTTP can be disabled,
        // in which case a null provider is bound and httpServerTransport stays null.
        if (networkModule.isHttpEnabled()) {
            httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            };
        } else {
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
            };
            httpServerTransport = null;
        }
        ... ... ...
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            logger.debug("initializing HTTP handlers ...");
            // Initialize the mapping between REST requests and their handler classes.
            actionModule.initRestHandlers(() -> clusterService.state().nodes());
        }
        logger.info("initialized");
        success = true;
        ... ... ...
}
ActionModule的内部初始化是通过插件的方式加载的,主要完成注册Action与处理类的映射和RestController的创建。
/**
 * Builds the ActionModule from the supplied ActionPlugins.
 *
 * Stores the constructor arguments, sets up the actions and action filters from the
 * plugins, collects the REST headers, picks up at most one REST handler wrapper from
 * the plugins, gathers the mapping request validators and, unless running as a
 * transport client, creates the RestController.
 *
 * @throws IllegalArgumentException if more than one plugin supplies a REST handler wrapper
 */
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
                    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
                    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
                    CircuitBreakerService circuitBreakerService, UsageService usageService) {
    this.transportClient = transportClient;
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    // Set up the actions: per the article, this registers each action and binds it
    // to its handler (setupActions is not shown here).
    actions = setupActions(actionPlugins);
    // Configure the action filters.
    actionFilters = setupActionFilters(actionPlugins);
    // No AutoCreateIndex is created for a transport client.
    autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    // REST headers: every header contributed by the plugins plus Task.X_OPAQUE_ID.
    Set<String> headers = Stream.concat(
        actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
        Stream.of(Task.X_OPAQUE_ID)
    ).collect(Collectors.toSet());
    // REST handler wrapper: at most one plugin may provide one.
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
        UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
        if (newRestWrapper != null) {
            logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
            if (restWrapper != null) {
                throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
            }
            restWrapper = newRestWrapper;
        }
    }
    mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
        actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
    );
    if (transportClient) {
        restController = null;
    } else {
        // Build the RestController.
        restController = new RestController(headers, restWrapper, nodeClient,
            circuitBreakerService, usageService);
    }
}
构造NetworkModule对象时,构造函数会通过插件的方式加载下面列出的3个主要成员对象。
主要数据成员对象
Map<String, Supplier<Transport>> transportFactories Map<String, Supplier<HttpServerTransport>> transportHttpFactories List<TransportInterceptor> transportIntercetors
Transport :负责内部节点的RPC请求
HttpServerTransport:负责客户端的REST服务
TransportInterceptor:传输层拦截器
/**
 * Builds the NetworkModule by iterating over the NetworkPlugins and registering,
 * for each plugin, its HttpServerTransport suppliers (only when this is not a
 * transport client and HTTP is enabled), its Transport suppliers and its
 * TransportInterceptors.
 */
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
                     BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService,
                     NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry,
                     NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
    this.settings = settings;
    this.transportClient = transportClient;
    // Iterate over the plugins and register HttpServerTransport, Transport and TransportInterceptor.
    for (NetworkPlugin plugin : plugins) {
        if (transportClient == false && HTTP_ENABLED.get(settings)) {
            Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
                circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
            for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
                // Per the article, registerHttpTransport checks that an HTTP transport is not being
                // created for a TransportClient and that no HttpServerTransport with the same name
                // is already registered (its body is not shown here).
                registerHttpTransport(entry.getKey(), entry.getValue());
            }
        }
        Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
            circuitBreakerService, namedWriteableRegistry, networkService);
        for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
            // Per the article, registerTransport checks that no Transport with the same name
            // is already registered (its body is not shown here).
            registerTransport(entry.getKey(), entry.getValue());
        }
        List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
            threadPool.getThreadContext());
        for (TransportInterceptor interceptor : transportInterceptors) {
            // Register the transport-layer interceptor.
            registerTransportInterceptor(interceptor);
        }
    }
}
在构造完成之后,NetworkModule通过 getTransportSupplier、getHttpServerTransportSupplier、getTransportInterceptor 这三个方法对外提供服务。
NetworkPlugin是一个接口,Netty4Plugin实现了它,同时继承了Plugin。
在Netty4Plugin中实现了NetworkPlugin的getTransports和getHttpTransports方法,分别构建了Netty4Transport和Netty4HttpServerTransport用于Transport(TCP)传输和HTTP传输。
通过类图可以发现,Netty4Transport继承了TcpTransport,而TcpTransport实现了Transport接口,这一层应该就是传输层的实现,负责控制数据在传输层的交互。Netty4Transport中实现了抽象方法doStart,用来启动TCP服务。在启动的时候,默认情况下会同时构建Client端和Server端,这些功能主要借助netty4框架来实现。
/**
 * Starts the Netty-based TCP transport: creates the NIO event loop group, builds the
 * client bootstrap and, when NetworkService.NETWORK_SERVER is enabled, creates and
 * binds a server bootstrap for every profile. If anything fails before success is
 * set, doStop() is called from the finally block.
 */
@Override
protected void doStart() {
    boolean success = false;
    try {
        ThreadFactory threadFactory = daemonThreadFactory(settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX);
        eventLoopGroup = new NioEventLoopGroup(workerCount, threadFactory);
        // Initialize the client side.
        clientBootstrap = createClientBootstrap(eventLoopGroup);
        // Initialize the server side; per the article the server setting is enabled by default.
        if (NetworkService.NETWORK_SERVER.get(settings)) {
            for (ProfileSettings profileSettings : profileSettings) {
                createServerBootstrap(profileSettings, eventLoopGroup);
                bindServer(profileSettings);
            }
        }
        super.doStart();
        success = true;
    } finally {
        if (success == false) {
            // Startup did not complete: stop again.
            doStop();
        }
    }
}
根据类图可以发现,Netty4HttpServerTransport继承了AbstractLifecycleComponent,并实现了HttpServerTransport接口。
同样实现了doStart的抽象方法,用来启动HTTP Server服务。在HTTP Server服务中配置了监听端口和处理器。这里其实应该是通过Netty4来完成HTTP协议下的传输层的部分。
/**
 * Starts the HTTP server transport (excerpt; each "..." marks code that the article
 * elided). The parts shown install the child channel handler on the server bootstrap
 * and create the bound HTTP address.
 */
@Override
protected void doStart() {
    ...
    // Install the request handler; per the article, configureServerChannelHandler()
    // builds an HttpChannelHandler.
    serverBootstrap.childHandler(configureServerChannelHandler());
    ...
    // Bind the port that serves as the HTTP listening port.
    this.boundAddress = createBoundHttpAddress();
    ...
}
configureServerChannelHandler 方法中构建了一个 HttpChannelHandler 对象,HttpChannelHandler 的构造函数中初始化了两个成员变量:Netty4HttpServerTransport 和 Netty4HttpRequestHandler。当收到请求的时候,会调用 dispatchRequest 对不同的请求执行相应的处理。dispatchRequest 是 HttpServerTransport 接口的内部接口 Dispatcher 中的方法,主要用来转发请求,它的主要实现类是 RestController。
在Node的启动过程中会在初始化Transport之后,基于Transport构建TransportService。newTransportService方法中其实是调用TransportService的构造函数。
/**
 * Convenience constructor: delegates to the eight-argument constructor, supplying a
 * new ConnectionManager built from the given settings and transport.
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders) {
    this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
        // Create the connection manager.
        new ConnectionManager(settings, transport));
}
ConnectionManager 是Node传输连接的管理类,通过这个类的 connectionManager.connectToNode(node, connectionProfile, connectionValidator(node)); 按照连接配置(ConnectionProfile)创建节点间的内部连接。在 ConnectionManager 的构造函数中调用了 ConnectionProfile.buildDefaultConnectionProfile(settings),这个方法会根据settings中提供的配置信息构建默认的连接配置,后续创建连接时以它为依据。在 TransportRequestOptions.Type 中可以看到连接的分类。
/**
 * Options for a transport request (excerpt; each "..." marks code that the article
 * elided). Only the Type enum, which classifies connections, is shown.
 */
public class TransportRequestOptions {
    ... ... ...
    public enum Type {
        RECOVERY, // used for recovery
        BULK,     // used for bulk writes
        REG,      // NOTE(review): the article's author was unsure ("possibly cluster registration"); presumably regular requests — confirm
        STATE,    // transfers the cluster state
        PING      // ping requests
    }
}
在 TransportSettings 类中可以看到各类TCP连接的默认连接数:recovery 2个、bulk 3个、reg 6个、state 1个、ping 1个,默认合计13个。
/**
 * Transport-related settings (excerpt; each "..." marks code that the article elided).
 * The settings shown configure the number of connections per node for each connection
 * type. The two integer arguments of intSetting are presumably the default value and
 * the minimum value — confirm against the Setting API. The first integers sum to
 * 2 + 3 + 6 + 1 + 1 = 13.
 */
public final class TransportSettings {
    ... ... ...
    // Connections per node used for recovery.
    public static final Setting<Integer> CONNECTIONS_PER_NODE_RECOVERY =
        intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope);
    // Connections per node used for bulk requests.
    public static final Setting<Integer> CONNECTIONS_PER_NODE_BULK =
        intSetting("transport.connections_per_node.bulk", 3, 1, Setting.Property.NodeScope);
    // Connections per node of type "reg".
    public static final Setting<Integer> CONNECTIONS_PER_NODE_REG =
        intSetting("transport.connections_per_node.reg", 6, 1, Setting.Property.NodeScope);
    // Connections per node used for cluster state.
    public static final Setting<Integer> CONNECTIONS_PER_NODE_STATE =
        intSetting("transport.connections_per_node.state", 1, 1, Setting.Property.NodeScope);
    // Connections per node used for ping.
    public static final Setting<Integer> CONNECTIONS_PER_NODE_PING =
        intSetting("transport.connections_per_node.ping", 1, 1, Setting.Property.NodeScope);
    ... ... ...
}
/**
 * Main TransportService constructor.
 *
 * Stores the collaborators, reads the cluster name and tracer log settings, creates
 * the task manager, wraps the internal sender with the transport interceptor, creates
 * the RemoteClusterService, subscribes to setting updates when clusterSettings is not
 * null, and registers the request handler for HANDSHAKE_ACTION_NAME.
 */
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
                        Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
                        Set<String> taskHeaders, ConnectionManager connectionManager) {
    // The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
    this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
        TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
    this.transport = transport;
    this.threadPool = threadPool;
    this.localNodeFactory = localNodeFactory;
    this.connectionManager = connectionManager;
    this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
    setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
    setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
    tracerLog = Loggers.getLogger(logger, ".tracer");
    // Task manager: per the article, it tracks the tasks currently running on this node.
    taskManager = createTaskManager(settings, threadPool, taskHeaders);
    this.interceptor = transportInterceptor;
    // Asynchronous sender: sendRequestInternal wrapped by the interceptor; per the article,
    // it sends the communication requests between cluster nodes.
    this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
    // Remote-cluster support (the article labels this "CCR").
    this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
    remoteClusterService = new RemoteClusterService(settings, this);
    responseHandlers = transport.getResponseHandlers();
    if (clusterSettings != null) {
        // Keep the tracer include/exclude lists in sync with setting updates.
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
        clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
        if (connectToRemoteCluster) {
            remoteClusterService.listenForUpdates(clusterSettings);
        }
    }
    // Register the handler for the handshake action: it replies with a HandshakeResponse
    // built from the local node, the cluster name and the local node's version.
    // (The article calls this a heartbeat; the action name in the code is HANDSHAKE_ACTION_NAME.)
    registerRequestHandler(
        HANDSHAKE_ACTION_NAME,
        () -> HandshakeRequest.INSTANCE,
        ThreadPool.Names.SAME,
        false, false,
        (request, channel) ->
            channel.sendResponse(
                new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
}
在Node初始化的最后,会调用 actionModule.initRestHandlers 进行RestHandler的注册映射。至此,通信模块的初始化部分就结束了。