Java教程

写给大忙人看的netty实战

本文主要是介绍写给大忙人看的netty实战,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

初探netty

netty作为一个NIO客户端服务器框架,可以快速、轻松地构建网络应用,比如协议服务器和客户端。netty吸收了FTP、SMTP、HTTP等协议的实现经验,在易用性和敏捷性的基础上保证了程序的稳健性以及可维护性 。

当我们刚开始学习java网络编程时都是开一个socket端口,然后调用方法accept()方法阻塞式等待连接,然后不断读取数据。等到后面掌握了更多后,我们尝试使用非阻塞方式读取数据以及使用Selector选择器的非阻塞I/O。

1591339191895

随着业务不断增长,成千上万的并发量不再是不可能的了。为了更高的吞吐量与可扩展的性能,稳健、易行的客户端开发框架成为开发者的追求,而netty很完美地满足了人们的需求。它封装了java复杂的底层API,并以易于使用的方式暴露出来,使用netty可以更加注重业务逻辑的开发,而不是琐碎的底层架构。

在下面是netty的核心组件,详细的内容将于后续记录:

  • Channel(通道)。

    通道是java nio的一个核心概念,它表示一个到实体的操作连接(比如网络连接、文件I/O操作)。

  • 回调。

    回调实际上是 一个方法,当一个方法调用时,其指定(或者说是绑定)的回调方法也会被调用。

  • Future

    Future是一个异步操作的占位符,当该异步操作完成时,其对应的Future对象便会调用。netty的Future实现——ChannelFuture允许一个异步操作可以注册多个ChannelFutureListener实例。每个netty的出站I/O都会返回一个ChannelFuture实例。

  • 事件与Handler(处理器)。

    netty是通过事件来通知我们操作状态的改变,事件可能有:

    • 连接激活或失活(入站事件)。
    • 数据读取(入站事件)。
    • 用户事件(入站事件)。
    • 错误事件(入站事件)。
    • 打开或关闭到远程节点的连接(出站事件)。
    • 将数据写到套接字(出站事件)。

    netty为ChannelHandler有许多实现,同时你也可以自定义实现。每个事件都会分发到对应的ChannelHandler类中的某个方法。

注意:入站和出站是相对ChannelHandler而言的,进入ChannelHandler为入站,从ChannelHandler发出消息是出站。

代码示例

下面是一个简单的netty使用:

入站处理器实现

@ChannelHandler.Sharable // 标识一个 channelHandler 可以被多个 channel 安全地调用。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    // 当有入站消息时该方法就会调用
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buffer = (ByteBuf)msg;
        System.out.println("服务器收到消息:" + buffer.toString(CharsetUtil.UTF_8));
        // 将接收到的消息写给发送者,而不冲刷出站消息。
        ctx.write(buffer);
    }

    @Override
    // channelRead消费完读取的数据的时候被触发
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 将未决消息冲刷到远程节点,并且关闭该 channel
        ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    // 在读操作时处理异常
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 打印异常栈,并关闭该 channel
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

注意:netty服务端handler的处理是采用责任链的形式。默认情况下,channelHandler会把对它的方法调用转发给链中的下一个channelHandler。如果 exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常会被传递到channelPipeline的尾端并被记录。

服务器实现

public class NettyServer {
    public static void main(String[] args) {
        NettyServer nettyServer = new NettyServer();
        nettyServer.start(8888);
    }

