netty作为一个NIO客户端服务器框架,可以快速、轻松地构建网络应用,比如协议服务器和客户端。netty吸收了FTP、SMTP、HTTP等协议的实现经验,在易用性和敏捷性的基础上保证了程序的稳健性以及可维护性 。
当我们刚开始学习java网络编程时都是开一个socket
端口,然后调用方法accept()
方法阻塞式等待连接,然后不断读取数据。等到后面掌握了更多后,我们尝试使用非阻塞方式读取数据以及使用Selector
选择器的非阻塞I/O。
随着业务不断增长,成千上万的并发量不再是不可能的了。为了更高的吞吐量与可扩展的性能,稳健、易行的客户端开发框架成为开发者的追求,而netty很完美地满足了人们的需求。它封装了java复杂的底层API,并以易于使用的方式暴露出来,使用netty可以更加注重业务逻辑的开发,而不是琐碎的底层架构。
在下面是netty的核心组件,详细的内容将于后续记录:
Channel
(通道)。
通道是java nio
的一个核心概念,它表示一个到实体的操作连接(比如网络连接、文件I/O操作)。
回调。
回调实际上是 一个方法,当一个方法调用时,其指定(或者说是绑定)的回调方法也会被调用。
Future
。
Future
是一个异步操作的占位符,当该异步操作完成时,其对应的Future
对象便会调用。netty的Future
实现——ChannelFuture
允许一个异步操作可以注册多个ChannelFutureListener
实例。每个netty的出站I/O都会返回一个ChannelFuture
实例。
事件与Handler
(处理器)。
netty是通过事件来通知我们操作状态的改变,事件可能有:
netty为ChannelHandler
有许多实现,同时你也可以自定义实现。每个事件都会分发到对应的ChannelHandler
类中的某个方法。
注意:入站和出站是相对
ChannelHandler
而言的,进入ChannelHandler
为入站,从ChannelHandler
发出消息是出站。
下面是一个简单的netty使用:
@ChannelHandler.Sharable // 标识一个 channelHandler 可以被多个 channel 安全地调用。 public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override // 当有入站消息时该方法就会调用 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = (ByteBuf)msg; System.out.println("服务器收到消息:" + buffer.toString(CharsetUtil.UTF_8)); // 将接收到的消息写给发送者,而不冲刷出站消息。 ctx.write(buffer); } @Override // channelRead消费完读取的数据的时候被触发 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 将未决消息冲刷到远程节点,并且关闭该 channel ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.EMPTY_BUFFER); channelFuture.addListener(ChannelFutureListener.CLOSE); } @Override // 在读操作时处理异常 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印异常栈,并关闭该 channel cause.printStackTrace(); ctx.close(); } } 复制代码
注意:netty服务端handler的处理是采用责任链的形式。默认情况下,channelHandler会把对它的方法调用转发给链中的下一个channelHandler。如果
exceptionCaught()
方法没有被该链中的某处实现,那么所接收的异常会被传递到channelPipeline
的尾端并被记录。
public class NettyServer { public static void main(String[] args) { NettyServer nettyServer = new NettyServer(); nettyServer.start(8888); } public void start(int port){ // 处理TCP连接请求 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 处理I/O事件 EventLoopGroup workGroup = new NioEventLoopGroup(); try { // 用于引导和绑定服务器 ServerBootstrap bootstrap = new ServerBootstrap(); //将上面的线程组加入到 bootstrap 中 bootstrap.group(bossGroup,workGroup) //将通道设置为异步的通道 .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { // 因为 NettyServerHandler 被标注为 @Sharable,所以可以使用相同的实例 socketChannel.pipeline().addLast(new NettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG,200) .childOption(ChannelOption.SO_KEEPALIVE,true); // 异步绑定服务器,调用 sync() 方法阻塞等待直到绑定完成。 ChannelFuture future = bootstrap.bind(port).sync(); // 获取 channel 的 closeFuture,并且阻塞直到它完成。 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } 复制代码
@ChannelHandler.Sharable public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("netty 活跃", CharsetUtil.UTF_8)); } // 记录已接收的消息存储 @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("客户端接收消息:" + msg.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } 复制代码
SimpleChannelInboundHandler
与ChannelInboundHandler
:客户端使用
SimpleChannelInboundHandler
的原因是它不需要考虑异步操作,当channelRead0
方法执行完后,SimpleChannelInboundHandler
就会释放指向保存该消息的ByteBuffer
的内存。服务端使用
ChannelInboundHandler
是因为它需要给客户端传送消息,而ctx.write()
方法是异步的,可能channelRead()
方法执行完了它还没有返回,所以为了避免这种情况便使用了ChannelInboundHandler
。channelReadComplete
方法会在channelRead()
消费完读取的数据的时候被触发,此时它会将输出冲刷到channel
。
public class NettyClient { public static void main(String[] args) { NettyClient nettyClient = new NettyClient(); nettyClient.connect("localhost", 8888); } public void connect(String hostname,int port) { // 处理TCP连接请求 EventLoopGroup group = new NioEventLoopGroup(); try { // 用于引导和绑定服务器 Bootstrap bootstrap = new Bootstrap(); //将上面的线程组加入到 bootstrap 中 bootstrap.group(group) //将通道设置为异步的通道 .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline() .addLast(new NettyClientHandler()); } }) .option(ChannelOption.TCP_NODELAY,true); // 连接到远程节点,阻塞等待直到连接完成。 ChannelFuture future = bootstrap.connect(hostname, port).sync(); // 阻塞,直到 channel 关闭。 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 关闭线程池并释放所有资源。 group.shutdownGracefully(); } } } 复制代码
从上面的代码示例我们可以看出netty服务端和客户端的构建其实大同小异,都是实现了处理器然后进行绑定。
服务端开发:
ChannelHandler
。ChannelHandler
。ServerBootstrap
实例来引导和绑定服务器。NioEventLoopGroup
实例来进行请求处理。NioEventLoopGroup
实例来进行事件处理。InetSocketAddress
。channel
。ServerBootstrap.bind()
方法来绑定服务器。客户端开发:
ChannelHandler
。ChannelHandler
。NioEventLoopGroup
实例来进行事件处理。InetSocketAddress
实例。Handler
会被安装到该channel
的ChannelPipline
上。Bootstrap.connect()
方法连接远程节点 。在前面,我们初步了解了netty,包括它的核心内容、简单使用。接下来我们将继续学习netty的主要组件与设计理念。
我们知道netty的核心组件包括通道,通道是java中很重要的一个概念,它表达一个到实体操作的连接。netty将多个操作抽象出来作为通道,其中包括:
我们知道网络中基本的I/O操作(连接建立、读取数据、写入数据)都是依赖于底层网络传输所提供的接口,在java中则表示为Socket类。netty对此类进行进一步的封装,大大降低了Socket类的复杂度,它基于Channel接口提供了一系列的实现:
如图所示,每个Channel都会被分配一个ChannelPipline和ChannelConfig,ChannelConfig包含了该Channel的所有配置,并且支持热更新。通常在Channel实例被创建时,就会创建默认的ChannelConfig:
// NioServerSocketChannel构造方法 public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } 复制代码
netty提供了一个ChannelOption类,定义了ChannelConfig支持的所有参数类型,可以这样使用:
NioServerSocketChannel channel = new NioServerSocketChannel(); ServerSocketChannelConfig config = channel.config(); config.setOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 针对每个ChannelOption选项,netty还提供了对应的方法,比如上面的设置可以这样代替 // config.setAllocator(PooledByteBufAllocator.DEFAULT); // 设置通道 bootstrap.channel(channel.getClass()); 复制代码
为了保证Channel的顺序性,因此实现了Comparable接口。所以如果两个不同的Channel的哈希值一样,那么就会抛出错误。
Channel还提供了另外方法,主要的有:
方法名 | 描述 |
---|---|
eventLoop | 返回对应的EventLoop |
pipeline | 返回对应的ChannelPipline |
localAddress | 返回本地的SocketAddress |
remoteAddress | 返回远程的SocketAddress |
write | 将数据写到远程节点,这个数据将被传递给ChannelPipline,并且排队直到它被冲刷 |
flush | 将之前写好的数据冲刷到远程节点 |
writeFlush | 写数据并将其冲刷到远程节点 |
isActive | 判断该Channel是否处于活动状态 |
NIO通过Selector实现了所有I/O异步的操作,Selector运行在一个检查状态变化并对其做出相应的线程上。Selector的位模式由java.nio.channels.SelectionKey定义:
名称 | 描述 |
---|---|
OP_ACCEPT | 接受连接并创建Channel时获得通知 |
OP_CONNECT | 在建立连接时获得通知 |
OP_READ | 在数据可以读取时获得通知 |
OP_WRITE | 在Channel的发送缓冲区可写时获得通知 |
Epoll是来自Linux的一个高性能、可扩展的I/O事件通知特性。netty为Linux的epoll提供了对应的API。
OIO(old IO)是旧的阻塞I/O,通过常规的传输API使用,可以在项目移植的时候用它来进行过渡。
它是由netty提供的本地传输方式,用于在同一个JVM中运行的客户端和服务端之间的异步通信。
Embedded传输使得我们可以将一组ChannelHandler作为辅助类嵌入到其它的ChannelHandler内部,这样可以在不需要修改内部代码的情况下扩展一个ChannelHandler的功能。Embedded传输的关键是EmbeddChannel的Channel实现。
Channel定义了一组和ChannelInboundHandler相关的状态模式:
状态 | 描述 |
---|---|
ChannelUnregistered | Channel已被创建 ,但未被注册到EventLoop |
ChannelRegistered | Channel被注册到EventLoop |
ChannelActive | Channel处于激活状态,可以接收和发送数据 |
ChannelInactive | Channel处于未激活状态 |
EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的。可以用它来模拟发送和请求消息,来测试对应的ChannelHandler的功能实现 。
EmbeddedChannel提供了以下常用的API:
API | 描述 |
---|---|
writeInbound | 写一个入站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readInbound() 读到,则返回 true; |
readInbound | 从 EmbeddedChannel 读到入站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null; |
writeOutbound | 写一个出站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readOutbound() 读到,则返回 true; |
readOutbound | 从 EmbeddedChannel 读到出站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null; |
Finish | 如果从入站或者出站中能读到数据,标记 EmbeddedChannel 完成并且返回。这同时会调用 EmbeddedChannel 的关闭方法; |
@Test public void testFramesDecoded(){ ByteBuf buf= Unpooled.buffer(); for (int i=0;i<9;i++){ buf.writeByte(i); } ByteBuf input=buf.duplicate(); EmbeddedChannel channel=new EmbeddedChannel( new FixedLengthFrameDecoder(3) ); assertTrue(channel.writeInbound(input.retain())); assertTrue(channel.finish()); //读取消息 ByteBuf read=channel.readInbound(); assertEquals(buf.readSlice(3),read); read.release(); read = (ByteBuf) channel.readInbound(); assertEquals(buf.readSlice(3), read); read.release(); read = (ByteBuf) channel.readInbound(); assertEquals(buf.readSlice(3), read); read.release(); assertNull(channel.readInbound()); buf.release(); } 复制代码
EventLoop定义了netty的核心抽象,用于处理连接的生命周期中所发生的事情。
从上图我们可以看出:
netty使用事件循环EventLoop来处理连接中的请求任务。EventLoop采用了两个基本的API:网络和并发编程。io.netty.util.concurrent包基于JUC包而构建地,用来提供线程的执行器 。io.util.channel包的类为了与Channel事件进行交互,扩展了这些类和接口。
netty的任务调度扩展了JUC的SecheduledExecutorService,因为netty的定时任务可以放入EventLoop的执行队列,不需要像JUC那样进行线程切换,所以降低 了性能消耗:
ctx.channel().eventLoop().schedule(new Runnable() { // 创建任务处理线程 @Override public void run() { System.out.println("EventLoop任务调度"); } }, 60, TimeUnit.MICROSECONDS); // 指定调度周期 复制代码
netty提供了ChannelFuture接口用于作为异步调用的占位符,ChannelFuture.addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时得到通知。
ChannelHandler接口成为处理入站和出站数据的应用程序逻辑的容器。
netty以适配器模式提供了大量默认的ChannelHandler实现,旨在简化应用程序处理逻辑的开发过程。下面是编写自定义ChannelHandler时常有的适配器类:
ChannelHandler有对应的生命周期,在ChannelHandler在ChannelPipline 中添加或移除时,会调用相关操作。
类型 | 描述 |
---|---|
handlerAdded | ChannelHandler被添加到ChannelPipline |
handlerRemoved | ChannelHandler被ChannelPipline移除 |
exceptionCaught | 处理过程中ChannelPipline有错误产生 |
我们可以通过实现ChannelInboundHandler或ChannelOutboundHandler接口来自定义自己的处理逻辑,也可以通过扩展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter类来实现 。
资源管理:在处理器处理数据时,我们需要确保在最后没有资源泄露,后面讲到的ByteBuf引用计数技术也是为了解决这个问题的。netty提供了ResourceLeakDetector用于检测内存泄漏。泄漏的级别可以通过java -Dio.netty.leakDetectionLevel=泄漏级别
来指定。netty定义的泄漏级别有:
级别 | 描述 |
---|---|
DISABLED | 禁用泄漏检测 |
SIMPLE | 使用1%的默认采样率并报告发现的泄漏(默认级别) |
ADVANCED | 使用1%的采样率并报告发现的泄漏以及对应的消息被访问位置 |
PARANOID | 对每次消息访问进行采样,报告发现的泄漏和对应的访问位置 |
编码器和解码器就是很典型的ChannelHandler实现。
在进行服务器开发时,就要注意解决粘包问题。大体的解决方案就是定义一个协议格式,接收信息后对数据按照协议进行解析。netty对应于不同的格式设计,提供了不同类型的抽象,大部分的命名方式为XxxDecoder
和XxxEncoder
,比如支持Google的Protocol Buffers的ProtobufEncoder和ProtobufDecoder。
netty的解码编码器大体可以分为两类:
我们可以通过扩展netty预置的解码器和编码器来实现自己的处理器。对于每个从入站Channel读取的消息,在channelRead()方法进行完后,它将调用解码器所提供的decode()方法,并将已解码的字节转发给ChannelPipline中的下一个ChannelInboundHandler。出站也类似。
netty为字节到消息的编码器实现提供了一个基类:ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),该类会对入站数据进行缓冲,直到准备好处理。它有两个最重要的方法:
API | 描述 |
---|---|
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) | 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。 |
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Objec t> out) | 当Channel的状态变为非活动时,这个方法将会被调用一次。默认是调用decode()方法。 |
ReplayingDecoder扩展了ByteToMessageDecoder(继承了ChannelInboundHandlerAdapter),ReplayingDecoder在处理数据时不用判断接收数据的长度。ReplayingDecoder实现了自己的ReplayingDecoderByteBuf,当数据不够时会抛出异常,然后ReplayingDecoder会重置readerIndex并且再次调用decode方法。
虽然ReplayingDecoder使用比ByteToMessageDecoder更便利,但是实际上ReplayingDecoder的运行稍慢于ByteToMessageDecoder。
netty为消息到消息的编码器实现提供了一个基类:MessageToMessageDecoder。它有最重要的方法:
API | 描述 |
---|---|
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) | 必须要实现的抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。 |
netty提供了TooLongFrameException异常,用于在解码器在帧超过指定大小限制时抛出,防止解码器缓冲大量数据造成内存耗尽。
解码基于分隔符的协议:基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(帧)的开头或者结尾。用于处理基于分隔符的协议和基于长度的协议的解码器有:
名称 | 描述 |
---|---|
DelimiterBasedFrameDecoder | 使用任何由用户提供的分隔符来提取帧的通用解码器 |
LineBasedFrameDecoder | 提取由行尾符(\n 或者\r\n)分隔的帧的解码器。这个解码器比DelimiterBasedFrameDecoder 更快 |
解码基于长度的协议:基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记它的结束。用于基于长度的协议的解码器有:
名称 | 描述 |
---|---|
FixedLengthFrameDecoder | 提取在调用构造函数时指定的定长帧 |
LengthFieldBasedFrameDecoder | 根据编码进帧头部中的长度值提取帧;该字段的偏移量以及长度在构造函数中指定 |
编码器实现了ChannelOutboundHandler,并将出站数据从一种格式转换为另一种格式。
MessageToByteEncoder是将消息转换为字节的基类,最重要的方法是:
API | 描述 |
---|---|
encode(ChannelHandlerContext ctx,I msg,ByteBuf out) | 必须要实现的抽象方法。被调用时将会传入要被该类编码为ByteBuf 的(类型为I 的)出站消息。该ByteBuf 随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler |
ByteToMessageDecoder之所以比MessageToByteEncoder多个decodeLast方法,是因为解码器通常需要在Channel关闭之后产生最后一个消息。
MessageToMessageEncoder是将消息转换为消息的基类,最重要的方法是:
API | 描述 |
---|---|
encode(ChannelHandlerContext ctx,I msg,List< Object > out) | 必须要实现的抽象方法。。每个通过write()方法写入的消息都将会被传递给encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler |
从上面我们了解到编码器都是继承了ChannelInboundHandlerAdapter,而解码器继承了ChannelOutboundHandlerAdapter,那么如果我们同时实现这两个特性,是不是就可以将编码和解码整合到一个类中?netty基此为我们提供了字节和消息的编解码器。
字节编解码器:ByteToMessageCodec抽象类,该类结合了ByteToMessageDecoder和MessageToByteEncode,该类由重要的三个方法:
API | 描述 |
---|---|
decode(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) | 只要有字节可以被消费,这个方法就将会被调用。它将入站ByteBuf 转换为指定的消息格式, 并将其转发给ChannelPipeline 中的下一个ChannelInboundHandler |
decodeLast(ChannelHandlerContext ctx,ByteBuf in,List< Object > out) | 这个方法的默认实现委托给了decode()方法。它只会在Channel 的状态变为非活动时被调用一次。它可以被重写以实现特殊的处理。 |
encode(ChannelHandlerContext ctx,msg,ByteBuf out) | 对于每个将被编码并写入出站ByteBuf 的(类型为I 的)消息来说,这个方法都将会被调用 |
消息编解码器:MessageToMessageCodec抽象类,通过使用MessageToMessageCodec,可以在一个单个的类中实现该转换的往返过程。该类的两个重要方法:
API | 描述 |
---|---|
protected abstract decode(ChannelHandlerContext ctx,INBOUND_IN msg,List< Object > out) | 这个方法被调用时会被传入INBOUND_IN 类型的消息。它将把它们解码为OUTBOUND_IN 类型的消息,这些消息将被转发给ChannelPipeline 中的下一个ChannelInboundHandler |
protected abstract encode(ChannelHandlerContext ctx,OUTBOUND_IN msg,List< Object > out) | 对于每个OUTBOUND_IN 类型的消息,这个方法都将会被调用。这些消息将会被编码为INBOUND_IN 类型的消息,然后被转发给ChannelPipeline 中的下一个ChannelOutboundHandler |
CombinedChannelDuplexHandler类可以将编码器和解码器组合起来,如下例所示:
// ByteToCharDecoder是自定义的编码器,CharToByteEncoder是自定义的解码器 public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> { public CombinedByteCharCodec() { super(new ByteToCharDecoder(), new CharToByteEncoder()); } } 复制代码
netty提供了三种序列化方式:
jdk自带的序列化。
名称 | 描述 |
---|---|
CompatibleObjectDecode | 和使用 JDK 序列化的非基于 Netty的远程节点进行互操作的解码器 |
CompatibleObjectEncoder | 和使用JDK 序列化的非基于Netty 的远程节点进行互操作的编码器 |
ObjectDecoder | 构建于JDK 序列化之上的使用自定义的序列化来解码的解码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取 |
ObjectEncoder | 构建于JDK 序列化之上的使用自定义的序列化来编码的编码器;当没有其他的外部依赖时,它提供了速度上的改进。否则其他的序列化实现更加可取 |
使用JBoss序列化:JBoss不仅修复了jdk自带的序列化器的一些问题,而且提高了性能。
名称 | 描述 |
---|---|
CompatibleMarshallingDecoder,CompatibleMarshallingEncoder | 与只使用JDK 序列化的远程节点兼容 |
MarshallingDecoder,MarshallingEncoder | 适用于使用JBoss Marshalling 的节点。这些类必须一起使用 |
使用Protocol Buffers序列化:Protocol Buffers是一种由谷歌公司开发并开源的数据交换格式。
名称 | 描述 |
---|---|
ProtobufDecoder | 使用protobuf 对消息进行解码 |
ProtobufEncoder | 使用protobuf 对消息进行编码 |
ProtobufVarint32FrameDecoder | 根据消息中的Google Protocol Buffers 的"Base 128 Varints"整型长度字段值动态地分割所接收到的ByteBuf |
ProtobufVarint32LengthFieldPrepender | 向ByteBuf 前追加一个Google Protocal Buffers 的"Base 128 Varints"整型的长度字段值 |
使用https:netty通过SslHandler使用了javax.net.ssl包,来实现SSL加密。此外它还提供了OpenSSL工具的SSLEngine实现——OPenSSLEngine,它的性能比jdk的SSLEngine更好。netty默认会尝试加载OpenSSLEngine,如果失败再去加载JdkSSLEngine。它的相关方法有:
API | 描述 |
---|---|
setHandshakeTimeout (long,TimeUnit),setHandshakeTimeoutMillis (long),getHandshakeTimeoutMillis() | 设置和获取超时时间,超时之后,握手ChannelFuture 将会被通知失败 |
setCloseNotifyTimeout (long,TimeUnit),setCloseNotifyTimeoutMillis (long),getCloseNotifyTimeoutMillis() | 设置和获取超时时间,超时之后,将会触发一个关闭通知并关闭连接。这也将会导致通知该ChannelFuture 失败 |
handshakeFuture() | 返回一个在握手完成后将会得到通知的ChannelFuure。如果握手先前已经执行过了,则返回一个包含了先前的握手结果的ChannelFuture |
close(),close(ChannelPromise),close(ChannelHandlerContext,ChannelPromise) | 发送close_notify 以请求关闭并销毁底层的SslEngine |
Http编解码器:netty提供了多个ChannelHandler用于将数据格式化为http响应或将请求解析为数据。下图展示了http请求响应的组成部分:
主要的编解码器有:
名称 | 描述 |
---|---|
HttpRequestEncoder | 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节 |
HttpResponseEncoder | 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节 |
HttpRequestDecoder | 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息 |
HttpResponseDecoder | 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息 |
http聚合:对于某些请求或响应数据,netty的编解码器可能会无法完整解析它们,而是将它们解析为多个数据片段,比如HttpServerCodec只能获取uri中参数,那么如果使用post请求,因为信息保存在messageBody,所以无法完全解析。这时就需要加上HttpObjectAggregator。HttpObjectAggregator结构体系如下:
MessageAggregator的decode()里有一个currentMessage参数,它是该handler的成员变量,每一个channel对应一个handler实例,这个currentMessage会存储多次decode迭代的结果,这就是聚合实现的关键。
http压缩:虽然http数据压缩会带来服务器时钟的开销,但是可以节省网络流量,加快传输速率。客户端可以使用HttpContentDecompressor来处理来自服务器的内容,服务端使用HttpContentCompressor()进行数据压缩。
websocket:netty为实现websocket长连接提供了多种框架。以下是websocketFrame类型:
名称 | 描述 |
---|---|
BinaryWebSocketFrame | 数据帧:二进制数据 |
TextWebSocketFrame | 数据帧:文本数据 |
ContinuationWebSocketFrame | 数据帧:属于上一个BinaryWebSocketFrame 或者TextWebSocketFrame 的文本的或者二进制数据 |
CloseWebSocketFrame | 控制帧:一个CLOSE 请求、关闭的状态码以及关闭的原因 |
PingWebSocketFrame | 控制帧:请求一个PongWebSocketFrame |
PongWebSocketFrame | 控制帧:对PingWebSocketFrame 请求的响应 |
要想为WebSocket 添加安全性,只需要将SslHandler作为第一个ChannelHandler 添加到 ChannelPipeline 中。
连接管理:netty为检测处理空闲和超时连接提供了管理器。主要的有:
名称 | 描述 |
---|---|
IdleStateHandler | 当连接空闲时间太长时,将会触发一个IdleStateEvent 事件。然后,可以通过在ChannelInboundHandler中重写userEventTriggered()方法来处理该IdleStateEvent 事件 |
ReadTimeoutHandler | 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个ReadTimeoutException 并关闭对应的Channel。可以通过重写ChannelHandler 中的exceptionCaught()方法来检测该ReadTimeoutException |
WriteTimeoutHandler | 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个WriteTimeoutException 并关闭对应的Channel 。可以通过重写你的ChannelHandler 的exceptionCaught()方法检测该WriteTimeoutException |
ChannelPipline接口实现了链式调用,可添加ChannelHandler容器。当Channel被创建时,它会被自动分配到它所属的ChannelPipline上。
如图,当一个入站消息进入时,它会从ChannelPipline的头部开始,并被传递给第一个ChannelInboundHandler,当在此ChannelHandler被处理完后,它又会被传递给下一个ChannelInboundHandler,直到到达ChannelPipline的尾部。消息的出站与入站差不多。
当ChannelHandler被添加到ChannelPipline时,它会被分配一个ChannelHandlerContext,其代表了ChannelHandler和ChannelPipline之间的绑定。
在netty中,有两种发送消息的方式:
- 直接写到Channel中,这将使消息从ChannelPipline的尾部开始流动。
- 直接写到ChannelHandlerContext中,这将是消息从ChannelPipline中的 下一个ChannelHandlerContext开始流动。
在ChannelPipline 传播事件时,它会测试ChannelPipline 中的下一个ChannelHandler的类型是否符合事件的运动方向,如果不匹配,那么它会跳过该ChannelHandler。
ChannelHandlerContext主要作用是使ChannelHandler和ChannelPipline进行交互,ChannelHandler可以通知ChannelHandler所属的ChannelPipline的下一个ChannelHandler ,可以修改它所属的ChannelPipline。
ChannelHandlerContext API:
ChannelHandlerContext中有些方法Channel和ChannelPipline也有,要注意的是如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext 上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。
netty的引导类为应用程序的网络层配置提供了容器,ServerBootstrap用于引导服务端,Bootstrap用于引导客户端。
我们可以看到ServerBootstrap和Bootstrap都实现了AbstractBootstrap抽象类。
在前面文章中,在实现服务端引导程序时,我们创建并绑定了两个EventLoopGroup,这是为什么呢?查看源码我们可以看出它的解释:
源码的解释为服务端引导需要一个EventLoopGroup表示服务器自身已被绑定到了本地端口正在监听的套接字,第二组则是用来处理客户端连接的。
查看Bootstrap的源码我们可以看到当调用Bootstrap.group(EventLoopGroup group)时,实际上是调用AbstractBootstrap的group方法,正和ServerBootstrap.group(EventLoopGroup parentGroup, EventLoopGroup childGroup)的第一行一样。
网络传输的基本单位都是字节,而对于字节操作可以使用java的ByetBuffer API。但是ByteBuf有一些的局限性 :
它的长度长度固定,一旦分配完成,容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;
ByteBuffer只有一个标识位控的指针position,读写的时候需要手工调用flip()和rewind()等,很容易导致程序处理失败。
API功能有限,一些高级和实用的特性需要使用者自己编程实现。
对于此,netty在ByteBuffer的基础上再次构建,从而实现了新的、功能强大的ByteBuf API,它具有以下优点:
堆缓冲区模式又称为:支撑数组(backing array)模式。之间将数据存放在JVM的堆空间,通过将数据存储在数组中实现。示例如下:
public static void heapBuffer() { // 创建Java堆缓冲区 ByteBuf heapBuf = Unpooled.buffer(); if (heapBuf.hasArray()) { // 判断是否有支撑数组 byte[] array = heapBuf.array(); // 获取该支撑数组引用 int offset = heapBuf.arrayOffset() + heapBuf.readerIndex(); // 计算第一个字节偏移量 int length = heapBuf.readableBytes(); // 获取可读字节数 handleArray(array, offset, length); // 调用自己的方法 } } 复制代码
直接缓冲区属于堆外分配的直接内存,不会占用堆的容量。适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好。它的主要缺点是相对于基于堆的缓冲区,它们的分配和释放代价都较为昂贵,并且因为数据不是在java堆上 ,所以处理前你需要再进行一次复制。
public static void directBuffer() { ByteBuf directBuf = Unpooled.directBuffer(); if (!directBuf.hasArray()) { // 如果不是堆缓冲区 int length = directBuf.readableBytes(); // 获取可读字节数 byte[] array = new byte[length]; // 分配一个新数组来保存数据 directBuf.getBytes(directBuf.readerIndex(), array); // 将数据复制到新数组 handleArray(array, 0, length); // 调用自己的方法 } } 复制代码
复合缓冲区是netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合视图,可以根据需要添加和删除不同类型的ByteBuf。复合缓冲区不支持访问其支撑数组。因此如果要访问,需要先将内容拷贝到堆内存中,再进行访问。
public static void byteBufComposite() { // 复合缓冲区,只是提供一个视图 CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = Unpooled.buffer(); ByteBuf bodyBuf = Unpooled.directBuffer(); messageBuf.addComponents(headerBuf, bodyBuf); // 将ByteBuf实例追加到CompositeByteBuf messageBuf.removeComponent(0); // 移除索引位置为0的缓冲区 for (ByteBuf buf : messageBuf) { // 遍历缓冲区 System.out.println(buf.toString()); } } 复制代码
随机访问索引:ByteBuf的索引是从0开始的。
public static void byteBufRelativeAccess() { ByteBuf buffer = Unpooled.buffer(); for (int i = 0; i < buffer.capacity(); i++) { byte b = buffer.getByte(i);// 不改变readerIndex值 System.out.println((char) b); } } 复制代码
对于那些只需要一个索引值参数的方法,它们都不会改变readIndex和writeIndex,不过可以通过调用readerIndex(index)和writeIndex(index)来手动改变。
可丢弃字节:可丢弃字节区域是指[0,readerIndex)之间的区域。可调用discardReadBytes()方法丢弃已经读过的字节。discardReadBytes()方法会移动可读字节区域内容(CONTENT)。如果频繁调用,会有多次数据复制开销,对性能有一定的影响
可读字节:可读字节区域是指[readerIndex, writerIndex)之间的区域。任何readxxx()
和skipxxx()
的操作方法,都会改变readerIndex索引。
可写字节:可写字节区域是指[writerIndex, capacity)之间的区域。任何writexxx()
的操作方法都将改变writerIndex的值。
索引管理:
查找操作:查找ByteBuf指定的值,最简单的就是使用indexOf()方法,较复杂的可以通过ByteBufProcessor作为参数的方法达,比如int index = buffer.forEachByte(ByteProcessor.FIND_CR);
。
派生缓冲区:派生缓冲区为ByteBuf提供了一个访问的视图。视图仅仅提供一种访问操作,不做任何拷贝操作。如果你修改了这个新的ByteBuf实例的具体内容,那么对应的源实例也会被修改,如果需要拷贝现有缓冲区的真实副本,请使用copy()或copy(int, int)方法。
读/写操作:有两种类别的读写:
get()和set()操作——从给定的索引开始,并且保持索引不变。
read()和write()操作——从给定的索引开始,并且根据已经访问过的字节数对索引进行访问。
ByteBufHolder是netty的高级特性,为缓冲区池化提供了支持。可以通过子类实现ByteBufHolder接口,根据自身需要添加自己需要的数据字段。可以用于自定义缓冲区类型扩展字段。
netty通过ByteBufAllocator接口实现了(ByteBuf的)池化。可以通过Channel或者ChannelHandler的ChannelHandlerContext获取一个到ByteBufAllocator的引用。
netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator(池化)、UnpolledByteBufAlloactor(非池化)。netty默认使用PooledByteBufAllocator,但是也可以通过ChannelConfig修改。
在未获得ByteBufAllocator引用的情况下,我们可以使用netty提付的Unpooled工具类来创建未池化的ByteBufAllocator。
ByteBufUtil类提供了用于操作ByteBuf的静态的辅助方法。hexdump()
方法以十六进制的表示形式大于ByteBuf内容。equals(ByteBuf, ByteBuf)
方法用来判断两个ByteBuf实例是否相等。
netty通过实现ReferenceCounted接口为ButeBuf和ButeBufHolder引入了引用计数技术。当计数为0时,系统将会回收缓冲区,它降低了内存分配的开销。