GlobalEventExecutor 是AbstractScheduledEventExecutor 的实现,就是提供了一个单线程的单例,然后自动启动线程去执行任务,且如果空闲(即没有任务)超过1s则停止。 需要注意的是, 经测试, 这里的1s 是指总共耗时超过1s。 就是说,如果一个任务执行n久,然后执行完毕,然后0.2s 后又来了一个任务,然后这个0.2 是计算在内的,如果后面累计又停止0.8,那么加起来就是1s,那么就会终止线程,然后会重新创建一个新的线程。
这个做法感觉是非常奇怪的! 为什么会这样?它是怎么实现的呢?
首先看到 GlobalEventExecutor的关键属性有 quietPeriodTask , taskRunner, threadFactory,thread;
threadFactory仅仅用来创建 新的thread
thread 用来执行taskRunner
/** * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no * task pending in the task queue for 1 second. Please note it is not scalable to schedule large number of tasks to * this executor; use a dedicated executor. */ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor(); final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>(); final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>( this, Executors.<Void>callable(new Runnable() { @Override public void run() { // NOOP } }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not // be sticky about its thread group // visible for testing final ThreadFactory threadFactory; // 仅仅用来创建 新的thread private final TaskRunner taskRunner = new TaskRunner(); private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; private GlobalEventExecutor() { scheduledTaskQueue().add(quietPeriodTask); threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory( DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this); } ... }
quietPeriodTask 当然也是Runnable,而且是ScheduledFutureTask,而且是很特殊的ScheduledFutureTask,因为它的period 参数是负数,这就意味着,它的deadlineNanos = nanoTime() - periodNanos; 就是它的deadline 时间是当前时间1s 之后。 因为ScheduledFutureTask 是使用优先级队列来实现的,那么它period 参数是负数意味着 quietPeriodTask 是被排在最末尾的。
观察源码 ,发现它其实就是专门用来计算 空闲时间的,每当正常提交的任务执行完了之后,它就开始执行,但是它 也不是真正的执行,他仅仅是等待;最多等待1s;如果等待的过程中有新的task任务,那么等待会被唤醒,这个是由 阻塞队列的性质决定的:
java.util.concurrent.ArrayBlockingQueue#offer(E)
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 唤醒 }
TaskRunner 长什么样,TaskRunner 其实也是一个Runnable ,然后里面又是 先scheduledTaskQueue再 taskQueue 的顺序获取task,然后调用了task即Runnable 的run 方法, final class TaskRunner implements Runnable {
@Override public void run() { for (;;) { Runnable task = takeTask(); // 不管怎么样, 这里肯定都能获取一个task, 可能是正常提交的task, 如果没有,那么就是quietPeriodTask if (task != null) { // 一般情况下, 是不会返回null 的 try { task.run();// 这里让 task 执行完毕 } catch (Throwable t) { logger.warn("Unexpected exception from the global event executor: ", t); } if (task != quietPeriodTask) { continue;// 如果不是quietPeriodTask 那么说明还有其他正常提交的task, 不要执行后面的代码, 否则 通过后面的代码,检查 } } Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue; // Terminate if there is no task in the queue (except the noop task). // 什么情况下,size 为1呢? 怎么会? 如果size 为1,那么 肯定就是quietPeriodTask 吧 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {// 如果定时调度任务queue里面为空,或者只有一个任务, // Mark the current thread as stopped. // The following CAS must always success and must be uncontended, // because only one thread should be running at the same time. boolean stopped = started.compareAndSet(true, false);// 走到这里,应该还只有一个线程在执行; 安全的设置started为false,表示正在停止 assert stopped;// 必须cas 成功 // Check if there are pending entries added by execute() or schedule*() while we do CAS above. // Do not check scheduledTaskQueue because it is not thread-safe and can only be mutated from a // TaskRunner actively running tasks. if (taskQueue.isEmpty()) { // A) No new task was added and thus there's nothing to handle // -> safe to terminate because there's nothing left to do // B) A new thread started and handled all the new tasks. // -> safe to terminate the new thread will take care the rest break;// 如果真的没有 正常提交的task了, 那么break,那么TaskRunner 也将会执行完毕,然后线程结束! } // There are pending tasks added again. if (!started.compareAndSet(false, true)) { // 走到这里, 说明上面的判断,即taskQueue.isEmpty 为false,那么还有任务需要执行,那么再次cas方式设置started为true,表示正在执行 // startThread() started a new thread and set 'started' to true. // -> terminate this thread so that the new thread reads from taskQueue exclusively. break; // 如果无法 cas方式设置started, 那么可能是其他其他线程已经启动,那么当前线程也无须再继续执行了! break 方式退出是唯一正常的操作! } // New tasks were added, but this worker was faster to set 'started' to true. // i.e. a new worker thread was not started by startThread(). // -> keep this thread alive to handle the newly added entries. } } } }
takeTask是关键
/** * Take the next {@link Runnable} from the task queue and so will block if no task is currently present. * * @return {@code null} if the executor thread has been interrupted or waken up. 一般情况下不返回null, 除非被中断,或者唤醒。 唤醒? */ Runnable takeTask() { BlockingQueue<Runnable> taskQueue = this.taskQueue; for (;;) { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 这里仅仅是 peek, why?因为 我们永远只从scheduledTaskQueue头部取,取出来后执行,执行完后 它会自动的自己重新放入queue, peek 其实是只有在 scheduledTaskQueue 为空的情况下才会返回 null, if (scheduledTask == null) { // 如果scheduledTaskQueue 都没有了数据, Runnable task = null; try { task = taskQueue.take();//那么从taskQueue中获取; 那么为什么需要先检查scheduledTaskQueue 是否有数据? } catch (InterruptedException e) { // Ignore } return task;// 这里,也有可能返回的是 quietPeriodTask } else { long delayNanos = scheduledTask.delayNanos(); // 这个是 scheduledTask 需要等待的时间 Runnable task = null; if (delayNanos > 0) { try { // 既然scheduledTask 还需要等待delayNanos , 那也不能白白等待, 不如在这个时间内趁机看看taskQueue 还有任务, 有任务,马上执行。 否则等待最多delayNanos; 超时了自然就是执行scheduledTask task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); 如果scheduledTaskQueue 有定时任务,那么, poll 的方式等待它, 直到超时,或者 被唤醒; 这个做法其实有点奇怪。 // 这里的poll , 可能消耗的就是 quietPeriodTask 的时间 } catch (InterruptedException e) { // Waken up. return null; } } if (task == null) { 等待delayNanos;taskQueue 还是没有任务,超时了自然就是执行scheduledTask // We need to fetch the scheduled tasks now as otherwise there may be a chance that // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 fetchFromScheduledTaskQueue(); // 如果还是为空,那么 scheduledTaskQueue的数据全部取出放到taskQueue;为什么全部?这样做是为了防止如果taskQueue 永远有任务,那么scheduledTaskQueue 没有机会执行 task = taskQueue.poll(); // 然后从taskQueue中获取 } if (task != null) { return task; // 这里, 如果没有其他task, 返回的肯定就是 quietPeriodTask } } } }
关于ScheduledFutureTask, 具体可以看看io.netty.util.concurrent.ScheduledFutureTask#run