    public void start(int port){
        // 处理TCP连接请求
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 处理I/O事件
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // 用于引导和绑定服务器
            ServerBootstrap bootstrap = new ServerBootstrap();
            //将上面的线程组加入到 bootstrap 中
            bootstrap.group(bossGroup,workGroup)
                    //将通道设置为异步的通道
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 因为 NettyServerHandler 被标注为 @Sharable,所以可以使用相同的实例
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,200)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            // 异步绑定服务器,调用 sync() 方法阻塞等待直到绑定完成。
            ChannelFuture future = bootstrap.bind(port).sync();
            // 获取 channel 的 closeFuture,并且阻塞直到它完成。
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
复制代码

出站处理器实现

@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       ctx.writeAndFlush(Unpooled.copiedBuffer("netty 活跃", CharsetUtil.UTF_8));
    }

    // 记录已接收的消息存储
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("客户端接收消息:" + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

SimpleChannelInboundHandlerChannelInboundHandler

客户端使用SimpleChannelInboundHandler的原因是它不需要考虑异步操作,当channelRead0方法执行完后,SimpleChannelInboundHandler就会释放指向保存该消息的ByteBuffer的内存。

服务端使用ChannelInboundHandler是因为它需要给客户端传送消息,而ctx.write()方法是异步的,可能channelRead()方法执行完了它还没有返回,所以为了避免这种情况便使用了ChannelInboundHandlerchannelReadComplete方法会在channelRead()消费完读取的数据的时候被触发,此时它会将输出冲刷到channel

客户端实现

public class NettyClient {
    public static void main(String[] args) {
        NettyClient nettyClient = new NettyClient();
        nettyClient.connect("localhost", 8888);
    }

    public void connect(String hostname,int port) {
        // 处理TCP连接请求
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 用于引导和绑定服务器
            Bootstrap bootstrap = new Bootstrap();
            //将上面的线程组加入到 bootstrap 中
            bootstrap.group(group)
                	//将通道设置为异步的通道
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline()
                                    .addLast(new NettyClientHandler());
                        }
                    })
                    .option(ChannelOption.TCP_NODELAY,true);

            // 连接到远程节点,阻塞等待直到连接完成。
            ChannelFuture future = bootstrap.connect(hostname, port).sync();
            // 阻塞,直到 channel 关闭。
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池并释放所有资源。
            group.shutdownGracefully();
        }
    }
}
复制代码

总结

从上面的代码示例我们可以看出netty服务端和客户端的构建其实大同小异,都是实现了处理器然后进行绑定。

服务端开发

  1. 创建并实现处理器逻辑。
    1. 应用程序根据需求扩展ChannelHandler
    2. 针对不同类型的事件来调用ChannelHandler
  2. 创建ServerBootstrap实例来引导和绑定服务器。
  3. 创建并分配一个NioEventLoopGroup实例来进行请求处理。
  4. 创建并分配一个NioEventLoopGroup实例来进行事件处理。
  5. 指定服务器绑定的本地InetSocketAddress
  6. 使用处理器实例初始化每一个新的channel
  7. 调用ServerBootstrap.bind()方法来绑定服务器。

客户端开发

  1. 创建并实现处理器逻辑。
    1. 应用程序根据需求扩展ChannelHandler
    2. 针对不同类型的事件来调用ChannelHandler
  2. 创建Bootstrap实例来初始化客户端。
  3. 创建并分配一个NioEventLoopGroup实例来进行事件处理。
  4. 为服务器创建InetSocketAddress实例。
  5. 当连接被建立时,一个Handler会被安装到该channelChannelPipline上。
  6. 调用Bootstrap.connect()方法连接远程节点 。

在前面,我们初步了解了netty,包括它的核心内容、简单使用。接下来我们将继续学习netty的主要组件与设计理念。

netty组件化设计

Channel接口

我们知道netty的核心组件包括通道,通道是java中很重要的一个概念,它表达一个到实体操作的连接。netty将多个操作抽象出来作为通道,其中包括:

  • Socket操作。
  • 多线程处理。
  • 异步通知 。

我们知道网络中基本的I/O操作(连接建立、读取数据、写入数据)都是依赖于底层网络传输所提供的接口,在java中则表示为Socket类。netty对此类进行进一步的封装,大大降低了Socket类的复杂度,它基于Channel接口提供了一系列的实现:

  • EmbeddedChannel
  • EpollDatagramChannel
  • LocalServerChannel
  • KQueueDatagramChannel
  • NioDatagramChannel
  • NioSctpChannel
  • NioSocketChannel

1591443547407

如图所示,每个Channel都会被分配一个ChannelPipline和ChannelConfig,ChannelConfig包含了该Channel的所有配置,并且支持热更新。通常在Channel实例被创建时,就会创建默认的ChannelConfig:

// NioServerSocketChannel构造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码

netty提供了一个ChannelOption类,定义了ChannelConfig支持的所有参数类型,可以这样使用:

NioServerSocketChannel channel = new NioServerSocketChannel();
ServerSocketChannelConfig config = channel.config();
config.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 针对每个ChannelOption选项,netty还提供了对应的方法,比如上面的设置可以这样代替
// config.setAllocator(PooledByteBufAllocator.DEFAULT);
// 设置通道
bootstrap.channel(channel.getClass());
复制代码

为了保证Channel的顺序性,因此实现了Comparable接口。所以如果两个不同的Channel的哈希值一样,那么就会抛出错误。

