C/C++教程

netty 之 GlobalEventExecutor

本文主要是介绍netty 之 GlobalEventExecutor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

GlobalEventExecutor 是AbstractScheduledEventExecutor  的实现,就是提供了一个单线程的单例,然后自动启动线程去执行任务,且如果空闲(即没有任务)超过1s则停止。 需要注意的是, 经测试, 这里的1s 是指总共耗时超过1s。 就是说,如果一个任务执行n久,然后执行完毕,然后0.2s 后又来了一个任务,然后这个0.2 是计算在内的,如果后面累计又停止0.8,那么加起来就是1s,那么就会终止线程,然后会重新创建一个新的线程。

 

这个做法感觉是非常奇怪的! 为什么会这样?它是怎么实现的呢?

 

首先看到 GlobalEventExecutor的关键属性有 quietPeriodTask , taskRunner, threadFactory,thread;

threadFactory仅仅用来创建 新的thread

thread 用来执行taskRunner

/**
 * Single-thread singleton {@link EventExecutor}.  It starts the thread automatically and stops it when there is no
 * task pending in the task queue for 1 second.  Please note it is not scalable to schedule large number of tasks to
 * this executor; use a dedicated executor.
 */
public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

    public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();

    final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
    final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
            this, Executors.<Void>callable(new Runnable() {
        @Override
        public void run() {
            // NOOP
        }
    }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

    // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
    // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
    // be sticky about its thread group
    // visible for testing
    final ThreadFactory threadFactory; // 仅仅用来创建 新的thread
    private final TaskRunner taskRunner = new TaskRunner();
    private final AtomicBoolean started = new AtomicBoolean();
    volatile Thread thread; 


    private GlobalEventExecutor() {
        scheduledTaskQueue().add(quietPeriodTask);
        threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
                DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
    }
...

}

quietPeriodTask 当然也是Runnable,而且是ScheduledFutureTask,而且是很特殊的ScheduledFutureTask,因为它的period 参数是负数,这就意味着,它的deadlineNanos = nanoTime() - periodNanos; 就是它的deadline 时间是当前时间1s 之后。 因为ScheduledFutureTask 是使用优先级队列来实现的,那么它period 参数是负数意味着 quietPeriodTask  是被排在最末尾的。

 

观察源码 ,发现它其实就是专门用来计算 空闲时间的,每当正常提交的任务执行完了之后,它就开始执行,但是它 也不是真正的执行,他仅仅是等待;最多等待1s;如果等待的过程中有新的task任务,那么等待会被唤醒,这个是由 阻塞队列的性质决定的:

 java.util.concurrent.ArrayBlockingQueue#offer(E)

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal(); // 唤醒
}
 

 

