传统的BIO模式
对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。
特点:
① 每个客户端连接到达之后,都会分配一个线程给客户端。用于读写数据,编码解码以及业务计算。
② 服务端创建的连接数与服务端可创建线程数量呈线性关系
缺点(根据特点而定):
① 服务端的并发量与服务器可创建线程数有依赖关系
② 服务端线程不仅要IO读写数据,还要业务计算
③ 服务端先进行获取客户端连接,然后读取数据,最后写入数据的这些过程中都是阻塞的。在网络不好的情况下,降低了对线程的利用率,减少了服务器的吞吐量。
IO练习练习一
单发单收:客户端发送一行数据,服务端接收一行数据,通信架构的规则服务端与客户端的通信机制要一致
public class client { public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1",9999); OutputStream os = socket.getOutputStream(); //因为接受的时候是接受一行字符打印 //使用打印流比较好 PrintStream ps = new PrintStream(os); ps.println("Hello world"); ps.flush(); } catch (Exception e) { e.printStackTrace(); } } }
public class Server { public static void main(String[] args) { try { //1.创建ServerSocket对象用作服务端端口注册 ServerSocket ss = new ServerSocket(9999); //2.创建服务端socket Socket server = ss.accept(); //3.创建服务端输入流 InputStream is = server.getInputStream(); //创建字符输入流读取client传入的字符数据 BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; if ((msg = br.readLine()) != null){ System.out.println("服务端接收到的信息" + msg); } } catch (Exception e) { e.printStackTrace(); } } }
IO练习二
多发多收:客户端发送多条消息,服务端接收多条消息(通过创建线程的方式实现)
缺点:
① 每个Socket接收到都会创建一个线程,线程的竞争,切换上下文影响性能
② 每个线程都会占用栈空间和CPU资源
③ 并不是每个Socket都进行IO操作,无意义的线程处理
④ 客户端并发增加时,服务端呈现线性线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务
package javabasic.moreclient; import java.io.OutputStream; import java.io.PrintStream; import java.net.Socket; import java.util.Scanner; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 9:21 */ public class Client { public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1",9999); OutputStream os = socket.getOutputStream(); PrintStream ps = new PrintStream(os); Scanner scanner = new Scanner(System.in); while (true){ System.out.printf("客户端:"); String msg = scanner.nextLine(); ps.println(msg); ps.flush(); } } catch (Exception e) { e.printStackTrace(); } } }
package javabasic.moreclient; import javabasic.iodemo.Server; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 9:08 */ /* * 需求:实现服务端收取多个客户端发送的请求信息 * 思路:服务端每接收到一个客户端的请求的时候都建立一个线程给予通信 * */ public class server { public static void main(String[] args) { try { ServerSocket ss = new ServerSocket(9999); while (true){ Socket socket = ss.accept(); ServerThreadSocket threadSocket = new ServerThreadSocket(socket); threadSocket.run(); } } catch (Exception e) { e.printStackTrace(); } } }
package javabasic.moreclient; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 9:15 */ public class ServerThreadSocket extends Thread{ private Socket socket; public ServerThreadSocket(Socket socket){ this.socket = socket; } @Override public void run() { try { InputStream is = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = br.readLine()) != null){ System.out.printf("服务端接收到的消息为" + msg); } } catch (Exception e) { e.printStackTrace(); } } }
IO练习三:改进IO练习二
多发多收:客户端发送多条消息,服务端接收多条消息(通过创建线程池的消息队列实现伪异步io)
package javabasic.fakenio; import java.io.OutputStream; import java.io.PrintStream; import java.net.Socket; import java.util.Scanner; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 10:09 */ public class client { public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1",9999); OutputStream os = socket.getOutputStream(); PrintStream ps = new PrintStream(os); Scanner scanner = new Scanner(System.in); while (true){ System.out.printf("客户端:"); String msg = scanner.nextLine(); ps.println(msg); ps.flush(); } } catch (Exception e) { e.printStackTrace(); } } }
package javabasic.fakenio; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 9:58 */ public class HandlerSocketPool { private ExecutorService executorService; /* *public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); * */ public HandlerSocketPool(int maximumPoolSize,int queueSize){ executorService = new ThreadPoolExecutor(3,maximumPoolSize,120 , TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize)); } public void execute(Runnable target){ executorService.execute(target); } }
package javabasic.fakenio; import jdk.net.Sockets; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 9:56 */ /* * 实现伪异步io通信 * */ public class server { public static void main(String[] args) { try { //1.创建服务端注册端口 ServerSocket ss = new ServerSocket(9999); //2.创建线程池对象 HandlerSocketPool pool = new HandlerSocketPool(10,12); while (true){ //3.接收到客户端的请求,连接对象 Socket socket = ss.accept(); ServerRunnableTarget target = new ServerRunnableTarget(socket); pool.execute(target); } } catch (Exception e) { e.printStackTrace(); } } }
package javabasic.fakenio; import jdk.net.Sockets; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 10:06 */ public class ServerRunnableTarget implements Runnable{ private Socket socket; public ServerRunnableTarget(Socket socket){ this.socket = socket; } @Override public void run() { try { InputStream is = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(is)); String msg; while ((msg = br.readLine()) != null){ System.out.println("服务端接收到的数据是" + msg); } } catch (Exception e) { e.printStackTrace(); } } }
IO练习四
基于BIO模式下实现文件上传的案例,服务端使用根据连接数线性创建线程
思路:先用data流读取文件后缀名,然后再读取文件的数据。
package javabasic.fileupload; import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 10:27 */ /* * 需求:实现客户端发送任意类型的文件数据给服务端保存起来 * * */ public class Client { public static void main(String[] args) { FileInputStream fis = null; DataOutputStream dos = null; try { fis = new FileInputStream("D:\\照片\\01.txt"); Socket socket = new Socket("127.0.0.1",9999); OutputStream os = socket.getOutputStream(); dos = new DataOutputStream(os); dos.writeUTF(".txt"); byte[] buffer = new byte[1024]; int len = 0; while ((len = fis.read(buffer)) > 0){ dos.write(buffer,0,len); } dos.flush(); socket.shutdownOutput(); } catch (Exception e) { e.printStackTrace(); } finally { if(fis != null){ try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } if(dos != null){ try { dos.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
package javabasic.fileupload; import java.io.FileOutputStream; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 10:27 */ /* * 需求:服务端可以接收客户端任意类型数据的文件,并保存到服务端的磁盘中去 * */ public class Server { public static void main(String[] args) { try { ServerSocket ss = new ServerSocket(9999); while (true){ Socket socket = ss.accept(); SocketThread thread = new SocketThread(socket); thread.run(); } } catch (IOException e) { e.printStackTrace(); } } }
package javabasic.fileupload; import java.io.DataInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.util.UUID; /** * @author Lyyyys * @version 1.0 * @date 2021/7/12 11:24 */ public class SocketThread extends Thread{ private Socket socket; public SocketThread(Socket socket){ this.socket = socket; } @Override public void run() { FileOutputStream fos = null; DataInputStream dis = null; try { InputStream is = socket.getInputStream(); dis = new DataInputStream(is); String suffix = dis.readUTF(); fos = new FileOutputStream("D:\\照片\\server\\" + UUID.randomUUID() + suffix); int len = 0; byte[] buffer = new byte[1024]; while ((len = dis.read(buffer)) > 0){ fos.write(buffer,0,len); } } catch (IOException e) { e.printStackTrace(); } finally { if(fos != null){ try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
IO练习练习常用方法
public class Demo1 { public static void main(String[] args) { } //递归地列出一个目录下所有文件: private static void listAllFiles(File dir){ if(dir == null || !dir.exists()){ return; } if(dir.isFile()){ System.out.println(dir.getName()); return; } for (File file:dir.listFiles()) { listAllFiles(file); } } //复制文件 public static void copyFile(String src,String dest) throws IOException { FileInputStream in = new FileInputStream(src); FileOutputStream out = new FileOutputStream(dest); byte[] buffer = new byte[20*1024]; while (in.read(buffer,0,buffer.length) != -1){ out.write(buffer); } in.close(); out.close(); } //实现逐行输出文本文件的内容 public static void readFileContent(String filePath) throws IOException { BufferedReader br = new BufferedReader(new FileReader(filePath)); String line =""; while(br.readLine() != null){ System.out.println(line = br.readLine()); } //为什么只需要关闭一个窗口即可因为bufferReader采用的是装饰器模式 //调用BufferReader的close方法也会调用FileReader的close方法 br.close(); } }
NIO三大重点
一、通道
通道与流的不同之处在于
流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。 通道包括以下类型:
① FileChannel: 从文件中读写数据;
② DatagramChannel: 通过 UDP 读写网络中数据;
③ SocketChannel: 通过 TCP 读写网络中数据;
④ ServerSocketChannel: 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
二、缓冲区
发送给一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。 缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
public static ByteBuffer allocate(int capacity) public static ByteBuffer allocateDirect(int capacity)
第一种分配方式产生的内存开销是在JVM中的,而另外一种的分配方式产生的开销在JVM之外,以就是系统级的内存分配。当Java程序接收到外部传来的数据时,首先是被系统内存所获取,然后在由系统内存复制复制到JVM内存中供Java程序使用。
区别:数据量小第一种比较快因为直接就分配到JVM内存中,但当数据量大的时候使用后者较好,因为第一种还要从系统内存当中复制到JVM内存中。
public class demo2 { public static void main(String[] args) throws IOException { FileInputStream fio = new FileInputStream("src"); //获取输入字节流的通道 FileChannel fiochannel = fio.getChannel(); FileOutputStream fos = new FileOutputStream("dist"); //获取输出字节流的通道 FileChannel foschannel = fos.getChannel(); //为缓冲区分配1024字节 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); while (true){ //从输入通道中读取数据到缓冲区中 int r = fiochannel.read(byteBuffer); if(r == -1) break; //缓冲区切换读写 byteBuffer.flip(); //由输出通道去写缓冲区的内容 foschannel.write(byteBuffer); //清空缓冲区的内容 byteBuffer.clear(); } } }
三、选择器
NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。
ServerSocketChannel ssChannel = ServerSocketChannel.open(); ssChannel.configureBlocking(false); ssChannel.register(selector, SelectionKey.OP_ACCEPT);
通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行
在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
① SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
② SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功
③ SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
④ SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端有向缓存中write数据,下次轮询时,则会 isReadable()=true;
2.当向通道中注册SelectionKey.OP_WRITE事件后,这时你会发现当前轮询线程中isWritable()一直为ture,如果不设置为其他事件
4.IO多路复用
NIOClient实例
package javabasic.niodemo; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; /** * @author Lyyyys * @version 1.0 * @date 2021/7/18 11:39 */ public class NIOClient { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1", 8888); OutputStream out = socket.getOutputStream(); String s = "hello world"; //前后IO传输格式一致,byte字节 out.write(s.getBytes()); out.close(); } }
NIOServer实例
package javabasic.niodemo; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * @author Lyyyys * @version 1.0 * @date 2021/7/18 11:39 */ public class NIOServer { public static void main(String[] args) throws Exception { //1.创建选择器 Selector selector = Selector.open(); //2.讲通道注册到选择器上 ServerSocketChannel sschannel = ServerSocketChannel.open(); sschannel.register(selector, SelectionKey.OP_ACCEPT); sschannel.configureBlocking(false); //设置绑定ServerSocket的地址 ServerSocket ssocket = sschannel.socket(); InetSocketAddress address = new InetSocketAddress("127.0.0.1",8888); ssocket.bind(address); //4.事件循环 while (true){ //3.监听事件 selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); if(iterator.hasNext()){ SelectionKey key = iterator.next(); if(key.isAcceptable()){ ServerSocketChannel sschannel1 = (ServerSocketChannel) key.channel(); //服务器为每一个连接创建SocketChannel SocketChannel sChannel = sschannel1.accept(); sChannel.configureBlocking(false); sChannel.register(selector,SelectionKey.OP_READ); }else if(key.isReadable()){ SocketChannel sChannel = (SocketChannel) key.channel(); System.out.println(readDataFromSocketChannel(sChannel)); sChannel.close(); } iterator.remove(); } } } private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(1024); StringBuffer data = new StringBuffer(); while (true){ int n = sChannel.read(buffer); if(n == -1){ break; } buffer.flip(); int limit = buffer.limit(); char[] dst = new char[limit]; for (int i = 0; i < limit; i++) { dst[i] = (char) buffer.get(i); } data.append(dst); buffer.clear(); } return data.toString(); } }