Channel还提供了另外方法,主要的有:

方法名 描述
eventLoop 返回对应的EventLoop
pipeline 返回对应的ChannelPipline
localAddress 返回本地的SocketAddress
remoteAddress 返回远程的SocketAddress
write 将数据写到远程节点,这个数据将被传递给ChannelPipline,并且排队直到它被冲刷
flush 将之前写好的数据冲刷到远程节点
writeFlush 写数据并将其冲刷到远程节点
isActive 判断该Channel是否处于活动状态

netty支持的传输

NIO

NIO通过Selector实现了所有I/O异步的操作,Selector运行在一个检查状态变化并对其做出相应的线程上。Selector的位模式由java.nio.channels.SelectionKey定义:

名称 描述
OP_ACCEPT 接受连接并创建Channel时获得通知
OP_CONNECT 在建立连接时获得通知
OP_READ 在数据可以读取时获得通知
OP_WRITE 在Channel的发送缓冲区可写时获得通知

1591446199302

Epoll

Epoll是来自Linux的一个高性能、可扩展的I/O事件通知特性。netty为Linux的epoll提供了对应的API。

OIO

OIO(old IO)是旧的阻塞I/O,通过常规的传输API使用,可以在项目移植的时候用它来进行过渡。

Local

它是由netty提供的本地传输方式,用于在同一个JVM中运行的客户端和服务端之间的异步通信。

Embedded

Embedded传输使得我们可以将一组ChannelHandler作为辅助类嵌入到其它的ChannelHandler内部,这样可以在不需要修改内部代码的情况下扩展一个ChannelHandler的功能。Embedded传输的关键是EmbeddChannel的Channel实现。

Channel的生命周期

Channel定义了一组和ChannelInboundHandler相关的状态模式:

状态 描述
ChannelUnregistered Channel已被创建 ,但未被注册到EventLoop
ChannelRegistered Channel被注册到EventLoop
ChannelActive Channel处于激活状态,可以接收和发送数据
ChannelInactive Channel处于未激活状态

基于EmbeddedChannel的单元测试

EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的。可以用它来模拟发送和请求消息,来测试对应的ChannelHandler的功能实现 。

EmbeddedChannel提供了以下常用的API:

API 描述
writeInbound 写一个入站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readInbound() 读到,则返回 true;
readInbound 从 EmbeddedChannel 读到入站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null;
writeOutbound 写一个出站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readOutbound() 读到,则返回 true;
readOutbound 从 EmbeddedChannel 读到出站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null;
Finish 如果从入站或者出站中能读到数据,标记 EmbeddedChannel 完成并且返回。这同时会调用 EmbeddedChannel 的关闭方法;

1591534125350

@Test
public void testFramesDecoded(){
    ByteBuf buf= Unpooled.buffer();
    for (int i=0;i<9;i++){
        buf.writeByte(i);
    }
    ByteBuf input=buf.duplicate();
    EmbeddedChannel channel=new EmbeddedChannel(
        new FixedLengthFrameDecoder(3)
    );
    assertTrue(channel.writeInbound(input.retain()));
    assertTrue(channel.finish());    
    //读取消息
    ByteBuf read=channel.readInbound();
    assertEquals(buf.readSlice(3),read);
    read.release();

    read = (ByteBuf) channel.readInbound();
    assertEquals(buf.readSlice(3), read);
    read.release();
    read = (ByteBuf) channel.readInbound();
    assertEquals(buf.readSlice(3), read);
    read.release();
    assertNull(channel.readInbound());
    buf.release();
}
复制代码

EventLoop接口

EventLoop定义了netty的核心抽象,用于处理连接的生命周期中所发生的事情。

1591427101690

从上图我们可以看出:

  • 一个EventLoopGroup包含一个或多个EventLoop。
  • 一个EventLoopGroup在生命周期中只能和一个Thread绑定。
  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理。
  • 一个Channel在它的生命周期中只能注册于一个EventLoop。
  • 一个EventLoop可能会被分配给一个或多个Channel。

netty使用事件循环EventLoop来处理连接中的请求任务。EventLoop采用了两个基本的API:网络和并发编程。io.netty.util.concurrent包基于JUC包而构建地,用来提供线程的执行器 。io.util.channel包的类为了与Channel事件进行交互,扩展了这些类和接口。

1591530734568