TaskRunner  长什么样,TaskRunner  其实也是一个Runnable ,然后里面又是 先scheduledTaskQueue再 taskQueue 的顺序获取task,然后调用了task即Runnable  的run 方法,     final class TaskRunner implements Runnable {

        @Override
        public void run() {
            for (;;) {
                Runnable task = takeTask(); // 不管怎么样, 这里肯定都能获取一个task, 可能是正常提交的task, 如果没有,那么就是quietPeriodTask 
                if (task != null) { // 一般情况下, 是不会返回null 的
                    try {
                        task.run();// 这里让 task 执行完毕
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from the global event executor: ", t);
                    }

                    if (task != quietPeriodTask) {
                        continue;// 如果不是quietPeriodTask 那么说明还有其他正常提交的task, 不要执行后面的代码, 否则 通过后面的代码,检查
                    }
                }

                Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
                // Terminate if there is no task in the queue (except the noop task).

          // 什么情况下,size 为1呢? 怎么会?  如果size 为1,那么 肯定就是quietPeriodTask 吧

                if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {// 如果定时调度任务queue里面为空,或者只有一个任务,
                    // Mark the current thread as stopped.
                    // The following CAS must always success and must be uncontended,
                    // because only one thread should be running at the same time.
                    boolean stopped = started.compareAndSet(true, false);//  走到这里,应该还只有一个线程在执行; 安全的设置started为false,表示正在停止
                    assert stopped;// 必须cas 成功

                    // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
                    // Do not check scheduledTaskQueue because it is not thread-safe and can only be mutated from a
                    // TaskRunner actively running tasks.
                    if (taskQueue.isEmpty()) {
                        // A) No new task was added and thus there's nothing to handle
                        //    -> safe to terminate because there's nothing left to do
                        // B) A new thread started and handled all the new tasks.
                        //    -> safe to terminate the new thread will take care the rest
                        break;//  如果真的没有 正常提交的task了, 那么break,那么TaskRunner  也将会执行完毕,然后线程结束!
                    }

                    // There are pending tasks added again.
                    if (!started.compareAndSet(false, true)) { // 走到这里, 说明上面的判断,即taskQueue.isEmpty 为false,那么还有任务需要执行,那么再次cas方式设置started为true,表示正在执行
                        // startThread() started a new thread and set 'started' to true.
                        // -> terminate this thread so that the new thread reads from taskQueue exclusively.
                        break; // 如果无法 cas方式设置started, 那么可能是其他其他线程已经启动,那么当前线程也无须再继续执行了! break 方式退出是唯一正常的操作!
                    }

                    // New tasks were added, but this worker was faster to set 'started' to true.
                    // i.e. a new worker thread was not started by startThread().
                    // -> keep this thread alive to handle the newly added entries.
                }
            }
        }
    }

 

takeTask是关键

/**
     * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
     *
     * @return {@code null} if the executor thread has been interrupted or waken up.  一般情况下不返回null, 除非被中断,或者唤醒。 唤醒?
     */
    Runnable takeTask() {
        BlockingQueue<Runnable> taskQueue = this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 这里仅仅是 peek, why?因为 我们永远只从scheduledTaskQueue头部取,取出来后执行,执行完后 它会自动的自己重新放入queue,
        peek 其实是只有在 scheduledTaskQueue 为空的情况下才会返回 null,


            if (scheduledTask == null) { // 如果scheduledTaskQueue 都没有了数据,
                Runnable task = null;
                try {
                    task = taskQueue.take();//那么从taskQueue中获取; 那么为什么需要先检查scheduledTaskQueue 是否有数据?
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;// 这里,也有可能返回的是 quietPeriodTask  
            } else {
                long delayNanos = scheduledTask.delayNanos(); // 这个是 scheduledTask 需要等待的时间
                Runnable task = null;
                if (delayNanos > 0) {
                    try { // 既然scheduledTask 还需要等待delayNanos , 那也不能白白等待, 不如在这个时间内趁机看看taskQueue 还有任务, 有任务,马上执行。 否则等待最多delayNanos; 超时了自然就是执行scheduledTask
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); 如果scheduledTaskQueue 有定时任务,那么, poll 的方式等待它, 直到超时,或者 被唤醒; 这个做法其实有点奇怪。
                    // 这里的poll , 可能消耗的就是 quietPeriodTask  的时间
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                }
                if (task == null) { 等待delayNanos;taskQueue 还是没有任务,超时了自然就是执行scheduledTask
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    fetchFromScheduledTaskQueue(); // 如果还是为空,那么 scheduledTaskQueue的数据全部取出放到taskQueue;为什么全部?这样做是为了防止如果taskQueue 永远有任务,那么scheduledTaskQueue 没有机会执行
                    task = taskQueue.poll(); //  然后从taskQueue中获取
                }

                if (task != null) {
                    return task; // 这里, 如果没有其他task, 返回的肯定就是 quietPeriodTask  
                }
            }
        }
    }

 

关于ScheduledFutureTask,  具体可以看看io.netty.util.concurrent.ScheduledFutureTask#run 

 

这篇关于netty 之 GlobalEventExecutor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!