Java教程

Netty由浅入深的学习指南(入门)

本文主要是介绍Netty由浅入深的学习指南(入门),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

本章节会介绍EventLoop、Channel、Future、Pipeline、Handdler、ByteBuf等重要的组件。

2.1 netty的优势

  • Netty vs NIO
    • 需要自己构建协议
    • 解决TCP传输问题,如粘包、半包
    • epoll空轮巡导致CPU 100%
    • 对API进行增强,使之更易用
  • Netty vs 其他网络应用框架
    • 迭代快
    • API更加的简洁
    • 文档优秀全面

2.2 第一个NeetyDemo “Hello World”

  • 目标

    开发一个简单的服务器端和客户端

    • 客户端向服务器端发送“hello world”
    • 服务器仅接收,不返回
  • 引入依赖

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.39.Final</version>
    </dependency>
    
  • 编码实现

    • 服务器端

      //声明一个服务器的启动器,组装Netty组件
      new ServerBootstrap()
          //工作组可以添加组线程组及工作线程组
          .group(new NioEventLoopGroup(1),new NioEventLoopGroup(10))
          //实现的事件,比如BIO、NIO
          .channel(NioServerSocketChannel.class)
          //boss负责处理连接,worker(child)负责业务处理
          .childHandler(
          //Channel客户端读写通道,Initializer初始化Handler
          new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel sc) throws Exception {
                  sc.pipeline().addLast(new StringDecoder()); //解码处理器
                  sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //自定义的Handler
                      //处理读事件
                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                          log.info("服务器读取的内容:{}",msg);
                      }
                  });
              }
          })
          .bind("127.0.0.1",8080); //绑定服务器及端口
      
    • 客户端

      Channel channel = new Bootstrap()
          .group(new NioEventLoopGroup(1))
          .channel(NioSocketChannel.class)
          .handler(new ChannelInitializer<NioSocketChannel>() {
              @Override
              protected void initChannel(NioSocketChannel sc) throws Exception {
                  sc.pipeline().addLast(new StringEncoder());
              }
          }).connect(new InetSocketAddress("127.0.0.1", 8080))
          .sync() //阻塞方法,直到连接建立放开
          .channel();
      while (true){
          channel.writeAndFlush("hello world");
          Thread.sleep(1000);
      }
      

2.3 组件

2.3.1 EventLoop

事件循环对象(EventLoop)

其本质是一个单线程执行器(同时维护一个Selector),里面有run方法处理Channel中源源不断的IO事件。其集成关系也比较复杂:

  • 一条线是继承自ScheduledExecutorService,因此包含了线程池中所有的方法
  • 另一条线是继承netty自己的OrderedEventExecutor
    • 提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
    • 提供了parent方法来看看自己属于那个EventLoopGroup

事件循环组(EventLoopGroup)

EventLoopGroup是一组EventLoop,Chanel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理,继承自netty自己的EventExecutorGroup:

  • 实现了Iterable接口提供遍历EventLoop的能力
  • 另有next方法获取集合中下一个EventLoop

创建一个EventLoop

//创建一个事件循环组,NioEventLoopGroup比较常见,支持IO事件、普通事件、定时任务
EventLoopGroup group = new NioEventLoopGroup(2); //指定线程数
//获取下一个事件,默认轮巡
group.next();

执行普通任务

//执行普通任务submit|execute
group.next().execute(() -> {
    log.info("我是普通任务!");
});

执行定时任务

//执行定时任务,0初始即执行,1延迟时间,TimeUnit.SECONDS时间单位
group.next().scheduleAtFixedRate(() -> {
    log.info("我是普通任务!");
},0,1, TimeUnit.SECONDS);

执行IO事件

new ServerBootstrap()
    .group(new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            });
        }
    })
    .bind(8080);

分功细化

//主服务,只处理ServerSocketChannel的连接事件
EventLoopGroup boss = new NioEventLoopGroup(1);
//工作服务,只处理SocketChannel的读、写事件
EventLoopGroup worker = new NioEventLoopGroup(10);
//自定义处理组
EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
    .group(boss,worker)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast("worker1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            }).addLast(group,"worker2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.forName("UTF-8")));
                }
            });
        }
    })
    .bind(8080);

2.3.2 Channel

主要作用

  • close() :可以用来关闭Channel
  • closeFuture() :处理channel的关闭
    • sync方法作用是同步等待channel关闭
    • addListener异步等待channel
  • pipeline() :添加处理器
  • weite() :将数据写入
  • writeAndFlush() :将数据写入并刷出

sync & addListener 使用二选一

ChannelFuture future = new Bootstrap()
    .group(new NioEventLoopGroup(1))
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new StringEncoder());
        }
    }).connect(new InetSocketAddress("127.0.0.1", 8080));

