<!-- CometD Servlet --> <servlet> <servlet-name>cometd</servlet-name> <!--<1>--> <servlet-class>org.cometd.annotation.server.CometDServlet</servlet-class> <!--liqiang todo 600000--> <init-param> <param-name>maxProcessing</param-name> <param-value>600000</param-value> </init-param> <init-param> <param-name>timeout</param-name> <param-value>20000</param-value> </init-param> <init-param> <param-name>interval</param-name> <param-value>0</param-value> </init-param> <init-param> <param-name>maxInterval</param-name> <param-value>10000</param-value> </init-param> <init-param> <param-name>handshakeReconnect</param-name> <param-value>true</param-value> </init-param> <init-param> <param-name>maxLazyTimeout</param-name> <param-value>5000</param-value> </init-param> <init-param> <param-name>long-polling.multiSessionInterval</param-name> <param-value>2000</param-value> </init-param> <init-param> <param-name>services</param-name> <param-value>org.cometd.examples.ChatService</param-value> </init-param> <init-param> <param-name>ws.cometdURLMapping</param-name> <param-value>/cometd/*</param-value> </init-param> <!--容器启动时调用init方法初始化 而不是第一次调用时--> <load-on-startup>1</load-on-startup> <async-supported>true</async-supported> </servlet>
通过ServletInit为切入点进行初始化
org.cometd.server.CometDServlet#init
@Override public void init() throws ServletException { try { boolean export = false; //可以通过ServletContext如果在别的地方自定义初始化 需要指定此值 比如自己根据spring容器初始化了 _bayeux = (BayeuxServerImpl)getServletContext().getAttribute(BayeuxServer.ATTRIBUTE); if (_bayeux == null) { export = true; //初始化 _bayeux = newBayeuxServer(); //这里主要是设置Servlet参数到_bayeux 供后续初始化使用 // Transfer all servlet init parameters to the BayeuxServer implementation for (String initParamName : Collections.list(getInitParameterNames())) { _bayeux.setOption(initParamName, getInitParameter(initParamName)); } //添加ServletContext到_bayeux // Add the ServletContext to the options _bayeux.setOption(ServletContext.class.getName(), getServletContext()); } //调用start进行组件初始化<2> _bayeux.start(); if (export) { //设置到Attribute getServletContext().setAttribute(BayeuxServer.ATTRIBUTE, _bayeux); } } catch (Exception x) { throw new ServletException(x); } }
org.cometd.server.BayeuxServerImpl#doStart
@Override protected void doStart() throws Exception { //<3>初始化内置的渠道 initializeMetaChannels(); //<4>初始化消息序列化转换器 initializeJSONContext(); //<5>初始化Transport initializeServerTransports(); //如果线程池为空 创建线程池 if (_executor == null) { //<7> _executor = new MarkedReference<>(newExecutor(), true); } addBean(_executor.getReference()); //如果Schedule为空 创建Scheduler if (_scheduler == null) { _scheduler = new MarkedReference<>(newScheduler(), true); } addBean(_scheduler.getReference()); //配置获得validateMessageFields 默认为true 是否进行消息格式校验 _validation = getOption(VALIDATE_MESSAGE_FIELDS_OPTION, true); //配置获得broadcastToPublisher 默认为true 此消息是否广播到发布者,比如都订阅了同一个渠道,是否广播给自己 _broadcastToPublisher = getOption(BROADCAST_TO_PUBLISHER_OPTION, true); super.doStart(); long defaultSweepPeriod = 997; //获得配置的sweepPeriodOption 会话扫描周期检查会话是否需要移除 默认997 毫秒单位 long sweepPeriodOption = getOption(SWEEP_PERIOD_OPTION, defaultSweepPeriod); if (sweepPeriodOption < 0) { sweepPeriodOption = defaultSweepPeriod; } long sweepPeriod = sweepPeriodOption; //开启session检查,检查是否需要剔除 schedule(new Runnable() { @Override public void run() { //并行执行asyncSweep的四个任务 执行完后指定周期后 开启下一轮 实现了定时任务效果 asyncSweep().whenComplete((r, x) -> schedule(this, sweepPeriod)); } }, sweepPeriod); }
org.cometd.server.BayeuxServerImpl#initializeMetaChannels
protected void initializeMetaChannels() { //握手<8> createChannelIfAbsent(Channel.META_HANDSHAKE); //续约连接 createChannelIfAbsent(Channel.META_CONNECT); //订阅渠道 createChannelIfAbsent(Channel.META_SUBSCRIBE); //取消订阅 createChannelIfAbsent(Channel.META_UNSUBSCRIBE); //断开连接 createChannelIfAbsent(Channel.META_DISCONNECT); }
org.cometd.server.BayeuxServerImpl#initializeJSONContext
protected void initializeJSONContext() throws Exception { //默认通过 option jsonContext去获取 Object option = getOption(AbstractServerTransport.JSON_CONTEXT_OPTION); if (option == null) { //未配置则获取默认的 _jsonContext = new JettyJSONContextServer(); } else { //如果我们有配置的是class全名称 if (option instanceof String) { Class<?> jsonContextClass = Thread.currentThread().getContextClassLoader().loadClass((String)option); if (JSONContextServer.class.isAssignableFrom(jsonContextClass)) { _jsonContext = (JSONContextServer)jsonContextClass.getConstructor().newInstance(); } else { throw new IllegalArgumentException("Invalid " + JSONContextServer.class.getName() + " implementation class"); } } else if (option instanceof JSONContextServer) {//如果是context对象 _jsonContext = (JSONContextServer)option; } else { throw new IllegalArgumentException("Invalid " + JSONContextServer.class.getName() + " implementation class"); } } _options.put(AbstractServerTransport.JSON_CONTEXT_OPTION, _jsonContext); }
org.cometd.server.BayeuxServerImpl#initializeServerTransports
protected void initializeServerTransports() { if (_transports.isEmpty()) { //初始化Transport 没重定义则创建默认 指定了则创建指定的 注:反射创建 会传入bayeux String option = (String)getOption(TRANSPORTS_OPTION); if (option == null) { //未定义则初始化处理websocket 和长轮询的Transport处理器 JSONP的处理器 // Order is important, see #findHttpTransport() //<6> ServerTransport transport = newWebSocketTransport(); if (transport != null) { addTransport(transport); } addTransport(newJSONTransport()); addTransport(new JSONPTransport(this)); } else { //如果有进行类的全名称配置 根据累的全名称创建 for (String className : option.split(",")) { ServerTransport transport = newServerTransport(className.trim()); if (transport != null) { addTransport(transport); } } if (_transports.isEmpty()) { throw new IllegalArgumentException("Option '" + TRANSPORTS_OPTION + "' does not contain a valid list of server transport class names"); } } } //如果没有配置_allowedTransports 将transport加入到 _allowedTransports//liqiangtodo 暂时不晓得干嘛的 if (_allowedTransports.isEmpty()) { String option = (String)getOption(ALLOWED_TRANSPORTS_OPTION); if (option == null) { _allowedTransports.addAll(_transports.keySet()); } else { for (String transportName : option.split(",")) { if (_transports.containsKey(transportName)) { _allowedTransports.add(transportName); } } if (_allowedTransports.isEmpty()) { throw new IllegalArgumentException("Option '" + ALLOWED_TRANSPORTS_OPTION + "' does not contain at least one configured server transport name"); } } } //逐个调用transport init方法完成Transport的初始化 Transport 内部的相关参数自定义配置可以通过Option拿到 List<String> activeTransports = new ArrayList<>(); for (String transportName : _allowedTransports) { ServerTransport serverTransport = getTransport(transportName); if (serverTransport instanceof AbstractServerTransport) { //调用init方法进行初始化 ((AbstractServerTransport)serverTransport).init(); //加入到已激活的transpor activeTransports.add(serverTransport.getName()); } } if (_logger.isDebugEnabled()) { _logger.debug("Active transports: {}", activeTransports); } }
org.cometd.server.BayeuxServerImpl#newWebSocketTransport
private ServerTransport newWebSocketTransport() { try { ClassLoader loader = Thread.currentThread().getContextClassLoader(); //加载服务端websoket组件 websocket组件 loader.loadClass("javax.websocket.server.ServerContainer"); //初始化websoketTransport String transportClass = "org.cometd.server.websocket.javax.WebSocketTransport"; ServerTransport transport = newServerTransport(transportClass); if (transport == null) { _logger.info("JSR 356 WebSocket classes available, but " + transportClass + " unavailable: JSR 356 WebSocket transport disabled"); } return transport; } catch (Exception x) { return null; } }
org.cometd.server.BayeuxServerImpl#newExecutor
private Executor newExecutor() { String name = _name + "-Executor"; //默认线程大小为128 int maxThreads = (int)getOption(EXECUTOR_MAX_THREADS, 128); QueuedThreadPool executor = new QueuedThreadPool(maxThreads, 0); executor.setName(name); executor.setReservedThreads(0); return executor; }
org.cometd.server.BayeuxServerImpl#createChannelIfAbsent
@Override public MarkedReference<ServerChannel> createChannelIfAbsent(String channelName, Initializer... initializers) { ChannelId channelId; boolean initialized = false; //尝试根据channelName获取 判断是否存在 ServerChannelImpl channel = _channels.get(channelName); if (channel == null) { // Creating the ChannelId will also normalize the channelName. //尝试通过处理过的channelId获取 channelId = new ChannelId(channelName); String id = channelId.getId(); if (!id.equals(channelName)) { channelName = id; channel = _channels.get(channelName); } } else { channelId = channel.getChannelId(); } //表示没有被初始化 if (channel == null) { //新建一个channel ServerChannelImpl candidate = new ServerChannelImpl(this, channelId); //放入_channels channel = _channels.putIfAbsent(channelName, candidate); if (channel == null) { // My candidate channel was added to the map, so I'd better initialize it channel = candidate; if (_logger.isDebugEnabled()) { _logger.debug("Added channel {}", channel); } try { //通知 Initializer实现 可以对ServerChannelImpl做自定义配置 for (Initializer initializer : initializers) { notifyConfigureChannel(initializer, channel); } //调用listeners中ChannelListener的configureChannel方法可以对channel进行自定义配置 for (BayeuxServer.BayeuxServerListener listener : _listeners) { if (listener instanceof ServerChannel.Initializer) { notifyConfigureChannel((Initializer)listener, channel); } } } finally { channel.initialized(); } //调用listeners中ChannelListener的channelAdded表示已经被初始化 for (BayeuxServer.BayeuxServerListener listener : _listeners) { if (listener instanceof BayeuxServer.ChannelListener) { notifyChannelAdded((ChannelListener)listener, channel); } } initialized = true; } } else { channel.resetSweeperPasses(); // Double check if the sweeper removed this channel between the check at the top and here. // This is not 100% fool proof (e.g. this thread is preempted long enough for the sweeper // to remove the channel, but the alternative is to have a global lock) _channels.putIfAbsent(channelName, channel); } // Another thread may add this channel concurrently, so wait until it is initialized channel.waitForInitialized(); return new MarkedReference<>(channel, initialized); }