NameServer是一个注册中心,Broker在启动时向所有的NameServer注册,生产者Producer和消费者Consumer可以从NameServer中获取所有注册的Broker列表,并从中选取Broker进行消息的发送和消费。
NameServer的启动类是NamesrvStartup
,主要做了两件事情:
createNamesrvController
方法创建NamesrvController,NamesrvController是NameServer的核心public class NamesrvStartup { private static InternalLogger log; private static Properties properties = null; private static CommandLine commandLine = null; public static void main(String[] args) { // 启动入口 main0(args); } public static NamesrvController main0(String[] args) { try { // 创建NamesrvController NamesrvController controller = createNamesrvController(args); // 启动nameserver start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; } }
createNamesrvController
方法主要是对配置信息进行处理:
NamesrvConfig
,从名字可以看出是记录NameServer的相关配置信息NettyServerConfig
,与Netty服务相关的配置信息,默认设置监听端口为9876- c
指定了配置文件,如果指定了配置文件,从指定的路径中加载文件,并将解析文件将配置保存到NamesrvConfig
和NettyServerConfig
中-Drocketmq.home.dir=路径
指定主目录,也可以在操作系统设置环境变量ROCKETMQ_HOME的方式来指定NamesrvController
并返回public class NamesrvStartup { public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // Nameserver相关配置 final NamesrvConfig namesrvConfig = new NamesrvConfig(); // Netty服务器连接相关配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 设置端口 nettyServerConfig.setListenPort(9876); // 如果启动命令中指定了配置文件 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { // 获取文件 InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); // 解析配置文件 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 如果启动命令中带了-p参数,打印NameServer的相关配置信息 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 将启动命令中的一些设置记录到namesrvConfig MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); // 如果RocketMQ主目录获取为空,打印异常信息 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } // 日志相关设置 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 创建NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); return controller; } }
NameServer的启动主要通过NamesrvController进行,处理逻辑如下:
initialize
函数进行初始化shutdown
方法关闭相关资源start
方法启动NameServerpublic class NamesrvStartup { public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 初始化NamesrvController boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注册JVM关闭钩子函数 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // 启动nameserver controller.start(); return controller; } }
NamesrvController的初始化方法中主要做了如下操作:
DefaultRequestProcessor
,用于处理收到的请求,比如Broker发起的注册请求public class NamesrvController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final NamesrvConfig namesrvConfig; // NameServer相关配置 private final NettyServerConfig nettyServerConfig; // Netty服务相关配置 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); // 定时执行任务的线程池 private final KVConfigManager kvConfigManager; private final RouteInfoManager routeInfoManager; // 路由表 private RemotingServer remotingServer; // 远程服务,使用的是NettyRemotingServer private BrokerHousekeepingService brokerHousekeepingService; private ExecutorService remotingExecutor; // Netty服务相关线程池 // ... public boolean initialize() { // 加载配置信息 this.kvConfigManager.load(); // 创建NettyRemotingServer this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 创建netty服务相关线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 注册处理器 this.registerProcessor(); // 定时任务,扫描Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 心跳监测扫描处于不活跃状态的Broker NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 定时打印KV配置信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); // .... return true; } // 注册处理器 private void registerProcessor() { if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { // 注册DefaultRequestProcessor处理请求 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } } }
在启动方法中,主要是调用了RemotingServer的start
方法启动服务,在NamesrvController的初始化方法中可知,使用的实现类是NettyRemotingServer
,所以之后会启动Netty服务:
public void start() throws Exception { // 启动Netty服务 this.remotingServer.start(); if (this.fileWatchService != null) { this.fileWatchService.start(); } }
NettyRemotingServer的start方法中主要是对Netty的一些设置,然后绑定端口并启动服务:
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { @Override public void start() { // ... prepareSharableHandlers(); // Netty相关设置 ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) // 设置EventLoopGroup线程组 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // 设置channel类型 .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 设置端口 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 设置ChannelHandler ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, serverHandler ); } }); if (nettyServerConfig.getServerSocketSndBufSize() > 0) { log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize()); // 设置Socket发送缓存区大小 childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()); } if (nettyServerConfig.getServerSocketRcvBufSize() > 0) { log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize()); // 设置Socket接收缓存区大小 childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()); } if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) { log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}", nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()); childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())); } if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) { childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } try { // 绑定端口并启动服务 ChannelFuture sync = this.serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); } }