netty的任务调度扩展了JUC的SecheduledExecutorService,因为netty的定时任务可以放入EventLoop的执行队列,不需要像JUC那样进行线程切换,所以降低 了性能消耗:

ctx.channel().eventLoop().schedule(new Runnable() { // 创建任务处理线程
            @Override
            public void run() {
                System.out.println("EventLoop任务调度");
            }
        }, 60, TimeUnit.MICROSECONDS); // 指定调度周期
复制代码

1591532990719

ChannelFuture接口

netty提供了ChannelFuture接口用于作为异步调用的占位符,ChannelFuture.addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时得到通知。

ChannelHandler接口

ChannelHandler接口成为处理入站和出站数据的应用程序逻辑的容器。

netty以适配器模式提供了大量默认的ChannelHandler实现,旨在简化应用程序处理逻辑的开发过程。下面是编写自定义ChannelHandler时常有的适配器类:

  • ChannelHandlerAdapter
  • ChannelInbouAdapter
  • ChannelOutboundHandlerAdapter
  • ChannelDuplexHandler

ChannelHandler有对应的生命周期,在ChannelHandler在ChannelPipline 中添加或移除时,会调用相关操作。

类型 描述
handlerAdded ChannelHandler被添加到ChannelPipline
handlerRemoved ChannelHandler被ChannelPipline移除
exceptionCaught 处理过程中ChannelPipline有错误产生

我们可以通过实现ChannelInboundHandler或ChannelOutboundHandler接口来自定义自己的处理逻辑,也可以通过扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter类来实现 。

资源管理:在处理器处理数据时,我们需要确保在最后没有资源泄露,后面讲到的ByteBuf引用计数技术也是为了解决这个问题的。netty提供了ResourceLeakDetector用于检测内存泄漏。泄漏的级别可以通过java -Dio.netty.leakDetectionLevel=泄漏级别来指定。netty定义的泄漏级别有:

级别 描述
DISABLED 禁用泄漏检测
SIMPLE 使用1%的默认采样率并报告发现的泄漏(默认级别)
ADVANCED 使用1%的采样率并报告发现的泄漏以及对应的消息被访问位置
PARANOID 对每次消息访问进行采样,报告发现的泄漏和对应的访问位置

编码器和解码器就是很典型的ChannelHandler实现。

在进行服务器开发时,就要注意解决粘包问题。大体的解决方案就是定义一个协议格式,接收信息后对数据按照协议进行解析。netty对应于不同的格式设计,提供了不同类型的抽象,大部分的命名方式为XxxDecoderXxxEncoder,比如支持Google的Protocol Buffers的ProtobufEncoder和ProtobufDecoder。

netty的解码编码器大体可以分为两类:

  • 从字节到消息(ByteToMessage)
  • 从消息到消息(MessageToMessage)

我们可以通过扩展netty预置的解码器和编码器来实现自己的处理器。对于每个从入站Channel读取的消息,在channelRead()方法进行完后,它将调用解码器所提供的decode()方法,并将已解码的字节转发给ChannelPipline中的下一个ChannelInboundHandler。出站也类似。

解码器

netty为字节到消息的编码器实现提供了一个基类:ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),该类会对入站数据进行缓冲,直到准备好处理。它有两个最重要的方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Objec t> out) 当Channel的状态变为非活动时,这个方法将会被调用一次。默认是调用decode()方法。

ReplayingDecoder扩展了ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),ReplayingDecoder在处理数据时不用判断接收数据的长度。ReplayingDecoder实现了自己的ReplayingDecoderByteBuf,当数据不够时会抛出异常,然后ReplayingDecoder会重置readerIndex并且再次调用decode方法。

虽然ReplayingDecoder使用比ByteToMessageDecoder更便利,但是实际上ReplayingDecoder的运行稍慢于ByteToMessageDecoder。

netty为消息到消息的编码器实现提供了一个基类:MessageToMessageDecoder。它有最重要的方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。

netty提供了TooLongFrameException异常,用于在解码器在帧超过指定大小限制时抛出,防止解码器缓冲大量数据造成内存耗尽。

解码基于分隔符的协议:基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(帧)的开头或者结尾。用于处理基于分隔符的协议和基于长度的协议的解码器有:

名称 描述
DelimiterBasedFrameDecoder 使用任何由用户提供的分隔符来提取帧的通用解码器
LineBasedFrameDecoder 提取由行尾符(\n 或者\r\n)分隔的帧的解码器。这个解码器比DelimiterBasedFrameDecoder 更快

