前面几期带大家分析了Netty的服务端启动,客户端连接,客户端消息处理的流程,本期就来讲解Netty中一个极其重要的组件,事件循环器EventLoop。
我们平常使用Netty的时候,一般都是用NioEventLoopGroup进行操作,所以我们从NioEventLoopGroup开始分析。首先来看一下NioEventLoopGroup的继承体系
从上图可以看出NioEventLoopGroup是一个线程池,所以有一个execute()方法,并且实现了ScheduledExecutorService接口,说明它还可以执行一些调度任务。但是去看它自身和它的父类的源码,可以发现它并没持有真正的线程资源,而是由NioEventLoop持有。因为NioEventLoopGroup内部管理了多个NioEventLoop,所以我觉得把NioEventLoopGroup叫做线程组会更好理解一些。我们还是先从NioEventLoopGroup的构造开始分析。
#NioEventLoopGroup
//无参构造 public NioEventLoopGroup() { this(0); } //创建的NioEventLoop个数,也就是线程数 public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } public NioEventLoopGroup(int nThreads, Executor executor) { //传了一个null的executor,还有一个selector的提供器 this(nThreads, executor, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { //多传了一个选择策略 this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { //多传了一个拒绝策略(线程池的拒绝策略),并调用父类的构造 super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
#MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { //将线程数设置为cpu核数*2,并且将前面的参数传给父类构造 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
#MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { //多传了一个选择器工厂 this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } //到这里构造算是调用完了,总结一下都传了哪些参数 //1. nThreads 线程数 //2. executor 执行器 (目前是null) //3. chooserFactory 选择器工厂 //4. args 将之前的参数包装成对象数组了 // args[0] selectorProvider 选择器提供器 // args[1] selectStrategyFactory 选择策略工厂 // args[2] RejectedExecutionHandlers.reject() 拒绝策略 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //从构造链路来看是null if (executor == null) { //下面会讲到这个 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //创建管理EventLoop的数组 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创建NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { //只要有一个创建失败,其他的都要关闭 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } //选择器工厂创建选择器 //选择器的作用就是通过不同的算法,从children中选出来一个NioEventLoop //具体算法可以去看下#DefaultEventExecutorChooserFactory,默认实现了两种 chooser = chooserFactory.newChooser(children); //终止监听器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //为每一个NioEventLoop添加监听 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
从上面的构造我们挑出以下两行做为重点
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); //...... children[i] = newChild(executor, args);
首先来看newDefaultThreadFactory()
protected ThreadFactory newDefaultThreadFactory() { //返回了一个默认的线程工厂 return new DefaultThreadFactory(getClass()); }
再去看#ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor { //持有默认线程工厂的引用 private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory"); } @Override public void execute(Runnable command) { //每执行一个任务,就会创建一个线程 threadFactory.newThread(command).start(); } }
这个newThread()就是在#DefaultThreadFactory里
@Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { } return t; } protected Thread newThread(Runnable r, String name) { //创建的线程类型是FastThreadLocalThread return new FastThreadLocalThread(threadGroup, r, name); }
到这里我们知道了executor内部保存了一个线程工厂,并且提交一个任务就会创建一个线程。
再来看看**children[i] = newChild(executor, args);**的源码,将刚刚赋值好的executor和args都传了进去。
#NioEventLoopGroup
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { //我们分析的构造链路,args只有3个参数,所以queueFactory也为null EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; //将参数传入NioEventLoop构造,并创建对象 return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }
到这里,NioEventLoopGroup的流程就已经走完了,我们知道了NioEventLoopGroup中保存了一个NioEventLoop的数组,还保存了一个选择器(用来选出NioEventLoop)。接下来是分析NioEventLoop的时候了。
还是先来看看NioEventLoop的继承体系
可以发现,NioEventLoop是一个单线程的线程池,我们从构造开始看。
//parent:nioEventLoopGroup //executor:ThreadPerTaskExecutor 内部持有默认的线程工厂 //selectorProvider:选择器提供器 //strategy:选择策略 //rejectedExecutionHandler:拒绝策略 //queueFactory:null NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { //继续传向父类构造,newTaskQueue()创建队列 super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); //下面这三行代码 ,创建出来了 seletor实例,也就是说每个 NioEventLoop都持有一个seletor实例。 final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; }
#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) { //taskQueue:任务队列 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); //不是重点 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); }
#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { //最终到#AbstractEventExecutor,仅仅是赋值parent字段,表示当前的NioEventLoop属于哪个Group super(parent); //赋值操作 this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; //在MultithreadEventExecutorGroup构造中创建的ThreadPerTaskExecutor,下面会用到,别忘了! this.executor = ThreadExecutorMap.apply(executor, this); this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
到这里我们可以知道,NioEventLoop内部持有一个Selector,一个任务队列,还有一个ThreadPerTaskExecutor类型的线程池。在NioEventLoop的父类SingleThreadEventExecutor中还持有了Thread的引用,但到目前为止,我们都还没有看到线程是如何被创建出来的。还记得在前面几期里提到NioEventLoop执行的那些任务吗?它们都是通过NioEventLoop的execute()去提交任务,所以现在我们去看看execute()方法。
NioEventLoop的execute()方法是在它的父类SingleThreadEventExecutor中实现
#SingleThreadEventExecutor
@Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { //注意!!! //在第一期Netty服务端启动流程里,是主线程调用的此方法,所以inEventLoop是false boolean inEventLoop = inEventLoop(); //向任务队列添加一个任务 addTask(task); //主线程调用,所以进入 if (!inEventLoop) { //核心方法 startThread(); //关闭状态就进入拒绝逻辑 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } } //核心方法 private void doStartThread() { assert thread == null; //executor:ThreadPerTaskExecutor //内部持有一个默认线程工厂,调用execute()就会创建一个线程 executor.execute(new Runnable() { @Override public void run() { //将当前线程赋值给thread //这时NioEventLoop内部持有线程了 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //重点来了!!! //该线程调用了SingleThreadEventExecutor的run() //目前的实现类是NioEventLoop,所以会调用到NioEventLoop的run() SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //...... } } }); }
看到这里终于清楚了,绕了一个大弯,最终还是调用到了NioEventLoop的run()方法去了,前面几期说到过,NioEventLoop的run()就是核心,在里面会处理各种任务(io事件,用户提交的任务)。所以我们直接看run()。
这里的源码非常多,我们也是挑重点来讲。
@Override protected void run() { //epoll bug的一个特征计数变量。后面会解释 int selectCnt = 0; for (;;) { try { // 1. >=0 表示 selector的返回值,注册在多路复用器上 就绪的 个数。 // 2. <0 常量状态:CONTINUE BUSY_WAIT SELECT int strategy; try { //selectStrategy:DefaultSelectStrategy对象。 // 根据当前NioEventLoop 是否有本地任务,来决定怎么处理。 // 1.有任务,那么调用多路复用器的 selectNow() 方法,返回多路复用器上就绪channel个数。 // 2.没有任务,返回 -1 ,下面会根据常量再进行相应的逻辑。 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { //-2 case SelectStrategy.CONTINUE: continue; //-3 case SelectStrategy.BUSY_WAIT: //-1 case SelectStrategy.SELECT: //因为NioEventLoop实现了ScheduledExecutorService,所以也可以执行调度任务 //获可调度任务执行截止时间,没有任务返回-1 long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { //EventLoop内没有需要周期性执行的任务。 //设置成long最大值。 curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { //判断是否有任务需要执行 if (!hasTasks()) { //没有本地普通任务需要执行。 // 1.curDeadlineNanos long最大值,说明没有周期性任务的情况。 // 2.curDeadlineNanos 表示周期性任务需要执行的截止时间。 // 最终 strategy 表示 就绪的 ch 事件个数。 strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default: } } catch (IOException e) { rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } //epoll bug计数变量++ selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; //线程处理IO事件的时间占比,默认是50%。 final int ioRatio = this.ioRatio; //表示本轮线程有没有处理过本地任务。 boolean ranTasks; //占比为100,说明IO优先,IO处理完之后,再处理本地任务。 if (ioRatio == 100) { try { if (strategy > 0) { //当前NioEventLoop的selector上 有就绪的 ch 事件。 //处理IO事件的方法入口。看过前面两期的朋友应该有影响 processSelectedKeys(); } } finally { //IO事件执行完后,执行本地任务队列内的任务。 ranTasks = runAllTasks(); } } //占比不是100,当前NioEventLoop内的selector上有就绪的事件。 else if (strategy > 0) { // IO 事件处理的开始时间。 final long ioStartTime = System.nanoTime(); try { //处理IO事件 processSelectedKeys(); } finally { //IO事件处理总耗时。 final long ioTime = System.nanoTime() - ioStartTime; //ioTime*(100 - ioRatio)/ioRatio 根据IO时间计算出一个执行本地队列任务的最大时间。 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } //当前NioEventLoop内的selector上没有就绪的事件,只处理本地任务就可以了。 else { // 执行最少数量的 本地任务.. 最多 最多 执行 64 个任务。 ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { //...... //正常NioEventLoop线程从selector多路复用器上唤醒后工作是因为有io事件。 //会把selectCnt置为0。 selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { selectCnt = 0; } } catch (CancelledKeyException e) { //...... } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
整个方法看下来,NioEventLoop内的线程一直在for(;;)里面自旋,分别要处理Io事件和任务队列的任务。对于处理IO事件的
processSelectedKeys()方法,在处理客户端连接流程那一章开头分析过,没看过或忘记了的可以再去看看。在这里主要分析一下处理任务队列的方法runAllTasks(),和带有运行时间的runAllTasks(long timeoutNanos)这两个方法。
#SingleThreadEventExecutor
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { //fetchedAll:是否拿出所有的调度任务,没有的话,下一轮循环继续 fetchedAll = fetchFromScheduledTaskQueue(); //执行普通任务队列的任务 if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. //至少处理了一个任务 if (ranAtLeastOne) { //设置一下最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); } //执行tailTasks队列的方法,比较少见 afterRunningAllTasks(); return ranAtLeastOne; } private boolean fetchFromScheduledTaskQueue() { //队列是空直接返回 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } //获取当前时间 long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //循环的从调度队列取出任务,放入普通任务队列 for (;;) { //pollScheduledTask():从调度任务队列取一个已经到截止时间的调度任务,并且从调度队列中移除这个任务 Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return true; } //将调度任务添加到普通任务队列 if (!taskQueue.offer(scheduledTask)) { //添加失败,说明普通任务队列已满,要将此任务重新放回到调度任务队列 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; } } } protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { //从普通任务队列取一个任务 Runnable task = pollTaskFrom(taskQueue); //第一个任务为null,说明队列为空 if (task == null) { //快速失败,这种思想在AQS的节点入队里可以看到 return false; } //自旋,处理任务队列中的任务 for (;;) { //安全的执行每一个任务 safeExecute(task); //取出下一个任务 task = pollTaskFrom(taskQueue); if (task == null) { //队列为空,所有任务处理完毕 return true; } } }
以上是无参runAllTasks()方法的分析,就是将调度队列的任务移除并加入到普通任务队列中,然后循环并处理普通任务队列中所有的任务。接下来再看看带有运行时间的runAllTasks(long timeoutNanos)。
#SingleThreadEventExecutor
//timeoutNanos:最多执行该纳秒 protected boolean runAllTasks(long timeoutNanos) { //将调度队列的任务移除并添加到普通任务队列 fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { //快速失败,并执行tailTasks队列的方法,比较少见 afterRunningAllTasks(); return false; } //计算处理任务的截止时间 final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0; //处理的任务数量 long runTasks = 0; long lastExecutionTime; for (;;) { //安全执行任务 safeExecute(task); //处理任务数++ runTasks ++; //0X3F:111111 十进制为63 //什么数 & 63 为0 ? // 1000000 & 111111 64 //10000000 & 111111 128 //11000000 & 111111 192 ... //64的倍数 & 63 为0,说明每执行64个任务后,该条件会成立 if ((runTasks & 0x3F) == 0) { //更新最后执行时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果最后执行时间超过执行此方法的截止时间,有两种情况 //1. timeoutNanos不为0,那么会执行到截止时间 //2. timeoutNanos为0,那么最多只会执行64个任务 if (lastExecutionTime >= deadline) { //退出循环 break; } } //取下一个任务 task = pollTask(); if (task == null) { //为null就更新时间 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //执行tailTasks队列的方法,比较少见 afterRunningAllTasks(); //将最后执行时间赋值给成员属性 this.lastExecutionTime = lastExecutionTime; return true; }
从这个方法可以看出有两个分支,分别是timeoutNanos是否为0的两种情况,在NioEventLoop的run()方法中也可以找到这两个分支。在runAllTasks(long timeoutNanos)方法内,每执行64个任务就会进行一次判断,看执行普通任务是否超时,除了这一判断,对任务的处理和runAllTasks()基本一致。
以上就是NioEventLoop执行普通任务的逻辑,难度不是很大,没有理清的朋友可以再去梳理一遍流程。
在NioEventLoop的run()内,我们还留了一个问题没有解释,就是selectCnt变量,是用来处理epoll bug的。下面就对这个问题进行讲解。
我们再来看看run()方法,这次只注释了跟epoll bug有关的代码
@Override protected void run() { // epoll bug 计数变量。 int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { //正常来说,没有任务或者IO事件时,线程会在此处阻塞 //直到有就绪的channel,或者到了调度任务的截止时间才会唤醒 //如果触发epoll bug,selector.select()在没有channel就绪时也会被唤醒 //导致线程一直在for(;;)里面空轮询,cpu使用率100% strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default: } } catch (IOException e) { //IO异常也重建selector,下面会说到 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } //epoll bug 计数变量 //正常来说,线程没有在上面的select()里面阻塞住,说明是有任务要执行 //但是如果没有任务执行,线程就会一直空轮询,然后对selectCnt++ //所以触发bug后,selectCnt在短时间内会变的很大 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; //是否执行了任务,执行了为true,没有执行为false //这是为了判断有普通任务需要执行的情况 //因为普通任务可能执行时间非常短,然后导致线程一直自旋,selectCnt也会累加的很大 boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { //ranTasks赋值 ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; //ranTasks赋值 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //ranTasks赋值 ranTasks = runAllTasks(0); } //ranTasks判断,执行了普通任务条件一定成立 if (ranTasks || strategy > 0) { //MIN_PREMATURE_SELECTOR_RETURNS=3 //可能是调度任务截止时间没差很远,在短时间内selector.select()唤醒了好几次 //仅仅打印一下日志 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } //到这里说明执行了IO事件或者普通任务,判断selector没有出现epoll bug, //将selectCnt归0 selectCnt = 0; //如果进入下面这个if分支 //ranTasks为false,没有执行过普通任务 //strategy<=0 说明selector上没有就绪channel,但没有被selector.select()阻塞 //进去看unexpectedSelectorWakeup(selectCnt) } else if (unexpectedSelectorWakeup(selectCnt)) { //将属性重新赋值 selectCnt = 0; } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } private boolean unexpectedSelectorWakeup(int selectCnt) { //线程被中断,不是重点 if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } //SELECTOR_AUTO_REBUILD_THRESHOLD:默认值是512 //如果selectCnt>=512,Netty就认为触发了epoll bug if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); //重建Selector rebuildSelector(); return true; } return false; } public void rebuildSelector() { //正常流程是在EventLoop线程发生 if (!inEventLoop()) { execute(new Runnable() { @Override public void run() { rebuildSelector0(); } }); return; } //走这个逻辑,重建selector rebuildSelector0(); } private void rebuildSelector0() { //获取已经触发bug的selector final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } try { //重新创建selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } int nChannels = 0; //遍历老的selector,将channel重新注册到新的selector上 for (SelectionKey key: oldSelector.keys()) { //channel Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } //获取老的selector的感兴趣事件 int interestOps = key.interestOps(); //销毁 key.cancel(); //注册channel到新的selector上 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } //给NioEventLoop的字段重新赋值 selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { //关闭老的selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
到这里Netty解决epoll bug的办法就讲解完成了。总结一下,在run()方法内通过两个变量,循环计数和是否处理过普通任务来进行判断,当前selector是否触发了bug,如果触发就进行重建,并将旧的selector上注册的channel重新注册到新的selector上,这样就完成了selector的重建,然后又可以继续处理IO事件或普通任务了。
以上就是NioEventLoop的源码分析,看完后我们对EventLoopGroup和EventLoop有了更深的认识。EventLoopGroup中并不保存线程实例,而是保存了一个选择器(用来选出单个EventLoop)和一个EventLoop数组。而EventLoop内部保存了线程实例,selector,普通任务队列和调度任务队列,并且会在调用execute()方法后开启线程,回调到EventLoop的run()方法内进行处理IO事件和普通任务。如果到这里还是不太清楚过程的朋友,最好打开源码按照本文的流程自行梳理几遍,相信很快就能掌握这些知识了。