单Reactor线程模型,最终还是使用了一个线程,和上篇文章最后的示例基本上没啥差别,只是拆分了三个类来进行处理。
这里的线程模型,是在客户端连接后,业务数据的部分抽出来,放到线程池中处理,这样做的好处是,如果处理业务数据时间很长,也不会影响客户端的读写操作。
上面图示是简单的基于工作线程的工作模式,把业务数据处理单独抽出来,在线程池处理。
最终的线程模型,是多Reactor线程模型。
客户端数据的读写也是在多个线程中进行处理,充分提高了性能。
MainReactor-Thread和Acceptor代码:
/** * mainReactor-Thread * 接收客户端连接,然后交给Acceptor处理 */ class MainReactor extends Thread { //创建一个Selector public Selector selector; AtomicInteger integer = new AtomicInteger(0); public MainReactor() throws IOException { selector = Selector.open(); } @Override public void run() { while (true) { try { //启动Selector selector.select(); //获取事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); //遍历事件 while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()) { //获取客户端通道 SocketChannel socketChannel = ((ServerSocketChannel)selectionKey.channel()).accept(); Acceptor acceptor = new Acceptor(); //把客户端通道交给Acceptor去处理 acceptor.register(socketChannel); } //处理完之后要移除 iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } public void register(ServerSocketChannel serverSocketChannel) throws ClosedChannelException { //注册OP_ACCEPT事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } /** * 处理客户端的连接 * 给每个客户端连接分配一个subReactor-Thread */ class Acceptor { public void register(SocketChannel socketChannel) throws IOException { //设置为非阻塞模式 socketChannel.configureBlocking(false); int index = integer.getAndIncrement() % subReactors.length; SubReactor subReactor = subReactors[index]; //给客户端连接分配一个subReactor线程 subReactor.register(socketChannel); //启动subReactor线程 subReactor.start(); System.out.println("收到新连接:" + socketChannel.getRemoteAddress()); } } }
SubReactor-Threads代码:
/** * 一个线程负责多个客户端连接 * 从channel中读数据、写数据 */ class SubReactor extends Thread { //创建一个Selector public Selector selector; //用于判断SubReactor线程是否已经启动 public volatile boolean isRunning = false; public SubReactor() throws IOException { selector = Selector.open(); } @Override public void start() { //判断SubReactor线程是否已经启动 //如果没有启动,就启动SubReactor线程 if (!isRunning) { isRunning = true; super.start(); } } @Override public void run() { while (isRunning) { try { //启动Selector selector.select(); //获取事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); //遍历事件 while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isReadable()) { try { SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); new Handler(socketChannel); } catch (Exception e) { e.printStackTrace(); selectionKey.cancel(); } } //处理完之后要移除 iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } public void register(SocketChannel socketChannel) throws IOException { //注册OP_READ事件 socketChannel.register(selector, SelectionKey.OP_READ); } /** 读取或者写入数据 */ class Handler { //用来读取或者写入数据 public Handler(SocketChannel socketChannel) throws IOException { ByteBuffer readBuffer = ByteBuffer.allocate(1024); while (socketChannel.isOpen() && socketChannel.read(readBuffer) != -1) { //如果有数据可读,简单的判断一下大于0 if (readBuffer.position() > 0) { break; } } //没有数据可读,就直接返回 if (readBuffer.position() == 0) { return; } //转换为读取模式 readBuffer.flip(); byte[] bytes = new byte[readBuffer.limit()]; readBuffer.get(bytes); System.out.println("获取到新的数据:" + new String(bytes)); System.out.println("获取到新的数据,来自:" + socketChannel.getRemoteAddress()); //线程池,用来处理业务数据 threadPool.execute(new Runnable() { @Override public void run() { } }); //向客户端写数据 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "hello world"; ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes()); while (writeBuffer.hasRemaining()) { socketChannel.write(writeBuffer); } } } }
初始化代码:
/** 服务端通道 */ public ServerSocketChannel serverSocketChannel; /** 用来接收客户端连接 */ public MainReactor mainReactor; /** 用来处理客户端连接的读取、写入 */ public SubReactor[] subReactors = new SubReactor[10]; /** 线程池,用来处理客户端连接后的业务逻辑 */ public ExecutorService threadPool = Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { NioReactor nioReactor = new NioReactor(); nioReactor.initAndRegister(); nioReactor.init(); nioReactor.bind(); } /** 初始化服务端 */ public void init() throws IOException { //创建一个服务端通道 serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //注册到mainReactor-Thread mainReactor.register(serverSocketChannel); //启动mainReactor-Thread线程 mainReactor.start(); } /** 服务端绑定端口 */ public void bind() throws IOException { serverSocketChannel.socket().bind(new InetSocketAddress(8056)); System.out.println("服务端启动成功"); } /** 初始化MainReactor和SubReactor */ public void initAndRegister() throws IOException { mainReactor = new MainReactor(); for (int i=0; i<subReactors.length; i++) { subReactors[i] = new SubReactor(); } }
上面代码mainReactorThread和subReactorThread中有大量的重复代码,可以提取出来处理:
/** 多路复用,reactor线程模型 */ public class NioReactor2 { abstract class ReactorThread extends Thread { //创建一个Selector public Selector selector; //用于判断线程是否已经启动 public volatile boolean isRunning = false; /** 有事件发生,就调用这个方法 */ public abstract void handler(SelectableChannel channel) throws IOException; public ReactorThread() throws IOException { selector = Selector.open(); } @Override public void run() { while (isRunning) { //启动Selector try { selector.select(); //获取事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); //遍历事件 while (iterator.hasNext()) { SelectionKey key = iterator.next(); int readyOps = key.readyOps(); //只关注是否有OP_ACCEPT和OP_READ事件 if ((readyOps & (SelectionKey.OP_ACCEPT | SelectionKey.OP_READ)) != 0 || readyOps == 0) { try { //获取channel SelectableChannel channel = key.channel(); //设置为非阻塞模式 channel.configureBlocking(false); //有事件,就调用handler方法 handler(channel); if (!channel.isOpen()) { //如果channel关闭,就取消key key.cancel(); } } catch (Exception e) { e.printStackTrace(); //如果有异常,就取消key key.cancel(); } } //处理完之后要移除 iterator.remove(); } selector.selectNow(); } catch (IOException e) { e.printStackTrace(); } } } public SelectionKey register(SelectableChannel channel) throws ClosedChannelException { //先注册到Selector,并没有注册任何事件 return channel.register(selector, 0); } @Override public void start() { //判断SubReactor线程是否已经启动 //如果没有启动,就启动SubReactor线程 if (!isRunning) { isRunning = true; super.start(); } } } /** 服务端通道 */ public ServerSocketChannel serverSocketChannel; /** 用来接收客户端连接 */ public ReactorThread[] mainReactors = new ReactorThread[1];; /** 用来处理客户端连接的读取、写入 */ public ReactorThread[] subReactors = new ReactorThread[10]; /** 线程池,用来处理客户端连接后的业务逻辑 */ public ExecutorService threadPool = Executors.newCachedThreadPool(); /** * 初始化mainReactors和subReactors */ public void initAndRegister() throws IOException { //subReactors线程,用来客户端连接后的读写 for (int i=0; i<subReactors.length; i++) { subReactors[i] = new ReactorThread() { @Override public void handler(SelectableChannel channel) throws IOException { SocketChannel socketChannel = (SocketChannel)channel; ByteBuffer readBuffer = ByteBuffer.allocate(1024); while (socketChannel.isOpen() && socketChannel.read(readBuffer) != -1) { //如果有数据可读,简单的判断一下大于0 if (readBuffer.position() > 0) { break; } } //没有数据可读,就直接返回 if (readBuffer.position() == 0) { return; } //转换为读取模式 readBuffer.flip(); byte[] bytes = new byte[readBuffer.limit()]; readBuffer.get(bytes); System.out.println("获取到新的数据:" + new String(bytes)); System.out.println("获取到新的数据,来自:" + socketChannel.getRemoteAddress()); //线程池,用来处理业务数据 threadPool.execute(new Runnable() { @Override public void run() { } }); //向客户端写数据 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "hello world"; ByteBuffer writeBuffer = ByteBuffer.wrap(response.getBytes()); while (writeBuffer.hasRemaining()) { socketChannel.write(writeBuffer); } } }; } //mainReactors线程,用于客户端的连接 for (int i=0; i<mainReactors.length; i++) { mainReactors[i] = new ReactorThread() { AtomicInteger integer = new AtomicInteger(0); @Override public void handler(SelectableChannel channel) throws IOException { //获取客户端通道 SocketChannel socketChannel = ((ServerSocketChannel)channel).accept(); //设置为非阻塞模式 socketChannel.configureBlocking(false); int index = integer.getAndIncrement() % subReactors.length; ReactorThread subReactor = subReactors[index]; //启动线程 subReactor.start(); //注册事件 SelectionKey key = subReactor.register(socketChannel); key.interestOps(SelectionKey.OP_READ); System.out.println("收到新连接:" + socketChannel.getRemoteAddress()); } }; } } /** 初始化服务端 */ public void init() throws IOException { //创建一个服务端通道 serverSocketChannel = ServerSocketChannel.open(); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //注册到mainReactor-Thread int index = new Random().nextInt(mainReactors.length); SelectionKey keys = mainReactors[index].register(serverSocketChannel); keys.interestOps(SelectionKey.OP_ACCEPT); //启动mainReactor-Thread线程 mainReactors[index].start(); } /** 服务端绑定端口 */ public void bind() throws IOException { serverSocketChannel.socket().bind(new InetSocketAddress(8056)); System.out.println("服务端启动成功"); } public static void main(String[] args) throws IOException { NioReactor2 nioReactor = new NioReactor2(); nioReactor.initAndRegister(); nioReactor.init(); nioReactor.bind(); } }
到此,NIO中的Reactor线程模型就结束了,上面的示例可以拆分几个类进行处理,还可以根据HTTP协议的部分,解析请求,做一个简单的tomcat服务器。