解码基于长度的协议:基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。用于基于长度的协议的解码器有:

名称 描述
FixedLengthFrameDecoder 提取在调用构造函数时指定的定长帧
LengthFieldBasedFrameDecoder 根据编码进帧头部中的长度值提取帧;该字段的偏移量以及长度在构造函数中指定

编码器

编码器实现了ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式。

MessageToByteEncoder是将消息转换为字节的基类,最重要的方法是:

API 描述
encode(ChannelHandlerContext ctx,I msg,ByteBuf out) 必须要实现的抽象方法。被调用时将会传入要被该类编码为ByteBuf 的(类型为I 的)出站消息。该ByteBuf 随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

ByteToMessageDecoder之所以比MessageToByteEncoder多个decodeLast方法,是因为解码器通常需要在Channel关闭之后产生最后一个消息。

MessageToMessageEncoder是将消息转换为消息的基类,最重要的方法是:

API 描述
encode(ChannelHandlerContext ctx,I msg,List< Object > out) 必须要实现的抽象方法。。每个通过write()方法写入的消息都将会被传递给encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

编解码器

从上面我们了解到编码器都是继承了ChannelInboundHandlerAdapter,而解码器继承了ChannelOutboundHandlerAdapter,那么如果我们同时实现这两个特性,是不是就可以将编码和解码整合到一个类中?netty基此为我们提供了字节和消息的编解码器。

字节编解码器:ByteToMessageCodec抽象类,该类结合了ByteToMessageDecoder和MessageToByteEncode,该类由重要的三个方法:

API 描述
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 只要有字节可以被消费,这个方法就将会被调用。它将入站ByteBuf 转换为指定的消息格式, 并将其转发给ChannelPipeline 中的下一个ChannelInboundHandler
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) 这个方法的默认实现委托给了decode()方法。它只会在Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理。
encode(ChannelHandlerContext ctx,msg,ByteBuf out) 对于每个将被编码并写入出站ByteBuf 的(类型为I 的)消息来说,这个方法都将会被调用

消息编解码器:MessageToMessageCodec抽象类,通过使用MessageToMessageCodec,可以在一个单个的类中实现该转换的往返过程。该类的两个重要方法:

API 描述
protected abstract decode(ChannelHandlerContext ctx,INBOUND_IN msg,List< Object > out) 这个方法被调用时会被传入INBOUND_IN 类型的消息。它将把它们解码为OUTBOUND_IN 类型的消息,这些消息将被转发给ChannelPipeline 中的下一个ChannelInboundHandler
protected abstract encode(ChannelHandlerContext ctx,OUTBOUND_IN msg,List< Object > out) 对于每个OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为INBOUND_IN 类型的消息,然后被转发给ChannelPipeline 中的下一个ChannelOutboundHandler

CombinedChannelDuplexHandler类可以将编码器和解码器组合起来,如下例所示:

// ByteToCharDecoder是自定义的编码器,CharToByteEncoder是自定义的解码器
public class CombinedByteCharCodec extends
    CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
    super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}
复制代码

序列化与反序列化

netty提供了三种序列化方式:

  1. jdk自带的序列化。

    名称 描述
    CompatibleObjectDecode 和使用 JDK 序列化的非基于 Netty的远程节点进行互操作的解码器
    CompatibleObjectEncoder 和使用JDK 序列化的非基于Netty 的远程节点进行互操作的编码器
    ObjectDecoder 构建于JDK 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取
    ObjectEncoder 构建于JDK 序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取
  2. 使用JBoss序列化:JBoss不仅修复了jdk自带的序列化器的一些问题,而且提高了性能。

    名称 描述
    CompatibleMarshallingDecoder,CompatibleMarshallingEncoder 与只使用JDK 序列化的远程节点兼容
    MarshallingDecoder,MarshallingEncoder 适用于使用JBoss Marshalling 的节点。这些类必须一起使用
  3. 使用Protocol Buffers序列化:Protocol Buffers是一种由谷歌公司开发并开源的数据交换格式。

    名称 描述
    ProtobufDecoder 使用protobuf 对消息进行解码
    ProtobufEncoder 使用protobuf 对消息进行编码
    ProtobufVarint32FrameDecoder 根据消息中的Google Protocol Buffers 的"Base 128 Varints"整型长度字段值动态地分割所接收到的ByteBuf
    ProtobufVarint32LengthFieldPrepender 向ByteBuf 前追加一个Google Protocal Buffers 的"Base 128 Varints"整型的长度字段值

预置的ChannelHandler

使用https:netty通过SslHandler使用了javax.net.ssl包,来实现SSL加密。此外它还提供了OpenSSL工具的SSLEngine实现——OPenSSLEngine,它的性能比jdk的SSLEngine更好。netty默认会尝试加载OpenSSLEngine,如果失败再去加载JdkSSLEngine。它的相关方法有:

API 描述
setHandshakeTimeout (long,TimeUnit),setHandshakeTimeoutMillis (long),getHandshakeTimeoutMillis() 设置和获取超时时间,超时之后,握手ChannelFuture 将会被通知失败
setCloseNotifyTimeout (long,TimeUnit),setCloseNotifyTimeoutMillis (long),getCloseNotifyTimeoutMillis() 设置和获取超时时间,超时之后,将会触发一个关闭通知并关闭连接。这也将会导致通知该ChannelFuture 失败
handshakeFuture() 返回一个在握手完成后将会得到通知的ChannelFuure。如果握手先前已经执行过了,则返回一个包含了先前的握手结果的ChannelFuture
close(),close(ChannelPromise),close(ChannelHandlerContext,ChannelPromise) 发送close_notify 以请求关闭并销毁底层的SslEngine

Http编解码器:netty提供了多个ChannelHandler用于将数据格式化为http响应或将请求解析为数据。下图展示了http请求响应的组成部分:

1591601567435

主要的编解码器有:

名称 描述
HttpRequestEncoder 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节
HttpResponseEncoder 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节
HttpRequestDecoder 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息
HttpResponseDecoder 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息

http聚合:对于某些请求或响应数据,netty的编解码器可能会无法完整解析它们,而是将它们解析为多个数据片段,比如HttpServerCodec只能获取uri中参数,那么如果使用post请求,因为信息保存在messageBody,所以无法完全解析。这时就需要加上HttpObjectAggregator。HttpObjectAggregator结构体系如下:

1591602159652

MessageAggregator的decode()里有一个currentMessage参数,它是该handler的成员变量,每一个channel对应一个handler实例,这个currentMessage会存储多次decode迭代的结果,这就是聚合实现的关键。

http压缩:虽然http数据压缩会带来服务器时钟的开销,但是可以节省网络流量,加快传输速率。客户端可以使用HttpContentDecompressor来处理来自服务器的内容,服务端使用HttpContentCompressor()进行数据压缩。

websocket:netty为实现websocket长连接提供了多种框架。以下是websocketFrame类型:

名称 描述
BinaryWebSocketFrame 数据帧:二进制数据
TextWebSocketFrame 数据帧:文本数据
ContinuationWebSocketFrame 数据帧:属于上一个BinaryWebSocketFrame 或者TextWebSocketFrame 的文本的或者二进制数据
CloseWebSocketFrame 控制帧:一个CLOSE 请求、关闭的状态码以及关闭的原因
PingWebSocketFrame 控制帧:请求一个PongWebSocketFrame
PongWebSocketFrame 控制帧:对PingWebSocketFrame 请求的响应

要想为WebSocket 添加安全性,只需要将SslHandler作为第一个ChannelHandler 添加到 ChannelPipeline 中。

连接管理:netty为检测处理空闲和超时连接提供了管理器。主要的有:

名称 描述
IdleStateHandler 当连接空闲时间太长时,将会触发一个IdleStateEvent 事件。然后,可以通过在ChannelInboundHandler中重写userEventTriggered()方法来处理该IdleStateEvent 事件
ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个ReadTimeoutException 并关闭对应的Channel。可以通过重写ChannelHandler 中的exceptionCaught()方法来检测该ReadTimeoutException
WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个WriteTimeoutException 并关闭对应的Channel 。可以通过重写你的ChannelHandler 的exceptionCaught()方法检测该WriteTimeoutException

ChannelPipline接口

ChannelPipline接口实现了链式调用,可添加ChannelHandler容器。当Channel被创建时,它会被自动分配到它所属的ChannelPipline上。

1591427854256

如图,当一个入站消息进入时,它会从ChannelPipline的头部开始,并被传递给第一个ChannelInboundHandler,当在此ChannelHandler被处理完后,它又会被传递给下一个ChannelInboundHandler,直到到达ChannelPipline的尾部。消息的出站与入站差不多。