//使用sync同步阻塞
future.sync(); //阻塞直到连接建立
Channel channel = future.channel();
channel.writeAndFlush("hello world");

//使用addListener异步处理
future.addListener(new ChannelFutureListener() {
    //连接建立执行operationComplete方法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        channel.writeAndFlush("hello world");
    }
});

优雅关闭问题

ChannelFuture future = new Bootstrap()
    .group(new NioEventLoopGroup(1))
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            sc.pipeline().addLast(new StringEncoder());
        }
    })
    .connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = future.sync().channel();

new Thread(()->{
    Scanner scanner = new Scanner(System.in);
    while (true){
        String s = scanner.nextLine();
        if(s.equals("q")){
            channel.close();
            break;
        }
        channel.writeAndFlush(s);
    }
},"ceShi").start();

//优雅的关闭
ChannelFuture closeFuture = channel.closeFuture();

//同步方式sync
closeFuture.sync();
log.info("我已经关闭了");
group.shutdownGracefully(); //优雅的关闭EvenntLoop

//异步
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        log.info("我已经关闭了");
        group.shutdownGracefully(); //优雅的关闭EvenntLoop
    }
});

2.3.3 Future & Promis

在异步处理时经常使用这两个接口,首先说明的时netty中的Future和JDK中的Future同名,但是是两个接口,netty的Future继承自JDK的Future,而Promise又对netty的Future进行了扩展

  • jdk Future 只能同步等待任务结束(成功|失败)才能得到结果
  • netty Future可以同步等待任务结束得到结果,也可以异步得到结果,但都需要等任务结束
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能JDK FutureNetty FuturePromise
cancel取消任务
isCancel任务是否取消
isDone任务是否完成,不能区分成功失败
get获取任务结果,阻塞等待
getNow获取任务结果,非阻塞,未产生结果时返回null
await等待任务结束,如果任务失败,不抛异常,需要使用isSuccess判断
sync等待任务结束,如果任务失败,抛异常
isSuccess判断任务是否成功
cause获取失败信息,非阻塞,如果没有失败,返回null
addLinstener添加回调,异步接收结果
setSuccess设置成功
setFailure设置失败

JDK Future

//创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//提交任务
Future<Integer> future = service.submit(() -> {
    log.info("等待计算结果... ...");
    Thread.sleep(1000);
    return 50;
});

//通过future获取数据,get会阻塞
log.info("获取计算结果:{}",future.get());

Netty Future

NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();

Future<Integer> future = eventLoop.submit(() -> {
    log.info("等待计算结果... ...");
    Thread.sleep(1000);
    return 50;
});

//同步获取
log.info("获取计算结果:{}",future.get()); //get会阻塞同sync
log.info("获取计算结果:{}",future.sync().getNow());
//异步获取
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("获取计算结果:{}",future.getNow());
    }
});

Netty Promise

EventLoop eventLoop = new NioEventLoopGroup().next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(()-> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    promise.setSuccess(50);
}).start();

//同步获取
log.info("获取计算结果:{}",promise.get());
//异步获取
promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.info("获取计算结果:{}",future.getNow());
    }
});

2.3.4 Handler & Pipeline

ChannelHandller用来处理Channel上的各种事件,分为入站、出战两种,所有ChannelHandler被连接成一串,就是pipeline

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出战处理器通常是ChannelOutBoundHandlerAdapter的子类,主要对写回结果进行加工

代码示例:

new ServerBootstrap()
    .group(new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel sc) throws Exception {
            ChannelPipeline pipeline = sc.pipeline();
            /*pipeline会默认添加一个头(head)和尾(tail)的处理器
                          整体格式如:head <-> h1 <-> h2 <-> h3 <-> h4 <-> tail
                          他是一个双向的链表,入站处理器从前向后执行
                          出站处理器从后向前执行*/
            //添加入站处理
            pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.info("1");
                    super.channelRead(ctx,msg);
                }
            });
            pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    log.info("2");
                    //必须写入数据才能触发出站处理器
                    super.channelRead(ctx,msg);
                    sc.writeAndFlush(ctx.alloc().buffer().writeBytes("server".getBytes()));
                }
            });
            pipeline.addLast("h3",new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    log.info("3");
                    super.write(ctx, msg, promise);
                }
            });
            pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    log.info("4");
                    super.write(ctx, msg, promise);
                }
            });
        }
    })
    .bind(8080);
}

日志输出:

##入站正序执行输出
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 1
15:48:46.456 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 2
##出站倒序执行输出
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 4
15:48:46.457 [nioEventLoopGroup-2-2] INFO com.fyy.netty.chapter02.c2.PipelineTest - 3

