NameServer 的定义以及用处,本篇文章就不做介绍了,此文章主要分析其源码。
NamesrvStartup 是的NameServer服务的启动类。 其入口是 main0( ) 方法。
public static NamesrvController main0(String[] args) { try { // 创建 namesrv 控制器 (nameServer核心类) NamesrvController controller = createNamesrvController(args); // 启动nameSrv start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
上述代码非常简单, 其功能就是:
createNamesrvController(args)
start(controller);
下面分别 根据这两个方法 来做入口分析。
在分析如何创建 NamesrvController 对象之前, 先来介绍下该类
上面是 NamesrvController 类的简图, 可以结合此图来看下面的属性分析:
属性:
// 日志 private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // NameSrv的配置信息类 private final NamesrvConfig namesrvConfig; // NameSrv底层的核心服务器 - Netty配置类 private final NettyServerConfig nettyServerConfig; // 调度线程池, 执行定时任务, 两件事 : 1.检查存活的broker状态 2.打印配置 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); // 管理 NameServer的 kv配置 private final KVConfigManager kvConfigManager; // 管理 路由信息的对象 ***重要*** private final RouteInfoManager routeInfoManager; // Netty网络层封装对象 ***重要*** private RemotingServer remotingServer; // ChannelEventListener接口类 用于监听channel 状态, // 当channel状态发生改变时 例如: close idle ... 等状态时 ,该service会监听并处理 private BrokerHousekeepingService brokerHousekeepingService; // 业务线程池,netty线程主要任务 是 解析报文, 将报文解析成 RemotingCommand对象, 然后就将该对象交给业务线程池继续处理 private ExecutorService remotingExecutor; // 总配置 (其中包含 NameSrv配置 和 NettyServer配置) private Configuration configuration; private FileWatchService fileWatchService;
构造方法:
// 参数1: namesrvConfig // 参数2: 网络层配置 nettyServerConfig public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; this.kvConfigManager = new KVConfigManager(this); this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
上述代码不用多说, 就是简单的 创建对象赋值操作。 由于本篇重点分析的是 NameServer的启动流程,因此我们重点关注 nameSrvConfig , 至于其他的对象会在之后的文章中介绍。
public class NamesrvConfig { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); // 从环境变量中 获取 ROCKETMQ_HOME 值 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // kvConfig.json 路径 kv的存储文件路径 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // namesrv.properties 配置文件路径 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; // 顺序消息功能 是否开启 默认关闭 private boolean orderMessageEnable = false; // .... 省略 get/set 方法 }
实际上该类也没什么好说的, 核心就是保存一些 NameServer的配置文件路径 和一些关键值。
介绍完了 NameSrvController ,重回到入口的 createNamesrvController(args)
,看看是如何 创建 NameSrvController 对象的。
/** * 创建 namesrv 控制器 * @param args 启动参数 */ public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); Options options = ServerUtil.buildCommandlineOptions(new Options()); // 启动时的参数信息, 由commandLine 管理 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // nameSrvConfig nameSrv配置 final NamesrvConfig namesrvConfig = new NamesrvConfig(); // netty 服务器配置 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // namesrv 服务器 监听端口 修改为 9876 nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { // 读取 -c 选项值 String file = commandLine.getOptionValue('c'); if (file != null) { // 读取 config 文件数据 到 properties 内 InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); // 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 字段进行复写 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); // 将读取的 配置文件 路径 保存到 字段 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 将启动时 命令行 设置的 kv 复写到 namesrvConfig 内 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } // 创建日志对象。 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 创建 控制器 // 参数1: namesrvConfig // 参数2: 网络层配置 nettyServerConfig final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); return controller; }
上述代码虽然很长, 实际上就是 读取系统环境变量 和 启动参数 来设置 NameSrvConfig 和 NettyServeConfig 配置类, 并通过这俩类构造出 NamesrvController对象。
下面再来看下 是如何启动 NamesrvController 的。
public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 初始化方法... boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注册 JVM级别的 Hook, 平滑关机的逻辑。 当JVM被关闭时,主动调用 controller.shutdown()方法平滑关机 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // 服务器网络层 启动 controller.start(); return controller; }
上述代码主要 做了 三件事:
重点需要关注 第 1点 初始化 , 而第3点的逻辑实际上就是 NettyServer的启动入口了。
/** * 初始化 NameSrv 控制器 * @return */ public boolean initialize() { // 加载本地 kv 配置 this.kvConfigManager.load(); // 创建 网络服务器 对象 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 创建业务线程池 默认线程数 8 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 注册协议处理器 (缺省协议处理器) this.registerProcessor(); // 定时任务1: 每10s中检查 broker 存活状态, 将idle状态的broker移除 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 定时任务2: 每10min中, 打印一遍 kv配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); // ... 省略 SSL证书的操作 (不在重点关注范围内) return true; }
上述代码主要做了如下几件事: