同步和异步的区别最大在于异步的话调用者不需要等待处理结果,被调用者会通过回调等机制来通知调用者其返回结果。
采用 BIO 通信模型 的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。我们一般通过在 while(true) 循环中服务端会调用 accept() 方法等待接收客户端的连接的方式监听请求,请求一旦接收到一个连接请求,就可以建立通信套接字在这个通信套接字上进行读写操作,此时不能再接收其他客户端连接请求,只能等待同当前连接的客户端的操作执行完成, 不过可以通过多线程来支持多个客户端的连接,如上图所示。
如果要让 BIO 通信模型 能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()、 socket.read()、 socket.write() 涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过 线程池机制 改善,线程池还可以让线程的创建和回收成本相对较低。使用FixedThreadPool 可以有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N(客户端请求数量):M(处理客户端请求的线程数量)的伪异步I/O模型(N 可以远远大于 M),下面一节"伪异步 BIO"中会详细介绍到。
我们再设想一下当客户端并发访问量增加后这种模型会出现什么问题?
在 Java 虚拟机中,线程是宝贵的资源,线程的创建和销毁成本很高,除此之外,线程的切换成本也是很高的。尤其在 Linux 这样的操作系统中,线程本质上就是一个进程,创建和销毁线程都是重量级的系统函数。如果并发访问量增加会导致线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务。
为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化一一一后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N.通过线程池可以灵活地调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
伪异步IO模型图
采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架,它的模型图如上图所示。当有新的客户端接入时,将客户端的 Socket 封装成一个Task(该任务实现java.lang.Runnable接口)投递到后端的线程池中进行处理,JDK 的线程池维护一个消息队列和 N 个活跃线程,对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。不过因为它的底层任然是同步阻塞的BIO模型,因此无法从根本上解决问题。
举个栗子:选择器就相当于菜鸟驿站,只有当快递准备就绪时,才会给你打电话让你去取快递,所以选择器就是这么个角色,当数据准备就绪时,才会分配到服务端运行
public class TestBlockingNIO { //客户端 @Test public void client() throws IOException{ //1. 获取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); //2. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //3. 读取本地文件,并发送到服务端 while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } //4. 关闭通道 inChannel.close(); sChannel.close(); } //服务端 @Test public void server() throws IOException{ //1. 获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); //2. 绑定连接 ssChannel.bind(new InetSocketAddress(9898)); //3. 获取客户端连接的通道 SocketChannel sChannel = ssChannel.accept(); //4. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //5. 接收客户端的数据,并保存到本地 while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //6. 关闭通道 sChannel.close(); outChannel.close(); ssChannel.close(); } }
package com.atguigu.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import org.junit.Test; public class TestBlockingNIO2 { //客户端 @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } // sChannel.shutdownOutput(); //接收服务端的反馈 int len = 0; while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } inChannel.close(); sChannel.close(); } //服务端 @Test public void server() throws IOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(9898)); SocketChannel sChannel = ssChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024); while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //发送反馈给客户端 buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf); sChannel.close(); outChannel.close(); ssChannel.close(); } }
客户端发送完毕后,给个提示 sChannel.shutdownOutput();
package com.cy.NIO2; import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; public class test02 { //客户端 @Test public void client() throws IOException{ SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } //提示服务端传输完成 sChannel.shutdownOutput(); //接收服务端的反馈 int len = 0; while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } inChannel.close(); sChannel.close(); } //服务端 @Test public void server() throws IOException { ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ssChannel.bind(new InetSocketAddress(9898)); SocketChannel sChannel = ssChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024); while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); } //发送反馈给客户端 buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf); sChannel.close(); outChannel.close(); ssChannel.close(); } }