TCP协议
TCP是一种面向连接的,可靠的,基于字节流的传输层通信协议.
我们先来看看网络上最传统的通信,网络上的通信默认是不可靠的,例如我们从中国发一个包到美国,在这个过程,我们的包可能会丢掉,或者包的顺序是不对的,但是我们的TCP的协议是一个可靠的协议,在一个不可靠的通道上传送信息,最后的结果是可靠的.我们生活上打电话,如果信号不好,是不是会等对方确认一下是否听到,我们网络上的通信也是如此,我们往Server发一个包,我们会希望Server端会给我们返回一个回应(ACK),当我们得到这个ACK,我们Client就知道这个包发成功了,当没有收到ACK的时候,我们会去重新发这个包直到Server端给我们ACK,这是TCP最原始的一个版本,但是这个版本会有一个问题,我们可以想象,我们发一个包就得等待一次ACK,这样的效率是不是太慢了,我们发一个包到Server端返回一个包这里就会有一个等待的时间,也就是会阻塞.所以TCP就引入了一个Batch的概念,我不是一个包发过去就等待一个ACK,我是一批包发过去,等待一个ACK, 例如我们client发一个Batch,里面包含1,2,3这三个包,Server只需要返回一个ACK 3,就可以了.但是这是理想的状态,这又有一个问题,Server只收到2,3这两个包,1这个包丢了,这怎么办.当时的解决方案是"回退N步",如果有某个包丢了,Server是不会返回ACK的,Client是需要重新发送这三个包的,最后就衍生到了现在的"三次握手","四次挥手".
那为什么是"三次"而不是"五次","八次"呢?
"三次"是刚好能证明双方的通讯是没有问题的,在Client和Server的角度上来看,"三次"刚好能把自己的"发送"和"接收"能力告诉对方.
我们先来看看Socket API
(图1)
(图2)
只要是使用到Socket的技术栈,所有的语言在实现上都是大同小异的,我们一开始是初始化一个Socket,可以理解为new 一个Socket,然后我们要绑定一个端口也就是Bind()的动作,然后要监听,listen(),监听就表示我们是Server端,不是Client端,而我们的"三次握手"就是发生在Client端发送connect请求,Server端执行accept的时候,当Server端执行完accept的时候,Client和Server就可以通信了,Server端就会为这个Client分配内存等等操作,如图2所示,Server端在内部会维护两个queue(队列),sync queue就是当Client发送第一个包过来询问的时候,觉得没有问题就会放进这里,同时发送ACK,当Client又发一次包过来的时候说明连接没有问题的时候,Server端就会把这个连接放进accept queue里面,而我们应用程序从这个队列里面显式地拿一条连接出来,这个动作就是accept,一旦连接接完了,就是很传统的通信了,可以执行read()/write()操作了.
我们再来看看Java里的Socket框架
BIO & NIO
1 import java.io.BufferedReader; 2 import java.io.IOException; 3 import java.io.InputStreamReader; 4 import java.io.PrintWriter; 5 import java.net.InetSocketAddress; 6 import java.net.ServerSocket; 7 import java.net.Socket; 8 9 public class BIOServer { 10 public static void start(int port) throws IOException { 11 //socket 12 ServerSocket serverSocket = new ServerSocket(); 13 //bind & listen 14 serverSocket.bind(new InetSocketAddress(port), 2);//backlog, accept queue was created in listen() 15 16 while(true) { 17 //accept 18 final Socket clientSocket = serverSocket.accept();//block! 19 System.out.println("accept!"); 20 new Thread(()-> {// or user thread pool 21 try { 22 BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 23 PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); 24 String line = in.readLine();//block 25 while(line != null) { 26 out.println(line); 27 System.out.println("读到的值为:"+line); 28 out.flush(); 29 line = in.readLine();//block 30 } 31 clientSocket.close(); 32 } catch (IOException e) { 33 e.printStackTrace(); 34 try { 35 clientSocket.close(); 36 } catch (IOException ee) { 37 e.printStackTrace(); 38 } 39 } 40 }).start(); 41 } 42 43 } 44 45 public static void main( String[] args ) throws InterruptedException, IOException{ 46 start(8084); 47 } 48 }
我们这个代码,通过while(true)来不断的调用这个accept的方法,因为accept是个同步阻塞的方法,不会导致程序一直在死循环的创建线程,我们测试的时候可以直接在cmd里telnet这个ip端口,然后再输入模拟通讯.BIO有个缺点就是,在高并发的情况下,例如有1000个请求,就会创建1000个线程去执行任务,如果执行readLine的时候,阻塞了,那么1000个线程就不能执行完毕,不能得到销毁.
(图3)
我们来看看图3,BIO还有一个很浪费内存的情况就是,每一条线程(socket)过来都要经过"accept","read & parse","write"这些同步阻塞的操作,
"read & parse" : TCP是面向字节流的,肯定要解决一个边界的问题,哪里到哪里是一个完整的请求,我们要把它分隔开,这就是read & parse要做的事情,例如一个包是100字节,但是我只收到99字节,我还不能交给下层去用,我必须等待接收到一个完整的包才行,少一个字节都是不对的,下层就无法解析了
"write" : write为什么也会阻塞?接收端也是有一个窗口的,会有一个缓冲区域buffer,例如Server是要发1,2,3,4这四个包的,但是Client只收到1,3,4这三个包,就要等待2这个包,那么Client就会将1,3,4这三个包缓存进Buffer里面,当Buffer满了之后,Server就需要等待Buffer有空间才能进行发送.
因为BIO会导致创建过多的线程,线程间的上下文切换是很耗费资源的,所以就衍生出了NIO.
NIO核心:不想创建过多的线程
(图4)
NIO中的Selector你可以想象成是一个线程,一个线程负责多个Socket 连接,假设有一万个连接,这一万个连接都是交给这个线程管理,这个Selector会监听这么多个Socket的状态变化,这样就不会造成浪费,因为它要么是在做事情,要么就是没事可做,这个线程就是一直轮询这些Socket,有read或write操作的时候,它是有多少就读或写多少,不会去等待.
我们来看看NIO的Selector API
我们再来看看代码
1 import java.io.IOException; 2 import java.net.InetSocketAddress; 3 import java.nio.ByteBuffer; 4 import java.nio.channels.SelectionKey; 5 import java.nio.channels.Selector; 6 import java.nio.channels.ServerSocketChannel; 7 import java.nio.channels.SocketChannel; 8 import java.util.Iterator; 9 import java.util.Set; 10 11 public class NIOServer { 12 public static void start(int port) throws IOException { 13 ServerSocketChannel serverChannel = ServerSocketChannel.open();//我们可以想象成new 一个Socket 14 serverChannel.configureBlocking(false);//nonblocking 15 InetSocketAddress address = new InetSocketAddress(port); 16 //bind & listen 17 serverChannel.bind(address); 18 19 Selector selector = Selector.open();//可以理解成创建单个线程,轮询管理多个Socket对象 20 21 serverChannel.register(selector, SelectionKey.OP_ACCEPT);//把服务器的Socket注册到这个Selector上,告诉Selector我感兴趣的事情OP_ACCEPT 22 //开始进入Selector的工作模式 23 while(true) { 24 selector.select();//scan ,轮询一遍,查看所管理的channel是否有时间发生 25 26 Set<SelectionKey> readyKeys = selector.selectedKeys();//这个SelectionKey就是每一个channel绑定的东西 27 System.out.println("readyKeys.size="+readyKeys.size()); 28 Iterator<SelectionKey> it = readyKeys.iterator(); 29 while (it.hasNext()) { 30 SelectionKey key = it.next(); 31 //accept,如果这个key是Acceptable事件 32 if (key.isAcceptable()) { 33 System.out.println("isAcceptable"); 34 ServerSocketChannel server = (ServerSocketChannel)key.channel(); 35 SocketChannel socket = server.accept();//直接获取这个channel,这不是一个阻塞的方法,一经调用马上返回 36 System.out.println("Accept !"); 37 socket.configureBlocking(false);//don't forget to set nonblocking 38 socket.register(selector, SelectionKey.OP_READ );//tricky,在最开始selector只关注OP_ACCEPT事件,现在就可以关注这个channel的OP_READ事件了 39 } 40 if (key.isReadable()) { 41 System.out.println("isReadable"); 42 SocketChannel socket = (SocketChannel) key.channel(); 43 final ByteBuffer buffer = ByteBuffer.allocate(64); 44 final int bytesRead = socket.read(buffer);//also nonblock, how to convert byte to frame 45 if (bytesRead > 0) { 46 buffer.flip(); 47 int ret = socket.write(buffer); 48 System.out.println("ret="+ret); 49 if (ret <=0) { 50 //register op_write 51 socket.register(selector, SelectionKey.OP_WRITE); 52 } 53 54 buffer.clear(); 55 } else if (bytesRead < 0) {//means connection closed 56 key.cancel(); 57 socket.close(); 58 System.out.println("Client close"); 59 } 60 } 61 //不能在这里写wirtable事件,因为OP_ACCEPT-->OP_READ-->OP_WRITE,一旦到了OP_WRITE,这个while循环就会一直走里面的代码块,没有必要 62 // if (key.isWritable()) { 63 // SocketChannel socket = (SocketChannel) key.channel(); 64 // final ByteBuffer buffer = ByteBuffer.allocate(64); 65 // socket.write(buffer); 66 // //remove 67 // } 68 69 70 it.remove();//don't forget, why need to manually move? 71 } 72 } 73 } 74 75 76 public static void main( String[] args ) throws InterruptedException, IOException{ 77 start(8085); 78 } 79 }
NIO代码裸写有很大的风险,每一步都需要很严谨,而且很难融入业务,BIO跟它比还算比较方便,后面就衍生出了Reactor模式
Netty是一个NIO Client Server的框架
Netty核心概念
ChannelPipeline,ChannelHandler面向的是业务的
(图5)
select():遍历操作
processSelectedkeys():负责一些IO时间,这里不能放阻塞的代码,不然就违反NIO的原则
runAllTasks:会去判断当前任务是否属于主线程,如果不是则放到task queue里面异步执行
(图6)
我们为了最大限度地榨干CPU的性能,通常会根据CPU的核数来创建多个EventLoop,每个EventLoop负责多个channel.
(图7)
Pipeline:流水线的概念
ChannelHandler:真正操作业务的地方,有任务来就会进入到Handler代码块
传统的网络编程都有这几步:read,decode,compute,encode,send,每一层都可以有对应的Handler.
InboundHandler:专门处理外部进来的请求
OutboundHandler:专门处理应用的请求
Netty各组件之间关系
粘包 & 拆包
(图8)
我们来看图8的例子,当Client发来一堆包的时候,我们要从这一堆包里面拆分出来3个完整的信息,分别是(1011),(01011),(011),为了得到完整信息,我们会去跟Client端指定协议,例如每段有用的消息要在头部发一个特殊表示的head,head包后续跟着是整段信息的包长度(length),而这个包长度是否包括head包和length包这两个包完全可以由我们自己来定.我再来看看如何指定一套完整的协议
理解LengthFieldBasedFrameDecoder
1 import java.util.function.Consumer; 2 3 import com.tim.nettttty.handler.EchoHandler; 4 import com.tim.nettttty.handler.IProtocalHandler; 5 import com.tim.nettttty.handler.PipelinePrintHandler; 6 import com.tim.nettttty.handler.PrintInboundHandler; 7 8 import io.netty.bootstrap.ServerBootstrap; 9 import io.netty.channel.Channel; 10 import io.netty.channel.ChannelFuture; 11 import io.netty.channel.ChannelInitializer; 12 import io.netty.channel.ChannelPipeline; 13 import io.netty.channel.EventLoopGroup; 14 import io.netty.channel.nio.NioEventLoopGroup; 15 import io.netty.channel.socket.nio.NioServerSocketChannel; 16 import io.netty.handler.codec.LengthFieldBasedFrameDecoder; 17 import io.netty.handler.codec.LineBasedFrameDecoder; 18 import io.netty.handler.codec.string.StringDecoder; 19 import io.netty.handler.codec.string.StringEncoder; 20 import io.netty.util.CharsetUtil; 21 import io.netty.util.concurrent.DefaultEventExecutorGroup; 22 23 24 public class Server { 25 26 private static void use( ChannelPipeline pipeline, Consumer<ChannelPipeline> strategy) { 27 strategy.accept(pipeline); 28 } 29 30 private static Consumer<ChannelPipeline> echo = p ->{ 31 p.addLast( 32 new LineBasedFrameDecoder(80,false,false), 33 new StringDecoder(), 34 new EchoHandler(), 35 new PipelinePrintHandler(), 36 new StringEncoder(CharsetUtil.UTF_8) 37 38 ); 39 }; 40 41 private static Consumer<ChannelPipeline> print = p ->{ 42 p.addLast( 43 new PrintInboundHandler("id1") 44 ); 45 }; 46 47 private static Consumer<ChannelPipeline> decode = p ->{ 48 p.addLast( 49 new LengthFieldBasedFrameDecoder(1024,2,2,-2,0)) 50 .addLast(new DefaultEventExecutorGroup(16), new IProtocalHandler()) 51 .addLast( new StringEncoder(CharsetUtil.UTF_8)) 52 ; 53 }; 54 55 private static void start(int port) throws InterruptedException { 56 EventLoopGroup bossGroup = new NioEventLoopGroup(); 57 EventLoopGroup workerGroup = new NioEventLoopGroup(); 58 try { 59 ServerBootstrap b = new ServerBootstrap();//Bootstrap for client 60 b.group(bossGroup, workerGroup); 61 b.channel(NioServerSocketChannel.class);//always 62 b.childHandler(new ChannelInitializer() { 63 @Override 64 protected void initChannel(Channel ch) throws Exception { 65 use(ch.pipeline(), decode); 66 } 67 }); 68 69 ChannelFuture f = b.bind(port).sync(); 70 f.channel().closeFuture().sync(); 71 } finally { 72 bossGroup.shutdownGracefully(); 73 workerGroup.shutdownGracefully(); 74 } 75 } 76 77 public static void main( String[] args ) throws InterruptedException{ 78 start(8084); 79 } 80 }
1 import java.util.Random; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 import io.netty.util.CharsetUtil; 7 8 public class IProtocalHandler extends ChannelInboundHandlerAdapter { 9 10 @Override 11 public void channelRead(ChannelHandlerContext ctx, final Object msg) throws Exception { 12 int sleep = 500 * new Random().nextInt(5); 13 System.out.println("sleep:" + sleep); 14 Thread.sleep(sleep); 15 16 final ByteBuf buf = (ByteBuf) msg; 17 char c1 = (char) buf.readByte(); 18 char c2 = (char) buf.readByte(); 19 20 if (c1 != 'J' || c2 != 'W') { 21 ctx.fireExceptionCaught(new Exception("magic error")); 22 return ; 23 } 24 25 buf.readShort();//skip length 26 27 String outputStr = buf.toString(CharsetUtil.UTF_8); 28 System.out.println(outputStr); 29 30 ctx.channel().writeAndFlush(outputStr+"\n"); 31 32 } 33 34 }
1 import java.io.BufferedReader; 2 import java.io.DataOutputStream; 3 import java.io.IOException; 4 import java.io.InputStreamReader; 5 import java.net.Socket; 6 7 public class OIOClient { 8 9 public static final String[] commands = new String[] { 10 "hi", 11 "i am client", 12 "helloworld", 13 "java and netty" 14 }; 15 public static void main(String[] args) throws IOException { 16 int concurrent = 1; 17 Runnable task = () ->{ 18 try { 19 Socket socket = new Socket("127.0.0.1", 8084); 20 DataOutputStream out = new DataOutputStream(socket.getOutputStream()); 21 /** 22 * HEADER(2)|LENGTH(2)|BODY 23 * LENGTH = (self(2) + BODY), not include header 24 */ 25 for(String str : commands) { 26 out.writeByte('J'); 27 out.writeByte('W'); 28 int length = str.length(); 29 out.writeShort(length*2 + 2);//why *2 here? 30 out.writeChars(str); 31 } 32 out.flush(); 33 34 BufferedReader br=new BufferedReader(new InputStreamReader(socket.getInputStream())); 35 String line = null; 36 while(!((line =br.readLine())==null)){ 37 System.out.println(line); 38 } 39 40 socket.close(); 41 }catch (Exception e) { 42 e.printStackTrace(); 43 } 44 }; 45 46 47 for(int i =0;i<concurrent;i++) { 48 new Thread(task).start(); 49 } 50 51 52 } 53 }