public class TestSourceServer { public static void main(String[] args) { new ServerBootstrap() //EventLoop有一个线程和执行器selector,用于关注事件,解决一些任务 .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>(){ @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); } }).bind(8080); } }
接着上一篇文章 NioServerSocketChannel 的第10点,下面这里就是进入 accept事件,可以说到了这里完成了nio 的1,2,3
下面就是 read 方法中的代码,这里面我们主要观察4、5、6
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //4、创建 SocketChannel,设置非阻塞 //下面看了源码后这里是 localRead =1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //pipeline:拿到NioServertSocketChannel的流水线 //调用上面的handler处理 //这一步其实上面的处理器只有三个 head-accept-end //都是前面的文章说过的 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
@Override protected int doReadMessages(List<Object> buf) throws Exception { //建立连接,创建SocketChannel返回 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //创建了 NioSocketChannel,下面就是把NioSocketChannel当成一个消息放到结果里面 //到时候pipeline上的处理器会获取到这些信息并进行处理 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } //accept public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() { @Override public SocketChannel run() throws IOException { //调用serverSocketChannel.accept把连接建立完成 return serverSocketChannel.accept(); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
运行到这一步,意思就是调用 pipeline 上的 handler 来处理消息,一旦调用就会跳转到下面 ServerBootstrapAcceptor 这个 accept 处理器的 read 方法中,下面就是这个方法的流程
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //设置处理器 child.pipeline().addLast(childHandler); //下面设置一些参数 setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { //这时比较重要的一些流程,其实就是把一个新的 eventLoop //在里面找到一个 selector 来和channel进行绑定,并设置一个 //线程监听绑定的channel的事件 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
注意下面这个方法为什么不走 if ,因为我们运行到这里的时候走的线程是 NioEventGroup 里面的 ServerSocketChannel 的线程,而我们新建的 SocketChannel 和 当前的线程应该不能是同一个才对
似曾相识的 doRegister()
,这个方法在之前的文章(netty的源码中)也说过,里面的作用就是 把 nioServerSocketChannel 和 selector 绑定起来,并且没有关注事件,到这里,第五步完成,5. 将 SocketChannel注册到 selector
, nio步骤
我们在这个方法中继续运行,在 diRegister() 方法下面有一个方法,这个方法的作用就是触发我们新创建的 channel 上面的初始化事件我们继续运行之后就会来到我们编写的客户端的 initChannel 方法里面,主要的作用看名字也知道,就是添加处理器handler 的
来到最终的调用方法,可以看到就是在这里调用了关注 read事件,至此,第六步完成 nio步骤,当然中间的调用链不用管,只是说看到这里能意识到最终确实是完成了 nio 中accept 流程的这步,只不过 netty 中对这六步做了层层的封装。
还是这个方法,客户端连接上之后发送一条数据给服务端,注意第一次进入是accept,第二次进入才是 read,可以看到这里 readyOps 变成了 1
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } //获取 pipeline,要用到里面的 handler 处理器来处理 final ChannelPipeline pipeline = pipeline(); // 获取 ByteBuf,因为消息在这里面 final ByteBufAllocator allocator = config.getAllocator(); //allocHandle :动态调整上面的ByteBuf大小,使用直接内存,因为是 io操作,使用直接内存效率高 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //分配具体的 ByteBuf,分完就可以读数据了 byteBuf = allocHandle.allocate(allocator); //这个方法是读取客户端发送过来的数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //证明读完了 if (allocHandle.lastBytesRead() <= 0) { //没有东西读了,就把 ByteBuf 释放掉 byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } //读一次消息就增加一次 allocHandle.incMessagesRead(1); readPending = false; //这也是个重要的方法,意思是调用我们服务端的handler来处理发送的消息 //这个方法调用之后就会进入我们写的handler那里 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }