I/O实际上是Input和Output,也就是输入和输出。而流其实是一种抽象的概念,它表示的是数据的无结构化传递。会被当成无结构的字节序列或字符序列。流可以当作是磁盘与内存之间的一个管道。
在Java中I/O流操作很多,但是核心体系实际上就只有File(文件流)、InputStream(字节输入流)、OutputStream(字节输出流)、Reader(字符输入流)、Writer(字符输出流)。
- 访问管道处理流,是用来去完成管道的读写操作,用于线程间的通讯
- 访问数组处理流,是针对内存的操作
- 缓冲流是提供一个缓冲区,对于缓冲区的一个处理流,避免每次与磁盘的交互,提高输入输出的一个效率
- 对象流,主要用在序列化这个机制上,将一个对象序列化后转换成一个可存储可传输的对象,传输时用到的流。
- 转换流:将字符流转换成字节流
- 打印流
- 硬盘
- 内存
- 键盘
- 网络
File类是Java中为文件进行创建、删除、重命名、移动等操作而设计的一个类
- File(File parent, String child):根据parent抽象路径名和child路径名字符串创建一个新的File实例。
- File(String pathname):将指定路径名转化为抽象路径名创建一个新的File实例。
- File(String parent, String child):根据parent路径名和child路径名创建一个File实例。
- File(URI uri):指定URI转化为抽象路径名。
public static void main(String[] args) { File file = new File("D:\\appdata\\IODemo\\Capture001.png"); try ( FileOutputStream fileOutputStream = new FileOutputStream("D:\\appdata\\IODemo\\Capture002.png"); FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,将流写入try()中,代码执行完毕后,会自动关闭流 int len = 0; byte[] buffer = new byte[1024]; long start = System.currentTimeMillis(); while ((len = fileInputStream.read(buffer)) != -1) { fileOutputStream.write(buffer, 0, len); } long end = System.currentTimeMillis(); System.out.println((end - start) / 1000); } catch (IOException e) { e.printStackTrace(); } }
流一定要关闭,否则当前线程没执行完会一直使其被进程占用。
try (FileReader reader = new FileReader("/appdata/IODemo/IODemo"); FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) { int i = 0; char[] chars = new char[1]; while ((i = reader.read(chars)) != -1) { writer.write(new String(chars, 0, i)); } } catch (Exception e) { e.printStackTrace(); }
缓冲流是带缓冲区的处理流,他会提供一个缓冲区,缓冲区的作用主要目的是:避免每次和硬盘打交道,能够提高输入/输出的执行效率。
BufferedInputStream
private static int DEFAULT_BUFFER_SIZE = 8192; // 默认8Kb的缓冲区 private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大缓冲区大小 // 每次读取的8Kb size的字节会存储在buf[]数组中 //每次调用read()方法时,会首先去尝试从这个数组中读取,如果读取失败,会从数据源(磁盘上)去读取 protected volatile byte buf[]; // 两种构造方法最终调用该方法,带int参数的会覆盖默认的8Kb size public BufferedInputStream(InputStream in, int size) { super(in); if (size <= 0) { throw new IllegalArgumentException("Buffer size <= 0"); } buf = new byte[size]; }
其实缓冲流原理上是帮我们封装了8Kb大小的数据,先从磁盘读8Kb到我们内存,后由我们自己去操作这8Kb的数据,当处理完8Kb缓冲区没有了,再加载数据到缓冲区,再读到内存去处理。当我们用普通流去处理文件,将buffer[]设置的稍微大一点,一样可以达到提高效率的结果。
public static void main(String[] args) { try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo")); BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) { int len = 0; byte[] bytes = new byte[1024]; while ((len = bufferedInputStream.read(bytes)) != -1) { // System.out.println(new String(bytes, 0, len)); bufferedOutputStream.write(bytes, 0, len); bufferedOutputStream.flush(); } } catch (IOException e) { e.printStackTrace(); } }
将创建InputStream写入到try()中,可以帮我们实现close()关闭流的操作,这个close中包含了buffred的flush操作,如果没有关闭流,又没有手动flush(),将会丢失数据。
public void close() throws IOException { try (OutputStream ostream = out) { flush(); } }
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) { String str; while ((str = reader.readLine()) != null) { System.out.println(str); } } catch (Exception e) { e.printStackTrace(); }
try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo"); InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) { char[] chars = new char[1024]; int i; while ((i = reader.read(chars)) != -1) { System.out.println(new String(chars, 0, i)); } } catch (Exception e) { e.printStackTrace(); }
在这个转换流中,时可以指定字符集编码的。
关于序列化和反序列化这个问题,我在18年参加工作的时候,遇到过一个项目,之后就再没有用过了。当时架构还是分布式dubbo+zookeeper,但是传输报文竟然用到这个我是没想到的。
什么是序列化和反序列化?
- 序列化是把对象的状态信息转化为可存储或传输的形式的过程,也就是把对象转化为字节序列的过程成为对象的序列化
- 反序列化是序列化的逆向过程,把字节数组反序列化为对象。
public class UserSerializable implements Serializable { private static final long serialVersionUID = 8160464260217334369L; private String name; private int age; public void setName(String name) { this.name = name; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "UserSerializable{" + "name='" + name + '\'' + ", age=" + age + '}'; } public static void main(String[] args) { UserSerializable user = new UserSerializable(); user.setAge(26); user.setName("Elian"); String fileName = "/appdata/IODemo/User"; try (FileInputStream fileInputStream = new FileInputStream(fileName); FileOutputStream fileOutputStream = new FileOutputStream(fileName); ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream); ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream) ) { outputStream.writeObject(user); outputStream.flush(); UserSerializable newUser = (UserSerializable) objectInputStream.readObject(); System.out.println(newUser); } catch (Exception e) { e.printStackTrace(); } } }
public class NIOFirstDemo { public static void main(String[] args) { bio(); bufferBio(); nio(); mmap(); zeroCopy(); } private static void bio() { try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"); FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) { // bio实现copy long bioStart = System.currentTimeMillis(); int len = 0; byte[] buffer = new byte[1024]; while ((len = bioInputStream.read(buffer)) != -1) { bioOutputStream.write(buffer, 0, len); } bioOutputStream.flush(); System.out.println(System.currentTimeMillis() - bioStart); } catch (Exception e) { e.printStackTrace(); } } private static void bufferBio() { try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM")); BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) { // bio实现copy long bioStart = System.currentTimeMillis(); int len = 0; byte[] buffer = new byte[1024]; while ((len = bioInputStream.read(buffer)) != -1) { bioOutputStream.write(buffer, 0, len); } bioOutputStream.flush(); System.out.println(System.currentTimeMillis() - bioStart); } catch (Exception e) { e.printStackTrace(); } } private static void nio() { try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) { // nio 实现copy long nioStart = System.currentTimeMillis(); int len = 0; ByteBuffer buffer = ByteBuffer.allocate(1024); while ((len = inChannel.read(buffer)) != -1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); } System.out.println(System.currentTimeMillis() - nioStart); } catch (Exception e) { e.printStackTrace(); } } // 依然将用户空间的 private static void mmap() { try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) { long mmapStart = System.currentTimeMillis(); MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size()); byte[] bytes = new byte[inMappedBuffer.limit()]; inMappedBuffer.get(bytes); outMappedBuffer.put(bytes); System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart)); } catch (Exception e) { e.printStackTrace(); } } private static void zeroCopy() { try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) { long zeroCopyStart = System.currentTimeMillis(); inChannel.transferTo(0, inChannel.size(), outChannel); System.out.println(System.currentTimeMillis() - zeroCopyStart); } catch (Exception e) { e.printStackTrace(); } } }
实验顺序(速度由快到慢排序)
zeroCopy(零拷贝) > mmap(内存映射) > bufferedInputStream > bio(基于channle) ~= nio
zerCopy无需将文件映射到内存,mmap会将buffer读进内存,关于Buffer继续往下看4.2。
// 服务端 final int DEFAULT_PORT = 9090; try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) { Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接 System.out.println("Client port:" + socket.getPort() + " has been connected!"); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String str = bufferedReader.readLine(); System.out.println("Client Content:" + str); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)); writer.write(str + "\n"); // 如果不换行,客户端会一直等待读取完 writer.flush(); bufferedReader.close(); writer.close(); } catch (Exception e) { e.printStackTrace(); } try (Socket socket = new Socket("localhost", 9090)) { OutputStream outputStream = socket.getOutputStream(); outputStream.write("Hello Elian\n".getBytes(StandardCharsets.UTF_8)); // 不换行服务端会一直等待读取完,进入阻塞 BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); System.out.println(reader.readLine()); } catch (Exception e) { e.printStackTrace(); }
客户端是怎样找到目标服务的呢?
客户端发起请求的时候,在不同的层去增加不同的协议头,在数据链路层组装目标机器的Mac地址,这个地址是通过ARP协议,我们已知目标的IP,需要获得目标的Mac地址,会发送一个广播消息,会在网段内去询问这个IP是谁,目标地址会发送自己Mac地址给到当前这个发送端,就可以去组装目标的Mac地址。那么在数据发送过程中,进入IP广播后,某个网卡就会发现,对应Mac的网卡就会把数据包收进来。
本地磁盘IO通信:
网络磁盘通信:
两者不同在于:本地磁盘要通过DMA(直接存储访问器)将磁盘上的内容读取到内核空间缓冲区,再从内核空间缓冲区读到用户空间缓冲区进行操作。而网络IO是通过网卡中的缓冲区读取到系统内核缓冲区,如果应用进程一直没有调用socket的read()方法读取数据将数据copy到用户缓冲区,数据会一直被缓存在内核缓冲区里面。
accept()每次只能接收一个并处理一个socket,这样只能等上一个socket处理完才能继续处理下一个请求。BIO每次阻塞两个位置,第一个阻塞位置是accept过程,另一个阻塞过程是I/O流读写的过程。
解决办法:通过线程池进行处理。
public static void main(String[] args) { final int DEFAULT_PORT = 9090; try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) { ExecutorService executorService = Executors.newFixedThreadPool(4); while (true) { Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接 executorService.submit(new ServerSocketThread(socket)); } } catch (Exception e) { e.printStackTrace(); } } public class ServerSocketThread implements Runnable { private Socket socket; public ServerSocketThread(Socket socket) { this.socket = socket; } @Override public void run() { System.out.println("Client port:" + socket.getPort() + " has been connected"); try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){ String clinetStr = reader.readLine(); System.out.println("Client resived message: " + clinetStr); Thread.sleep(15000); writer.write("OK.\n"); } catch(Exception e){ e.printStackTrace(); } } }
现在还有一个缺点:
线程数取决于计算机本身的线程数,但是线程数设置太大,又会造成线程之间切换造成的资源消耗。
RPC(Remote Procedure Call) 远程过程调用,是一种通过网络从计算机程序上请求服务,而不需要了解底层网络技术的协议。一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源。
// 1. 公共类 // 接口 public interface IHelloWorld { String sayHello(String content); } // Request public class RpcRequest implements Serializable { private static final long serialVersionUID = -7922155162004878476L; private String className; private String methodName; private Object[] parameters; private Class[] types; public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } public Class[] getTypes() { return types; } public void setTypes(Class[] types) { this.types = types; } } // 2. provider // impl public class HelloWorldImpl implements IHelloWorld { @Override public String sayHello(String content) { return "Hello " + content; } } // Server public class RpcProxyServer { private final ExecutorService executorService = Executors.newCachedThreadPool(); public void publisher (int port) { try (ServerSocket server = new ServerSocket(port)) { while (true) { final Socket socket = server.accept(); executorService.execute(new ProcessorHandler(socket)); } } catch (Exception e) { e.printStackTrace(); } } } public class ProcessorHandler implements Runnable { private final Socket socket; public ProcessorHandler(Socket socket) { this.socket = socket; } @Override public void run() { try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) { RpcRequest request = (RpcRequest)objectInputStream.readObject(); Object object = invoke(request); objectOutputStream.writeObject(object); objectOutputStream.flush(); } catch (Exception e) { e.printStackTrace(); } } private Object invoke(RpcRequest request) throws Exception { Class<?> clazz = Class.forName(request.getClassName()); Method method = clazz.getMethod(request.getMethodName(), request.getTypes()); if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld")) return method.invoke(new HelloWorldImpl(), request.getParameters()); else return null; } } // 3.consumer // client public class App { public static void main( String[] args ) { RpcProxyClient client = new RpcProxyClient(); IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090); System.out.println(iHelloWorld.sayHello("Elian")); } } // Client public class RpcProxyClient { public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) { return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port)); } } // 动态代理类 public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setTypes(method.getParameterTypes()); RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port); Object object = rpcNetTransport.send(request); return object; } } // reader读取返回报文 public class RpcNetTransport { private String host; private int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Object send( RpcRequest request ) { try (Socket socket = new Socket(host, port); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) { objectOutputStream.writeObject(request); objectOutputStream.flush(); return objectInputStream.readObject(); } catch (Exception e) { e.printStackTrace(); } return null; } }
阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO,无论哪种IO,都是为了能够提高服务端能够并行处理的连接数量。
阻塞IO
应用进程调用accept()时,触发系统把数据从网卡缓冲区复制到内核空间,再从内核空间复制到用户空间,如果这个过程中,数据没有准备好,到数据返回或发生错误返回之前,用户进程一直处于阻塞状态,这个就是阻塞IO。
非阻塞是指用户进程调用accept()后,如果数据没有准备好,会返回一个EWOULDBLOCK状态,并创建一个线程出来不断轮询返回结果。由此可见会增加CPU的消耗。
单个进程可以同时处理多个客户端的网络IO链接,我们可以把所有链接过来的客户端注册到select/poll复用器上,用一个线程或者进程来调用这个select/poll,调用这个select的时候会阻塞,阻塞的时候,内核会去监视所有select/poll所负责的socket,当其中一个socket准备好的时候,那么这个select/poll就会返回,如果再次调用这个select的时候,就会把数据从内核拷贝到用户空间。
select/poll模型最大的缺点是,他只能线性的轮询1024个链接,当然这1024个链接只有少数处于活跃状态,会导致网络的延迟。jdk1.5之前的NIO是使用这种模型。
这种模型处理的情况是:多个不同的监听,而且只是提高了并发连接数,并不是提高单个线程处理性能。连接数少的情况下,不一定比BIO效率更高。
对select/poll进行的优化:
- 对单个进程所打开的连接数没有限制;
- 利用每个文件描述符fd上的callback函数来实现异步回调,不需要轮询了;
- mmap,可以通过内核和用户空间映射同一块内存来减少内存复制。
总结:
相比较老的IO来说,所有操作都是基于Channel和Buffer来说的,可以将Channel看成是InputStream/OutputStream,应用程序与磁盘/网络缓冲区之间的一个通道,而所有数据操作都是通过缓冲区来实现的。
通道(Channel):Java NIO数据来源,可以是网络,也可以是本地磁盘
缓冲区(Buffer):数据读写的中转区
选择器(selectors):异步IO的核心类,可以实现异步非阻塞IO,一个selectors可以管理多个通道Channel
FileChannle:从文件中读取数据
DatagramChannel:通过UDP协议读写网络中的数据
SocketChannel:通过TCP协议读写网络中的数据
ServerSocketChannel:监听一个TCP连接,对于每一个新的客户端连接都会创建一个SocketChannel
缓冲区本质上是一块可以写入的数据,以及从中读取数据的内存,实际上也是一个byte[]数据,只是在NIO中被封装成了NIO Buffer对象,并提供了一组方法来访问这个内存块,要理解buffer的工作原理,需要知道几个属性:
private int position = 0; // 下一个位置 private int limit; // private int capacity; // 容量,buffer数组初始化的最大容量 private int mark; // 标记
读:position=0; limit = capacity = [size];当要添加的数据byte[].lenth > limit - position时都可以成功。
flip():limit=position; position=0,防止多余数据的写出
写:position遍历到limit的过程
get():有4个重载方法,get()获取一个单字节,get(int) 获取特定位置的字节,get(byte[]) ,get(byte[], int, int)获取一段字节
put():有5个重载,put(byte),put(int, byte),put(ByteBuffer),put(byte[], int, int),put(byte[])
堆内内存:由JVM控制的内存,堆外内存不数据JVM运行时内存,而是用的系统内存,但GC会触发回收。ByteBuffer有两个子类:HeapByteBuffer和DirectByteBuffer
HeapByteBuffer:JVM堆内存
DirectByteBuffer:堆外本地内存
MappedByteBuffer:mmap的内存映射,读写性能极高
MappedByteBuffer将文件直接映射到内存。可以映射整个文件,如果文件比较大的话可以考虑分段进行映射,只要指定文件的感兴趣部分就可以。
由于MappedByteBuffer申请的是直接内存,因此不受Minor GC控制,只能在发生Full GC时才能被回收,因此Java提供了DirectByteBuffer类来改善这一情况。它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC来回收内存,也可以调用clean()方法来进行回收
FileChannel提供了map方法来把文件映射为内存对象:
MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
使用堆外内存的原因
对垃圾回收停顿的改善。因为full gc时,垃圾收集器会对所有分配的堆内内存进行扫描,垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。如果使用堆外内存的话,堆外内存是直接受操作系统管理。这样做的结果就是能保持一个较小的JVM堆内存,以减少垃圾收集对应用的影响。(full gc时会触发堆外空闲内存的回收。)
减少了数据从JVM拷贝到native堆的次数,在某些场景下可以提升程序I/O的性能。
可以突破JVM内存限制,操作更多的物理内存。
使用堆外内存的问题
堆外内存难以控制,如果内存泄漏,那么很难排查(VisualVM可以通过安装插件来监控堆外内存)。
堆外内存只能通过序列化和反序列化来存储,保存对象速度比堆内存慢,不适合存储很复杂的对象。一般简单的对象或者扁平化的比较适合。
直接内存的访问速度(读写方面)会快于堆内存。在申请内存空间时,堆内存速度高于直接内存。
当直接内存不足时会触发full gc,排查full gc的时候,一定要考虑。
ByteBuffer模型
初始
read(), put()
position = n; limit = capacity = 8; mark = -1;
flip()
limit = position; // 用来设置限制 position = 0; mark = -1;
mark()
mark = postion; // 标记
reset()
position = mark;
clear()实际上数据还在
position = 0; limit = capacity; mark = -1;
Linux支持的零拷贝方式:
server
public class ZeroCopyServer { public static void main(String[] args) { try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw"); FileChannel fileChannel = writeFile.getChannel(); ) { long start = System.currentTimeMillis(); serverSocketChannel.bind(new InetSocketAddress(9090)); SocketChannel socketChannel = serverSocketChannel.accept(); ByteBuffer buffer = ByteBuffer.allocate(8192); int i = 0; int j = 0; /*while ((i = socketChannel.read(buffer)) != -1) { buffer.flip(); fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer); buffer.clear(); j += i; }*/ // 2527ms mmap()方式 while ((i = socketChannel.read(buffer)) != -1) { buffer.flip(); fileChannel.write(buffer); buffer.clear(); j += i; } // 4462ms 普通写 System.out.println("传输大小:" + j + ";时间:" + (System.currentTimeMillis() - start)); } catch (Exception e) { e.printStackTrace(); } } }
client
public class ZeroCopyClient { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open(); FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) { socketChannel.connect(new InetSocketAddress("localhost", 9090)); int position = 0; long size=fileChannel.size(); while (size > 0) { long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷贝,只从File Copy到缓冲区 position += transfer; size -= transfer; } System.out.println("上传文件大小:" + position); } catch (Exception e) { e.printStackTrace(); } } }
Selector(选择器,多路复用器)是Java NIO中能够检测一到多个NIO通道,是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
服务端处理过程:
Selector.open()开启一个多路复用器,将ServerSocketChannel注册到selector上,这个ServerSocketServer必然不能是阻塞的,一个Channel会以4种状态注册到selector上:
通过Selector的select()方法可以阻塞selection的操作,当通道中有已准备好进行I/O操作的SelectionKey,会返回这些准备好的SelectionKey的个数,下面是select()的重载方法:
int select():阻塞到至少有一个通道在你注册的事件上就绪了。
int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。
int selectNow():非阻塞,只要有通道就绪就立刻返回。
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。
当selector.select()返回了准备好的连接数量后,可以通过selector.selectedKeys()获取所有已就绪的Channel的描述符selectedKey,这个selectKyes中包含了它所对应的selector和channel,并且能获取到当前这个selectedKey对应的channel的状态(key.isAcceptable()等)
如果当前selectedKey描述的是一个isAcceptable(),可以从当前selectedKey中将其对应的ServerSocketChannel也就是我们最初注册进来的channel获取出来,并建立accept()监听,进入阻塞(其实已经不用阻塞了,肯定是个准备好的channel,拿到SocketChannel后,将其设置为非阻塞,通过SelectionKey.OP_READ状态注册到selector中去,最后将其在selectKeys中移除。
注册事件状态时,可用 | 连接,比如SelectionKey.OP_READ | SelectionKey.OP_ACCEPT
// selector.open(); private native int epollCreate(); // serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT) private native void epollCtl(int epfd, int opcode, int fd, int events); // selector.select() private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
再次进行selector.select(),这时会返回刚刚readable的SelectionKey,通过selector.selectionKeys()拿到后,判断其状态为isReadable(),就可以对其进行读写操作了,最后也要将其描述符移除掉
客户端处理过程:
server
public class NIOSelectorServer { public static void main(String[] args) { try (Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) { serverSocketChannel.configureBlocking(false); // 在多路复用器中,这个必须设置为非阻塞 serverSocketChannel.bind(new InetSocketAddress(9090)); // 监听连接事件 // 将serverSocketChannel注册到selector上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 参数可以带时间:0:阻塞;有时间:设置一个超时时间 selector.select(); // 阻塞所有注册到多路复用器上的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 对于连接的SocketChannel的selectKey的集合 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); // 避免重复处理 // socket两种状态:listen 通信R/W if (selectionKey.isAcceptable()) { // 是一个连接事件 acceptHandler(selectionKey); } else if (selectionKey.isReadable()) { // 是一个读事件 readHandler(selectionKey); } } } } catch (Exception e) { e.printStackTrace(); } } public static void acceptHandler(SelectionKey key) { try { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel client = ssc.accept(); // 目的是调用accept接收客户端,例如fd7 client.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); client.register(key.selector(), SelectionKey.OP_READ, buffer); System.out.println("-------------------------------------------"); System.out.println("新客户端:" + client.getRemoteAddress()); System.out.println("-------------------------------------------"); } catch (IOException e) { e.printStackTrace(); } } public static void readHandler(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); try { channel.read(buffer); buffer.flip(); System.out.println("Client Info: "+new String(buffer.array())); buffer.clear(); buffer.put("Hello Client, i'm Server".getBytes()); buffer.flip(); channel.write(buffer); channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
client
public class NIOSelectorClient { public static void main(String[] args) { try (Selector selector = Selector.open()) { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("localhost", 9090)); socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true) { selector.select(); Set<SelectionKey> selectionKeySet = selector.selectedKeys(); Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator(); while (selectionKeyIterator.hasNext()) { SelectionKey selectionKey = selectionKeyIterator.next(); selectionKeyIterator.remove(); if (selectionKey.isConnectable()) { connectHandler(selector, selectionKey); } else if (selectionKey.isReadable()) { readHandler(selectionKey); } } } } catch (Exception e) { e.printStackTrace(); } } private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException { SocketChannel channel = (SocketChannel) selectionKey.channel(); if (channel.isConnectionPending()) { channel.finishConnect(); } channel.configureBlocking(false); channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes())); channel.register(selector, SelectionKey.OP_READ); } private static void readHandler(SelectionKey selectionKey) throws IOException { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer); byteBuffer.flip(); System.out.println("client receive message: " + new String(byteBuffer.array())); channel.close(); } }
accpet() + new Thread(() -> { // 业务处理 }).start();
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT); while(true) { selector.select(); seletor.selectedKeys().iterator(); while (iterator.hasnext()) { // 业务处理 } }
在上面业务处理部分加入多线程。
Netty,两个Grop,一个处理accept,一个处理业务