@ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter
注解说明了这个handler可以被多个channel共享,然后该类又继承了ChannelInboundHandlerAdapter,他就可以处理各种网络事件,如register,unregistred等事件。
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); /** * netty remote server */ private final NettyRemotingServer nettyRemotingServer; /** * server processors queue */ private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();
可以看到它持有一个NettyRemotingServer类的引用。 它还定义了一个并发的map结构,用来存储命令类型和处理类以及executor。
我们看它的构造方法, 给NettyRemotingServer类的引用赋值。
public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { this.nettyRemotingServer = nettyRemotingServer; }
再看它的方法
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; if (executorRef == null) { executorRef = nettyRemotingServer.getDefaultExecutor(); } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); }
可以看出它的registerProcessor方法就是保存了命令类型和处理该命令的处理器以及执行的executor的到一个并发map结构中。
再看它的处理
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { processReceived(ctx.channel(), (Command) msg); } private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); if (CommandType.HEART_BEAT.equals(commandType)) { if (logger.isDebugEnabled()) { logger.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel)); } return; } final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); if (pair != null) { Runnable r = () -> { try { pair.getLeft().process(channel, msg); } catch (Exception ex) { logger.error("process msg {} error", msg, ex); } }; try { pair.getRight().submit(r); } catch (RejectedExecutionException e) { logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel)); } } else { logger.warn("commandType {} not support", commandType); } }
可以看出它是先从消息中拿出消息的CommandType,也就是命令模式的命令类型,如果是心跳命令就不处理,如果不是就从保存的map结构中拿出已经注册好的处理类和执行线程池。让线程池去执行该处理即可。