当ChannelHandler被添加到ChannelPipline时,它会被分配一个ChannelHandlerContext,其代表了ChannelHandler和ChannelPipline之间的绑定。

在netty中,有两种发送消息的方式:

  1. 直接写到Channel中,这将使消息从ChannelPipline的尾部开始流动。
  2. 直接写到ChannelHandlerContext中,这将是消息从ChannelPipline中的 下一个ChannelHandlerContext开始流动。

在ChannelPipline 传播事件时,它会测试ChannelPipline 中的下一个ChannelHandler的类型是否符合事件的运动方向,如果不匹配,那么它会跳过该ChannelHandler。

ChannelHandlerContext

ChannelHandlerContext主要作用是使ChannelHandler和ChannelPipline进行交互,ChannelHandler可以通知ChannelHandler所属的ChannelPipline的下一个ChannelHandler ,可以修改它所属的ChannelPipline。

ChannelHandlerContext API:

1591518493786

ChannelHandlerContext中有些方法Channel和ChannelPipline也有,要注意的是如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext 上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。

ServerBootstrap/Boostrap

netty的引导类为应用程序的网络层配置提供了容器,ServerBootstrap用于引导服务端,Bootstrap用于引导客户端。

1591429873430

我们可以看到ServerBootstrap和Bootstrap都实现了AbstractBootstrap抽象类。

在前面文章中,在实现服务端引导程序时,我们创建并绑定了两个EventLoopGroup,这是为什么呢?查看源码我们可以看出它的解释:

1591430052433

源码的解释为服务端引导需要一个EventLoopGroup表示服务器自身已被绑定到了本地端口正在监听的套接字,第二组则是用来处理客户端连接的。

查看Bootstrap的源码我们可以看到当调用Bootstrap.group(EventLoopGroup group)时,实际上是调用AbstractBootstrap的group方法,正和ServerBootstrap.group(EventLoopGroup parentGroup, EventLoopGroup childGroup)的第一行一样。

ByetBuf

网络传输的基本单位都是字节,而对于字节操作可以使用java的ByetBuffer API。但是ByteBuf有一些的局限性 :

  1. 它的长度长度固定,一旦分配完成,容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;

  2. ByteBuffer只有一个标识位控的指针position,读写的时候需要手工调用flip()和rewind()等,很容易导致程序处理失败。

  3. API功能有限,一些高级和实用的特性需要使用者自己编程实现。

对于此,netty在ByteBuffer的基础上再次构建,从而实现了新的、功能强大的ByteBuf API,它具有以下优点:

  1. 可以被用户自定义的缓冲区类型扩展。
  2. 通过内置复合缓冲区实现零拷贝。
  3. 容量变长增长 。
  4. 读和写采用不同的指针。
  5. 支持方法链式调用。
  6. 支持引用计数。
  7. 支持池化。

ByteBuf的使用模式

堆缓冲区

堆缓冲区模式又称为:支撑数组(backing array)模式。之间将数据存放在JVM的堆空间,通过将数据存储在数组中实现。示例如下:

public static void heapBuffer() {
    // 创建Java堆缓冲区
    ByteBuf heapBuf = Unpooled.buffer(); 
    if (heapBuf.hasArray()) { // 判断是否有支撑数组
        byte[] array = heapBuf.array(); // 获取该支撑数组引用
        int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); // 计算第一个字节偏移量
        int length = heapBuf.readableBytes(); // 获取可读字节数
        handleArray(array, offset, length); // 调用自己的方法
    }
}
复制代码

直接缓冲区

直接缓冲区属于堆外分配的直接内存,不会占用堆的容量。适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好。它的主要缺点是相对于基于堆的缓冲区,它们的分配和释放代价都较为昂贵,并且因为数据不是在java堆上 ,所以处理前你需要再进行一次复制。

public static void directBuffer() {
    ByteBuf directBuf = Unpooled.directBuffer();
    if (!directBuf.hasArray()) { // 如果不是堆缓冲区
        int length = directBuf.readableBytes(); // 获取可读字节数
        byte[] array = new byte[length]; // 分配一个新数组来保存数据
        directBuf.getBytes(directBuf.readerIndex(), array); // 将数据复制到新数组
        handleArray(array, 0, length); // 调用自己的方法
    }
}
复制代码

复合缓冲区

