系列文章目录和关于我
参考:[nio.pdf (oswego.edu)](https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
Reactor模式是一种用于处理高并发的设计模式,也被称为事件驱动模式。在这种模式中,应用程序会将输入事件交给一个事件处理器,称为Reactor,Reactor会监听所有输入事件,并将它们分发
给相应的处理程序进行处理。这种模式可以大大提高应用程序的性能和可扩展性,因为它使用了非阻塞I/O和异步处理技术,使得一个进程可以同时处理多个事件,而不会因为某个事件的处理时间过长而影响其他事件的处理。Reactor模式被广泛应用于网络编程和操作系统级别的事件驱动程序。
在传统BIO模式中有多少个客户端请求,就需要多少个对于的线程进行一对一的处理。
这种模型有如下缺点:
Java NIO 带来非阻塞IO,和IO多路复用。
得益于非阻塞IO和IO多路复用,让服务可以处理更多的并发请,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端。
可以看到java 中NIO有点reactor的意思:
Selector多路复用器监听IO事件进行分发,针对连接事件,读写事件进行不同的处理。
Reactor核心是Reactor加上对应的处理器Handler,Reactor在一个单独的线程中运行,负责监听和分发事件,将接收到的事件交给不同的Handler来处理,Handler是处理程序执行I/O事件的实际操作。
得益于Java NIO 非阻塞IO 于 IO多路复用
。例如Netty就使用了Reactor模式,程序员只需要写如何处理事件
这个模型诠释了Reactor模式的组成部分:
public class Reactor implements Runnable { //多路复用器 final Selector selector; //服务端Channel final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind( new InetSocketAddress(port)); serverSocket.configureBlocking(false); // 注册io多路复用器连接事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 将服务端Channel 关联一个Acceptor sk.attach(new Acceptor()); } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> it = selected.iterator(); while (it.hasNext()) //分发 dispatch(it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { // 拿到关联的acceptor 或者handler Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } //内部类 负责处理连接事件 class Acceptor implements Runnable { public void run() { try { // 拿到Channel SocketChannel c = serverSocket.accept(); if (c != null) // 创建handler new Handler(selector, c); } catch (IOException ex) { /* ... */ } } } final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(1024); ByteBuffer output = ByteBuffer.allocate(1024); static final int READING = 0, SENDING = 1; int state = READING; //设置非阻塞 //监听可读事件 Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { return false; } boolean outputIsComplete() { return false; } void process() { } public void run() { try { //如果可读 if (state == READING) read(); //如果可写 else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } } }
可以看到Reactor模式将Channel和Acceptor,Handler进行绑定依赖于SelectionKey#attach
方法,通过此方法在不同的事件发生时调用SelectionKey#attachment
方法,获取到对应的处理程序进行处理。
Reactor由单线程运行,通过IO多路复用Selector监听多个事件是否就绪,得益于Channel提供的非阻塞IO能力,当IO没有就绪的时候,单线程不会阻塞而是继续处理下一个。
由于其单线程的原因,无法利用计算机多核心资源,并且如果读取请求内容处理的过程存在耗时操作(比如数据库,rpc等)那么回导致下一个事件得不到快速的响应。
引入多线程解决单线程Reactor的不足
可以看到多线程模型引入了线程池,对于就绪的可读,可写IO事件交给线程池进行处理。
主要是对单线程模型中的Handler进行改造,将处理逻辑提交到线程池中。
多线程模型涉及到共享资源的使用,不如读写Channel依赖的Buffer如何分配。
可以看到多线程模型的缺点:线程通信和同步逻辑复杂,需要处理多线程安全问题。
在这种模型中,mainReactor负责处理连接建立事件,只需要一个线程即可。subReactor负责和建立连接的socket进行数据交互并处理业务逻辑,并且每一个subReactor可持有一个独立的Selector进行IO多路复用事件监听。
// SubReactor 池子,负责负载均衡的选择SubReactor public class SubReactorPool { final static SubReactor[] subReactors; static final AtomicInteger count = new AtomicInteger(); static { int availableProcessors = Runtime.getRuntime().availableProcessors(); subReactors = new SubReactor[availableProcessors]; for (int i = 0; i < subReactors.length; i++) { subReactors[i] = new SubReactor(); } } static class SubReactor implements Runnable{ // 业务处理线程池 final static Executor poolExecutor = Executors.newCachedThreadPool(); // io多路复用 Selector selector; SubReactor() { try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } } public void registry(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(selector,SelectionKey.OP_READ); } @Override public void run() { while (true){ try { selector.select(); } catch (IOException e) { throw new RuntimeException(e); } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey sk = iterator.next(); if (sk.isReadable()) { poolExecutor.execute(()->new Handler(sk)); } // 可写,。。。。 iterator.remove(); } } } } //选择合适的SubReactor static SubReactor loadBalanceChoose(SocketChannel socketChannel){ int countInt = count.getAndAdd(1); return subReactors[countInt % subReactors.length]; } }
多Reactor解决了单个Selector注册连接,读写事件,导致内核轮询的时候需要判断太多fd而效率缓慢的问题。
在Tomcat请求处理流程与源码浅析 - Cuzzz - 博客园 (cnblogs.com)中,说到Tomcat Connector的设计
其中
下图展示了Acceptor 和 Poller的协作
这一步借助ServerSocketChannel#accept方法,进行等待客户端连接,Acceptor单线程进行监听。
这一步设置非阻塞,并且使用计数取模的方式实现多个Poller的负载均衡
然后将事件保证为PollerEvent 提交到Poller的阻塞队列中
轮询阻塞队列中的PollerEvent,并且调用run方法,run方法会把事件注册到Poller的Selector上,注意下面的注册将NioSocketWrapper作为attachment进行了绑定
tomcat处理事件的时候,会创建出SocketProcessor进行处理,SocketProcessor是一个Runnable,最后会提交到线程池。