本章节会介绍EventLoop、Channel、Future、Pipeline、Handdler、ByteBuf等重要的组件。
目标
开发一个简单的服务器端和客户端
引入依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
编码实现
服务器端
//声明一个服务器的启动器,组装Netty组件 new ServerBootstrap() //工作组可以添加组线程组及工作线程组 .group(new NioEventLoopGroup(1),new NioEventLoopGroup(10)) //实现的事件,比如BIO、NIO .channel(NioServerSocketChannel.class) //boss负责处理连接,worker(child)负责业务处理 .childHandler( //Channel客户端读写通道,Initializer初始化Handler new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringDecoder()); //解码处理器 sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义的Handler //处理读事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("服务器读取的内容:{}",msg); } }); } }) .bind("127.0.0.1",8080); //绑定服务器及端口
客户端
Channel channel = new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringEncoder()); } }).connect(new InetSocketAddress("127.0.0.1", 8080)) .sync() //阻塞方法,直到连接建立放开 .channel(); while (true){ channel.writeAndFlush("hello world"); Thread.sleep(1000); }
事件循环对象(EventLoop)
其本质是一个单线程执行器(同时维护一个Selector),里面有run方法处理Channel中源源不断的IO事件。其集成关系也比较复杂:
事件循环组(EventLoopGroup)
EventLoopGroup是一组EventLoop,Chanel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理,继承自netty自己的EventExecutorGroup:
创建一个EventLoop
//创建一个事件循环组,NioEventLoopGroup比较常见,支持IO事件、普通事件、定时任务 EventLoopGroup group = new NioEventLoopGroup(2); //指定线程数 //获取下一个事件,默认轮巡 group.next();
执行普通任务
//执行普通任务submit|execute group.next().execute(() -> { log.info("我是普通任务!"); });
执行定时任务
//执行定时任务,0初始即执行,1延迟时间,TimeUnit.SECONDS时间单位 group.next().scheduleAtFixedRate(() -> { log.info("我是普通任务!"); },0,1, TimeUnit.SECONDS);
执行IO事件
new ServerBootstrap() .group(new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(Charset.forName("UTF-8"))); } }); } }) .bind(8080);
分功细化
//主服务,只处理ServerSocketChannel的连接事件 EventLoopGroup boss = new NioEventLoopGroup(1); //工作服务,只处理SocketChannel的读、写事件 EventLoopGroup worker = new NioEventLoopGroup(10); //自定义处理组 EventLoopGroup group = new DefaultEventLoop(); new ServerBootstrap() .group(boss,worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast("worker1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(Charset.forName("UTF-8"))); } }).addLast(group,"worker2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(Charset.forName("UTF-8"))); } }); } }) .bind(8080);
主要作用
sync & addListener 使用二选一
ChannelFuture future = new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringEncoder()); } }).connect(new InetSocketAddress("127.0.0.1", 8080)); //使用sync同步阻塞 future.sync(); //阻塞直到连接建立 Channel channel = future.channel(); channel.writeAndFlush("hello world"); //使用addListener异步处理 future.addListener(new ChannelFutureListener() { //连接建立执行operationComplete方法 @Override public void operationComplete(ChannelFuture future) throws Exception { Channel channel = future.channel(); channel.writeAndFlush("hello world"); } });
优雅关闭问题
ChannelFuture future = new Bootstrap() .group(new NioEventLoopGroup(1)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { sc.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("127.0.0.1", 8080)); Channel channel = future.sync().channel(); new Thread(()->{ Scanner scanner = new Scanner(System.in); while (true){ String s = scanner.nextLine(); if(s.equals("q")){ channel.close(); break; } channel.writeAndFlush(s); } },"ceShi").start(); //优雅的关闭 ChannelFuture closeFuture = channel.closeFuture(); //同步方式sync closeFuture.sync(); log.info("我已经关闭了"); group.shutdownGracefully(); //优雅的关闭EvenntLoop //异步 closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { log.info("我已经关闭了"); group.shutdownGracefully(); //优雅的关闭EvenntLoop } });
在异步处理时经常使用这两个接口,首先说明的时netty中的Future和JDK中的Future同名,但是是两个接口,netty的Future继承自JDK的Future,而Promise又对netty的Future进行了扩展
功能 | JDK Future | Netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | ||
isCancel | 任务是否取消 | ||
isDone | 任务是否完成,不能区分成功失败 | ||
get | 获取任务结果,阻塞等待 | ||
getNow | 获取任务结果,非阻塞,未产生结果时返回null | ||
await | 等待任务结束,如果任务失败,不抛异常,需要使用isSuccess判断 | ||
sync | 等待任务结束,如果任务失败,抛异常 | ||
isSuccess | 判断任务是否成功 | ||
cause | 获取失败信息,非阻塞,如果没有失败,返回null | ||
addLinstener | 添加回调,异步接收结果 | ||
setSuccess | 设置成功 | ||
setFailure | 设置失败 |
JDK Future
//创建线程池 ExecutorService service = Executors.newFixedThreadPool(2); //提交任务 Future<Integer> future = service.submit(() -> { log.info("等待计算结果... ..."); Thread.sleep(1000); return 50; }); //通过future获取数据,get会阻塞 log.info("获取计算结果:{}",future.get());
Netty Future
NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(() -> { log.info("等待计算结果... ..."); Thread.sleep(1000); return 50; }); //同步获取 log.info("获取计算结果:{}",future.get()); //get会阻塞同sync log.info("获取计算结果:{}",future.sync().getNow()); //异步获取 future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.info("获取计算结果:{}",future.getNow()); } });
Netty Promise
EventLoop eventLoop = new NioEventLoopGroup().next(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(()-> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } promise.setSuccess(50); }).start(); //同步获取 log.info("获取计算结果:{}",promise.get()); //异步获取 promise.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.info("获取计算结果:{}",future.getNow()); } });
ChannelHandller用来处理Channel上的各种事件,分为入站、出战两种,所有ChannelHandler被连接成一串,就是pipeline
代码示例:
new ServerBootstrap() .group(new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel sc) throws Exception { ChannelPipeline pipeline = sc.pipeline(); /*pipeline会默认添加一个头(head)和尾(tail)的处理器 整体格式如:head <-> h1 <-> h2 <-> h3 <-> h4 <-> tail 他是一个双向的链表,入站处理器从前向后执行 出站处理器从后向前执行*/ //添加入站处理 pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("1"); super.channelRead(ctx,msg); } }); pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("2"); //必须写入数据才能触发出站处理器 super.channelRead(ctx,msg); sc.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes())); } }); pipeline.addLast("h3",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("3"); super.write(ctx, msg, promise); } }); pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("4"); super.write(ctx, msg, promise); } }); } }) .bind(8080); }
日志输出:
##入站正序执行输出 15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 1 15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 2 ##出站倒序执行输出 15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 4 15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 3
使用EmbeddedChannel模拟服务器处理,避免创建服务器及客户端
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1"); super.channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2"); super.channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("3"); super.write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("4"); super.write(ctx, msg, promise); } }; //使用EmbeddedChannel模拟服务器处理 EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4); //实现入站测试 channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes())); //实现出站测试 channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
创建及自动扩容
//创建ByteBuf ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); //默认大小为256 //自动扩容测试 System.out.println(buf); //打印初始对象 StringBuilder sb = new StringBuilder(); for (int i = 0; i < 300; i++) { sb.append("a"); } buf.writeBytes(sb.toString().getBytes()); System.out.println(buf); //打印填入数据后对象
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256) PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
内存模式(直接内存&堆内存)
//创建池化基于堆的ByteBuf ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(10); //创建池化基于直接内存的ByteBuf ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer(10);
池化 vs 非池化
池化的最大意义在于可以重用ByteBuf,其优点:
设置池化功能开启,使用如下环境变量
-Dio.netty.allocator.type=[unpooled|pooled]
组成
ByteBuf由四部分组成(容量、最大容量、读指针、写指针)
最开始读写指针都在0的位置
写入
方法名 | 含义 | 备注 |
---|---|---|
writeBoolearn() | 写入boolearn值 | 用一字节01|00代表true|false |
writeByte() | 写入byte值 | |
writeShort() | 写入short值 | |
writeInt() | 写入int值 | Big Endian,即0x250,写入后00 00 02 50 |
writeLong() | 写入long值 | Little Endian,即0x250,写入后50 02 00 00 |
writeChar() | 写入char值 | |
writeFloat() | 写入float值 | |
writeDouble() | 写入double值 | |
writeBytes(ByteBuf src) | 写入netty的ByteBuf值 | |
writeBytes(byte[] src) | 写入byte[] |
读取
读取也存在类似于写入的一系列方法,不在一一展示,主要说明如何读取及重复读取
读取
byte b = buf.readByte(); //每次读取一个字节 boolean b1 = buf.readBoolean(); //读取Boolean值 ...
重复读取
buf.markReaderIndex(); //添加标记 buf.readByte(); buf.resetReaderIndex(); //重置到标记点
使用get()方法读取,不会修改读指针的位置
buf.getByte(1); //获取下标为1的字节 buf.getBoolean(1); //获取下标为1的字节并封装成Boolean ...
内存释放(retain & release)
由于Netty中有堆外内存(直接内存)的ByteBuf实现,堆外内存最好是手动释放,而不是等GC垃圾回收。
UnpooledHeapByteBuf使用的是JVM内存,只需要等GC回收即可
UnpooledDirectByteBuf使用的就是直接内存,需要特殊方法来回收内存
PooledByteBuf和他的子类使用了池化机制,需要更为复杂的规则来回收内存
//回收内存的源码实现,关注下面方法的不同实现 protected abstract void deallocate()
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
每个ByteBuf对象的初始计数为1
调用release方法计数减1,如果计数为0,ByteBuf内存被回收
调用retain方法计数加1,表示调用者没有用完之前,其他handler即使调用了release也不会造成回收
当计数为0,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
buf.retain(); //计数加一 buf.release(); //计数减一
零拷贝
slice(切片)
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'a','b','c','d','e','f','j','h','i'}); ByteBufferUtil.log(buf); //使用slice实现零拷贝切片,且切片后容量不可变 ByteBuf f1 = buf.slice(0, 5); ByteBuf f2 = buf.slice(5, 5); ByteBufferUtil.log(f1); ByteBufferUtil.log(f2);
CompositeByteBuf(组合)
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'1','2','3','4','5'}); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'6','7','8','9','0'}); //使用CompositeByteBuf组合多个小的ByteBuf,同样基于零拷贝 CompositeByteBuf bytebuf = ByteBufAllocator.DEFAULT.compositeBuffer(); bytebuf.addComponent(true,buf1); bytebuf.addComponent(true,buf2); //或者使用addComponents bytebuf.addComponents(true,buf1,buf2);