使用EmbeddedChannel模拟服务器处理,避免创建服务器及客户端

ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("1");
        super.channelRead(ctx, msg);
    }
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("2");
        super.channelRead(ctx, msg);
    }
};

ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("3");
        super.write(ctx, msg, promise);
    }
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("4");
        super.write(ctx, msg, promise);
    }
};

//使用EmbeddedChannel模拟服务器处理
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//实现入站测试
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
//实现出站测试
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));

2.3.5 ByteBuf

  • 创建及自动扩容

    • 测试代码
    //创建ByteBuf
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); //默认大小为256
    //自动扩容测试
    System.out.println(buf); //打印初始对象
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 300; i++) {
        sb.append("a");
    }
    buf.writeBytes(sb.toString().getBytes());
    System.out.println(buf); //打印填入数据后对象
    
    • 日志
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
    PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
    
  • 内存模式(直接内存&堆内存)

    • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起使用
    • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放
    //创建池化基于堆的ByteBuf
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(10);
    //创建池化基于直接内存的ByteBuf
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer(10);
    
  • 池化 vs 非池化

    池化的最大意义在于可以重用ByteBuf,其优点:

    • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算使用堆内存,也会增加GC的压力
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
    • 高并发时,池化功能更节约内存,减少内存溢出的问题

    设置池化功能开启,使用如下环境变量

    -Dio.netty.allocator.type=[unpooled|pooled]
    
  • 组成

    ByteBuf由四部分组成(容量、最大容量、读指针、写指针)

在这里插入图片描述

最开始读写指针都在0的位置

  • 写入

    方法名含义备注
    writeBoolearn()写入boolearn值用一字节01|00代表true|false
    writeByte()写入byte值
    writeShort()写入short值
    writeInt()写入int值Big Endian,即0x250,写入后00 00 02 50
    writeLong()写入long值Little Endian,即0x250,写入后50 02 00 00
    writeChar()写入char值
    writeFloat()写入float值
    writeDouble()写入double值
    writeBytes(ByteBuf src)写入netty的ByteBuf值
    writeBytes(byte[] src)写入byte[]
  • 读取

    读取也存在类似于写入的一系列方法,不在一一展示,主要说明如何读取及重复读取

    • 读取

      byte b = buf.readByte(); //每次读取一个字节
      boolean b1 = buf.readBoolean(); //读取Boolean值 
      ...
      
    • 重复读取

      buf.markReaderIndex(); //添加标记
      buf.readByte();
      buf.resetReaderIndex(); //重置到标记点
      
    • 使用get()方法读取,不会修改读指针的位置

      buf.getByte(1); //获取下标为1的字节
      buf.getBoolean(1); //获取下标为1的字节并封装成Boolean
      ...
      
  • 内存释放(retain & release)

    由于Netty中有堆外内存(直接内存)的ByteBuf实现,堆外内存最好是手动释放,而不是等GC垃圾回收。

    • UnpooledHeapByteBuf使用的是JVM内存,只需要等GC回收即可

    • UnpooledDirectByteBuf使用的就是直接内存,需要特殊方法来回收内存

    • PooledByteBuf和他的子类使用了池化机制,需要更为复杂的规则来回收内存

      //回收内存的源码实现,关注下面方法的不同实现
      protected abstract void deallocate()
      

    Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

    • 每个ByteBuf对象的初始计数为1

    • 调用release方法计数减1,如果计数为0,ByteBuf内存被回收

    • 调用retain方法计数加1,表示调用者没有用完之前,其他handler即使调用了release也不会造成回收

    • 当计数为0,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用

      buf.retain(); //计数加一
      buf.release(); //计数减一
      
  • 零拷贝

    • slice(切片)

      ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'a','b','c','d','e','f','j','h','i'});
      ByteBufferUtil.log(buf);
      
      //使用slice实现零拷贝切片,且切片后容量不可变
      ByteBuf f1 = buf.slice(0, 5);
      ByteBuf f2 = buf.slice(5, 5);
      ByteBufferUtil.log(f1);
      ByteBufferUtil.log(f2);
      
    • CompositeByteBuf(组合)

      ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'1','2','3','4','5'});
      ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
      buf.writeBytes(new byte[]{'6','7','8','9','0'});
      
      //使用CompositeByteBuf组合多个小的ByteBuf,同样基于零拷贝
      CompositeByteBuf bytebuf = ByteBufAllocator.DEFAULT.compositeBuffer();
      bytebuf.addComponent(true,buf1);
      bytebuf.addComponent(true,buf2);
      //或者使用addComponents
      bytebuf.addComponents(true,buf1,buf2);
      

2.3.6 ByteBuf的优势

  • 池化:可以重用池中ByteBuf实例,节省内存,减少内存溢出的可能
  • 读写指针分离,不需要切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如slice\duplicate\CompositeByteBuf
这篇关于Netty由浅入深的学习指南(入门)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!