复合缓冲区是netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合视图,可以根据需要添加和删除不同类型的ByteBuf。复合缓冲区不支持访问其支撑数组。因此如果要访问,需要先将内容拷贝到堆内存中,再进行访问。

public static void byteBufComposite() {
    // 复合缓冲区,只是提供一个视图
    CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
    ByteBuf headerBuf = Unpooled.buffer();
    ByteBuf bodyBuf = Unpooled.directBuffer();
    messageBuf.addComponents(headerBuf, bodyBuf); // 将ByteBuf实例追加到CompositeByteBuf
    messageBuf.removeComponent(0); // 移除索引位置为0的缓冲区
    for (ByteBuf buf : messageBuf) { // 遍历缓冲区
        System.out.println(buf.toString());
    }
}
复制代码

字节级操作

随机访问索引:ByteBuf的索引是从0开始的。

public static void byteBufRelativeAccess() {
    ByteBuf buffer = Unpooled.buffer();
    for (int i = 0; i < buffer.capacity(); i++) {
        byte b = buffer.getByte(i);// 不改变readerIndex值
        System.out.println((char) b);
    }
}
复制代码

对于那些只需要一个索引值参数的方法,它们都不会改变readIndex和writeIndex,不过可以通过调用readerIndex(index)和writeIndex(index)来手动改变。

可丢弃字节:可丢弃字节区域是指[0,readerIndex)之间的区域。可调用discardReadBytes()方法丢弃已经读过的字节。discardReadBytes()方法会移动可读字节区域内容(CONTENT)。如果频繁调用,会有多次数据复制开销,对性能有一定的影响

可读字节:可读字节区域是指[readerIndex, writerIndex)之间的区域。任何readxxx()skipxxx()的操作方法,都会改变readerIndex索引。

可写字节:可写字节区域是指[writerIndex, capacity)之间的区域。任何writexxx()的操作方法都将改变writerIndex的值。

索引管理

  1. markReaderIndex()、markWriterIndex()——将流的当前位置标记。
  2. resetReaderIndex()、resetWriterIndex()——将流重置到标记位置。
  3. readIndex(index)、writeIndex(int)——将读/写索引移到指定位置。
  4. clear()——将readerIndex和writeerIndex设置为0,但是内存中的内容不会被清除。

查找操作:查找ByteBuf指定的值,最简单的就是使用indexOf()方法,较复杂的可以通过ByteBufProcessor作为参数的方法达,比如int index = buffer.forEachByte(ByteProcessor.FIND_CR);

派生缓冲区:派生缓冲区为ByteBuf提供了一个访问的视图。视图仅仅提供一种访问操作,不做任何拷贝操作。如果你修改了这个新的ByteBuf实例的具体内容,那么对应的源实例也会被修改,如果需要拷贝现有缓冲区的真实副本,请使用copy()或copy(int, int)方法。

读/写操作:有两种类别的读写:

  1. get()和set()操作——从给定的索引开始,并且保持索引不变。

  2. read()和write()操作——从给定的索引开始,并且根据已经访问过的字节数对索引进行访问。

ByteBufHolder接口

ByteBufHolder是netty的高级特性,为缓冲区池化提供了支持。可以通过子类实现ByteBufHolder接口,根据自身需要添加自己需要的数据字段。可以用于自定义缓冲区类型扩展字段。

ByetBuf分配

按需分配

netty通过ByteBufAllocator接口实现了(ByteBuf的)池化。可以通过Channel或者ChannelHandler的ChannelHandlerContext获取一个到ByteBufAllocator的引用。

netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator(池化)、UnpolledByteBufAlloactor(非池化)。netty默认使用PooledByteBufAllocator,但是也可以通过ChannelConfig修改。

Unpooled缓冲区

在未获得ByteBufAllocator引用的情况下,我们可以使用netty提付的Unpooled工具类来创建未池化的ByteBufAllocator。

ByteBufUtil类

ByteBufUtil类提供了用于操作ByteBuf的静态的辅助方法。hexdump()方法以十六进制的表示形式大于ByteBuf内容。equals(ByteBuf, ByteBuf)方法用来判断两个ByteBuf实例是否相等。

引用计数

netty通过实现ReferenceCounted接口为ButeBuf和ButeBufHolder引入了引用计数技术。当计数为0时,系统将会回收缓冲区,它降低了内存分配的开销。

这篇关于写给大忙人看的netty实战的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!