NIO
的那些事我们在前段时间学习了IO
和NIO
的一些概念性的东西,并且写了一些简单的例子进行实践,虽然简单,但基本上覆盖了NIO
的一些最基本的概念了。
如果还没看过的,如果翻一下之前的文章了解一下,或者看一下网上的其他文章。
JAVA
的NIO
的那些痛既然我们学过NIO
,那我们以JAVA
的NIO
来举个例子,说明一下我们使用NIO
的一些基本流程:
ServerSocketChannel
(Server
端)或SocketChannel
(Client
端),监听对应的端口或连接对应的端口configureBlocking(false)
为非阻塞register
注册要监听的描述符Selector.open
打开Selector
Selector.select
得到已就绪的SelectionKey
SelectionKey
进行相应的处理这里我把之前的某些步骤合并了,可能跟之前有前面的文章有点不一致,但总体步骤是一样的。
其实,上面的步骤我们大可以了解到,我们真正需要关注的步骤只是第6步,或者说是我们真正要处理IO
事件的一些逻辑,其他的都是一些通用流程而已。
既然如此,我们真的有必要把时间花费在这些通用的地方吗?
偷懒的程序员肯定不想这样做,所以有人开发了mina
和netty
一类的NIO
框架,旨在把程序员从这些烦杂的通用流程中释放出来,而是只关注真正的业务逻辑,把这些交由框架去做处理。
mina
和netty
的作者都是同一个人(Trustin Lee,牛人总是各种牛)。
但鉴于netty
基本上已经是事实上的NIO
标准框架了,并且社区一直比较活跃,而mina
已经归档很久了,都已经没更新很多年了。为了避免精力太过分散(其实是我没学习过mina
,不懂-_- ),我们这里不讨论mina
,直接学习netty
,里面有很多值得我们学习的东西。
在开始介绍netty
相关的知识前,我们来了解一下线程模型相关的一些知识,这里参考了很多网上的一些文章,加以自己整理了一下,希望能够给一些看其他文章不清楚的朋友一些不一样的理解。
图片来自:https://www.jianshu.com/p/738...
这里的单线程指的是分派线程和工作线程都在同一个线程,可以看回我们的JAVA
的NIO
示例代码,这里为了方便,我们也贴在下面:
public class MyServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); String str = ""; while(!Thread.currentThread().isInterrupted()) { //这里是一直阻塞,直到有描述符就绪 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //连接建立 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } //连接可读,这时可以直接读 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); try { int num = socketChannel.read(readBuffer); str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); } catch (IOException e) { e.printStackTrace(); } } } } } }
我们可以看到在我们nio
的例子中,我们没有明确使用多线程,这里就是使用了单线程来处理的。
它有什么好处呢?
实现简单。这是当然的,所有不涉及到多线程的代码都是相对比较简单的,注意,是相对
有优点的同时肯定有缺点,那么这种单线程有什么缺点呢:
性能相对比较差。只有一个线程进行请求的处理,也就是只有一个线程处理CPU的描述符,假设同一时间有很多信号都就绪了,并且我们读到IO
数据后的真正处理逻辑可能比较复杂,那么所有的请求都需要等待当前的请求处理完成后才能处理其他的。这也就导致了它的性能相对(这里的相对是对比其他多线程的处理方式)比较弱。
图片来自:https://www.jianshu.com/p/738...
这里的多线程指的是处理逻辑的多线程,对应到我们的NIO
代码逻辑里面就是对SelectionKey
的处理是多线程的,我们直接看代码会直观点:
public class MyServerMultipleThread { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { //这里是一直阻塞,直到有描述符就绪 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //连接建立 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //连接可读,这时可以直接读 else if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int num = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num); System.out.println("received message:" + str); }).start(); } } } } }
这里我们可以看到,在进行SelectionKey
遍历读完数据后真正处理的时候,我们新起了一个新的线程进行NIO
的相关处理。
当然,这里的只是一个示例,真正写代码的时候不应该这样无限制的新起线程,而是应该使用线程池,更合理的使用线程,避免线程数量太多,导致CPU切换太频繁,这样反而起不到优化性能的作用。
图片来自:https://www.jianshu.com/p/738...
一看到这图,估计很多人头都大了,这都什么鬼,这么复杂啊。
实际可以简单一点理解:
注意,这里的多线程不包括accept
请求,accept
还是由单个线程进行分发。
我们直接看一下代码会比较容易理解
public class MyServerMultipleThread2 { @SuppressWarnings("Duplicates") public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress("localhost", 8001)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(!Thread.currentThread().isInterrupted()) { selector.select(); //这里是一直阻塞,直到有描述符就绪 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); keyIterator.remove(); //连接建立 if (key.isAcceptable()) { try { SocketChannel clientChannel = serverSocketChannel.accept(); clientChannel.configureBlocking(false); clientChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //连接可读,这时可以直接读 else if (key.isReadable()) { new Thread(() -> { ByteBuffer readBuffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int[] num = new int[]{0}; try { num[0] = socketChannel.read(readBuffer); new Thread(() -> { String str = new String(readBuffer.array(), 0, num[0]); System.out.println("received message:" + str); }).start(); } catch (IOException e) { e.printStackTrace(); } }).start(); key.channel().register(key.selector(), SelectionKey.OP_WRITE); } } } } }
这里我没有参考一些网上比较复杂的做法,可能实现起来不大一致,但相对容易理解一点。
accept
请求时,还是由单前线程单独处理READ
或WRITE
等请求时,我们新起一个线程去做处理,并且在真正的处理逻辑时,还是跟上面的多线程逻辑一样,是新起一个线程去做处理。READ
或WRITE
时,注意,需要重新注册相应的WRITE
或READ
事件——因为新起线程后,当前SelectionKey
的信号还是READ
,如果我们不做修改,会导致当前的线程会重复多次处理。具体大家可以下来试试,把后面的register
去掉,看一下会出现什么情况。我们看到,上面的线程模型,都以性能提升为目的,一步步去进行优化,但同时我们也看到了,代码是越来越复杂,使得我们在维护我们真正的逻辑时,有点像是大海捞针,真正的代码逻辑就那么一点,而很多都是一些模板代码。
为了解决这些问题,就需要引出我们的框架了,框架正是为了帮我们去约定好一些通用的逻辑而出现的,比如spring
,帮我做好了IOC
和AOP
等的一些逻辑,这些不需要我们去额外关注;而mybatis
帮我们做好了ORM
相关的一些处理,DB映射等,这些流程化的东西都已经固化了;而我们这里要说的netty
,它帮我们把NIO
这些线程模型相关的东西帮我们做了很多的优化和抽取,我们不再需要管这些流程化的东西,只需要写我们自己的逻辑。
netty
出场netty
作为一个高性能的NIO
框架,基本上已经是事实上的NIO
标准了,包括dubbo
,zookeeper
等内部都比较大量地使用了netty
。或者说具体点,这些框架能够有这么好的性能,大部分功劳要归结到netty
身上。
netty
基础知识看例子前我们先来补充一些基础知识。netty
有几个重要概念:
ChannelHandler
channel
的事件处理器,里面封装了针对当前channel
的生命周期的方法
ChannelInBoundHandler
channel
的READ
请求处理器,里面封装了当前channel
的对于接收请求相关的生命周期方法
ChannelOutBoundHandler
channel
的WRITE
请求处理器,里面封装了当前channel
的对象发出请求的生命周期方法。
ChannelPipeline
此类是netty
架构中比较重要的一个类,它使用了责任链模式,把请求从ChannelHandler
中一个个的往后传递,最终到达我们的业务Handler
。关于Pipeline
的详细描述,我们后面再详细看看。
ByteBuf
netty
封装了自己的ByteBuf
,与JDK
自带的ByteBuffer
的最主要的区别是它有两个指针,一个供读readerIndex
,一个供写writerIndex
。而至于该类的一些详细信息,大家可以看一下它的JavaDoc
,写得非常详细。
关于OutBound
和上面的InBound
的区别,大家可以简单地区分一下,In
就是请求进入,对应的就是READ
,Out
就是请求发出,对应的就是WRITE
。
基本的概念了解清楚了,那我们来看一下简单的例子。
其实netty
最好的文档是它的官网文档。我们就还是以类似官方源码里面的一个example
来学习一下,实现的功能很简单:
Client
连接成功后传一句话给Server
,Server
回复收到。
server
端ServerHandler
public class MyNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from client:" + msg); ctx.writeAndFlush("I received your message:" + msg + System.lineSeparator()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
我们的代码比较简单,打印收到的文本,并且再发回一条语句。我们可以看到我们输出的时候加多了一个换行符——System.lineSeparator()
,这是为什么呢?
这里涉及到另外一个TCP/IP
一个比较重要的问题,拆包和粘包,这里我们先不细说,后面我会有专门的文章来说一下拆包和粘包还有一系列TCP/IP
相关的知识,这是非常大的一块了。我们现在就先简单的知道,加这个换行符是为了让Handler知道我们的消息从哪里结束。
Server
public class MyNettyServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = null; try { channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
这里涉及到比较多的知识点,整体结构我们先不管它,我们重要先关注一下:
线程池
这里定义了两个线程组进行处理,BossGroup
和WorkerGroup
,对应我们上面的多线程模型,原因是netty
并不使用主从多线程模型——这个我们以后的文章有机会再细说。
ServerBootStrap
netty
工具类,有助于编写服务器的相关代码,而Client
端对应的就是Bootstrap
了。
pipeline
的添加ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyServerHandler());
这里把4个Handler
添加到Pipeline
的末尾,至于为什么是末尾,相应看到后面的pipeline
的解析的时候大家就会知道了。
我这里大概描述一下几个Handler
的作用:
LineBasedFrameDecoder
根据换行符\n
或\r\n
进行内容的分割——即拆包
StringDecoder
把接收到的内容解析为String
字符串
StringEncoder
把发出的内容解析为String
字符串
MyNettyServerHandler
我们的真正逻辑处理类,这个应该是在前面的几个处理完成后再进行。我们在后面的pipeline
执行顺序中可以看到为什么这样添加。
后面的Client
中的Handler
也可以参考上面的。
Client
端ClientHandler
public class MyNettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("helloworld" + System.lineSeparator()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("from server:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
这里我们的代码也比较简单,就是连接成功的时候发条helloworld
过去服务端,然后再从服务端读到返回的内容。我们就不细说了。
public class MyNettyClient { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(4096)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new MyNettyClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
对比上面的Server
代码,这里的区别,最大的就是我们只有一个EventLoopGroup
,因为Client
端并不需要接收请求,所以并不需要所谓的BossGroup
。
一切就绪后,我们可以跑一下看看运行情况:
先运行server
,再运行client
server
可以看到client
可以看到
这表示我们已经使用netty
写了一个基本可以用的NIO
程序了。
ChannelPipeline
详解ChannelPipeline
作为netty
的一个底层重要组成部分,ChannelHandler
都需要依靠它进行调度,重要性不言而喻。那我们现在就一起来看看ChannelPipeline
究竟是怎么调度的。
查看ChannelPipeline
的JavaDoc
我们可以看到这样一串描述(牛人写描述都是特别认真的)。
大概的意思就是这样的:
InBoundHandler
的添加顺序,从前往后执行。OutBoundHandler
的添加顺序,从后往前执行。另外,文档中又举了一个例子:
我们套用一下我们的Server
例子来分析一下:LineBasedFrameDecoder
,StringDecoder
,StringEncoder
,MyNettyServerHandler
当我们收到消息时,需要执行的Handler
的顺序为:LineBasedFrameDecoder
,StringDecoder
,MyNettyServerHandler
当我们发出消息时,需要执行的OutboundHandler
的顺序为:StringEncoder
.
基于上面的分析,我们就可以分析为什么我们前面的例子可以得到那样的结果。
这篇文章,我们从一开始的线程模型到后面的netty
的示例,这些种种都是为了性能的提高去做的一些优化。在当前大数据的趋势下,更多需要我们把性能去做到极致。
后面,我们会再根据netty
中的一些最佳实践来分析它是怎么解析粘包和拆分的。
https://www.jianshu.com/p/738095702b75
https://netty.io/wiki/user-guide-for-4.x.html