作者:Grey
原文地址:Java IO学习笔记七:多路复用从单线程到多线程
在前面提到的多路复用的服务端代码中, 我们在处理读数据的同时,也处理了写事件:
public void readHandler(SelectionKey key) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read; try { while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { client.write(buffer); } buffer.clear(); } else if (read == 0) { break; } else { client.close(); break; } } } catch (IOException e) { e.printStackTrace(); } }
为了权责清晰一些,我们分开了两个事件处理:
一个负责写,一个负责读
读的事件处理, 如下代码
public void readHandler(SelectionKey key) { System.out.println("read handler....."); SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read = 0; try { while (true) { read = client.read(buffer); if (read > 0) { client.register(key.selector(), SelectionKey.OP_WRITE, buffer); } else if (read == 0) { break; } else { client.close(); break; } } } catch (IOException e) { e.printStackTrace(); } }
其中read > 0 即从客户端读取到了数据,我们才注册一个写事件:
client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
其他事件不注册写事件。(PS:只要send-queue没有满,就可以注册写事件)
写事件的处理逻辑如下:
private void writeHandler(SelectionKey key) { System.out.println("write handler..."); SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.flip(); while (buffer.hasRemaining()) { try { client.write(buffer); } catch (IOException e) { e.printStackTrace(); } } buffer.clear(); key.cancel(); try { client.close(); } catch (IOException e) { e.printStackTrace(); } }
写完后,调用key.cancel() 取消注册,并关闭客户端。既然分了读和写的不同处理流程,那么在主方法里面调用的时候:
while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { acceptHandler(key); } else if (key.isReadable()) { readHandler(key); } else if (key.isWritable()) { writeHandler(key); } }
增加了
if (key.isWritable()) { writeHandler(key); }
测试一下,运行SocketMultiplexingV2.java
并通过一个客户端连接进来:
nc 192.168.205.1 9090
客户端发送一些内容:
nc 192.168.205.1 9090 asdfasdfasf asdfasdfasf
可以正常接收到数据。
考虑有一个fd执行耗时,在一个线性里会阻塞后续FD的处理,同时,考虑资源利用,充分利用cpu核数。
我们来实现一个基于多线程的多路复用模型。
将N个FD分组(这里的FD就是Socket连接),每一组一个selector,将一个selector压到一个线程上(最好的线程数量是: cpu核数或者cpu核数*2)
每个selector中的fd是线性执行的。假设有100w个连接,如果有四个线程,那么每个线程处理25w个。
分组的FD和处理这堆FD的Selector我们封装到一个数据结构中,假设叫:SelectorThread,其成员变量至少有如下:
public class SelectorThread { ... Selector selector = null; // 存Selector对应要处理的FD队列 LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>(); ... }
由于其处理是线性的,且我们要开很多个线程来处理,所以SelectorThread本身是一个线程类(实现Runnable接口)
public class SelectorThread implements Runnable { ... }
在run方法中,我们就可以把之前单线程处理selector的常规操作代码移植过来:
.... while (true) { .... if (selector.select() > 0) { Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { acceptHandler(key); } else if (key.isReadable()) { readHandler(key); } else if (key.isWritable()) { } } } .... } ....
SelectorThread设计好以后,我们需要一个可以组织SelectorThread的类,假设叫SelectorThreadGroup,这个类的主要职责就是安排哪些FD由哪些Selector来接管,这个类里面持有两个SelectorThread数组,一个用于分配服务端,一个用于分配每次客户端的Socket请求。
// 服务端,可以启动多个服务端 SelectorThread[] bosses; // 客户端的Socket请求 SelectorThread[] workers;
构造器中初始化这两个数组
SelectorThreadGroup(int bossNum, int workerNum) { bosses = new SelectorThread[bossNum]; workers = new SelectorThread[workerNum]; for (int i = 0; i < bossNum; i++) { bosses[i] = new SelectorThread(this); new Thread(bosses[i]).start(); } for (int i = 0; i < workerNum; i++) { workers[i] = new SelectorThread(this); new Thread(workers[i]).start(); } }
以下代码是针对每次的请求,如何分配Selector:
... public void nextSelector(Channel c) { try { SelectorThread st; if (c instanceof ServerSocketChannel) { st = nextBoss(); st.lbq.put(c); st.setWorker(workerGroup); } else { st = nextWork(); st.lbq.add(c); } st.selector.wakeup(); } catch (InterruptedException e) { e.printStackTrace(); } } private SelectorThread nextBoss() { int index = xid.incrementAndGet() % bosses.length; return bosses[index]; } private SelectorThread nextWork() { int index = xid.incrementAndGet() % workers.length; //动用worker的线程分配 return workers[index]; } ...
这里要区分两类Channel,一类是ServerSocketChannel,即我们每次启动的服务端,另外一类就是连接服务端的Socket请求,这两类最好是分到不同的SelectorThread中的队列中去。分配的算法是朴素的轮询算法(除以数组长度取模)
这样我们主函数只需要和SelectorThreadGroup交互即可:
public class Startup { public static void main(String[] args) { // 开辟了三个SelectorThread给服务端,开辟了三个SelectorThread给客户端去接收Socket SelectorThreadGroup group = new SelectorThreadGroup(3,3); group.bind(9999); group.bind(8888); group.bind(6666); group.bind(7777); } }
启动Startup,
开启一个客户端,请求服务端,测试一下:
[root@io io]# nc 192.168.205.1 7777 sdfasdfs sdfasdfs
客户端请求的数据可以返回,服务端可以监听到客户端的请求:
Thread-1 register listen Thread-0 register listen Thread-2 register listen Thread-1 register listen Thread-1 acceptHandler...... Thread-5 register client: /192.168.205.138:44152
因为我们开了四个端口的监听,但是我们只设置了三个服务端SelectorThread,所以可以看到Thread-1监听了两个服务端。
新接入的客户端连接是从Thread-5开始的,不会和前面的Thread-0,Thread-1,Thread-2冲突。
再次来一个新的客户端连接
[root@io io]# nc 192.168.205.1 8888 sdfasdfas sdfasdfas
输入一些内容,依然可以得到服务端的响应
服务端这边日志显示:
Thread-3 register client: /192.168.205.138:33262 Thread-3 read......
显示是Thread-3捕获了新的连接,也不会和前面的Thread-0,Thread-1,Thread-2冲突。
完整源码:Github