C/C++教程

ThreadPoolExecutor添加线程源码解析——addWorker

本文主要是介绍ThreadPoolExecutor添加线程源码解析——addWorker,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

ThreadPoolExecutor添加线程源码解析——addWorker

该方法的主要目的就是为了向线程池中创建线程(worker),并执行线程。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

该方法代码很长,也很难理解。 这里我将它分成两个部分。逐一击破。

    private boolean addWorker(Runnable firstTask, boolean core) {

        //第一部分: 主要是判断当前线程池是否可以添加线程
        retry:
        for (;;) {...}


		//第二部分:执行添加线程和执行线程的逻辑
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {...} finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

这两部分主要的工作内容由注释可知。

第一部分

  private boolean addWorker(Runnable firstTask, boolean core) {    
		retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
   	
      
      //....第二部分的代码省略
  }

这是第一部分的代码。 在前面execute()方法中 频繁的调用了 addWorker()方法,但是参数都不相同。

//添加一个有初始任务的线程,且当前线程池中的数量加上该线程小于等于核心线程数
addWorker(command, true) 
    
//添加一个有初始任务的线程,且当前线程池中的数量加上该线程小于等于最大线程数
addWorker(command, false)
    
//添加一个没有初始任务的线程,且当前线程池中的数量加上该线程小于等于最大线程数
addWorker(null, false)

addWorker()方法中两个参数 Runnable firstTask, boolean core 的含义分别是:

  • Runnable firstTask : 代表提交的任务。

    ​ 当firstTask 为null时 , 创建的是一个不带初始任务的线程(worker)

  • boolean core:代表是否使用核心线程数的限制

    ​ true 代表使用核心线程数(corePoolSize)的限制

    ​ false 代表使用最大线程数(maximumPoolSize)的限制

        retry:
        for (;;) {
            
            //获取最新的ctl,并根据ctl获取当前线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {...}
        }

这里有一个外自旋中内嵌了一个内自旋。 先看内自旋外的代码。

if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;

条件成立则不允许往线程池中添加线程。

从该条件判断中得知有以下几种情况不允许往线程池中添加线程:

  • 当前线程池的状态 > SHUTDOWN (RUNNING , SHUTDOWN ,STOP , TIDYING , TERMINATED )
  • 当前线程池的状态 = SHUTDOWN 且 创建的线程(worker)会有初始任务(firstTask!=null)
  • 当前线程池的状态 = SHUTDOWN , 且 没有初始任务(firstTask==null) 但任务队列是空(workQueue.isEmpty())
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

换言之若要走到内自旋,首先要保证有以下两种情况的其中之一:

  • 当前线程池的状态是RUNNING
  • 当前线程池的状态是SHUTDOWN,不带有初始任务(firstTask) 且任务队列(workQueue)不为空

int wc = workerCountOf(c); :根据ctl获得目前线程池中的线程数量

if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)) : 前面说过core 参数是为了规定添加线程要使用哪种数量限制。根据当前线程池的线程数量与规定的限制比较来决定能否继续添加线程。

if (compareAndIncrementWorkerCount(c) : 该条件判断中是CAS操作,走到该条件时,说明当前线程池已经允许添加线程了。 若该CAS操作成功 则退出外自旋,去执行添加线程和执行线程的操作。 若该CAS失败,则根据后面的判断决定是继续内自旋,还是重新外自旋。

compareAndIncrementWorkerCount(c) :该CAS操作就是将ctl值 加1 (当前线程池中的线程数量加1) , 其主要目的就是为了与其它提交任务的线程争抢该线程池的位置,以此来确保自己不被corePoolSize或者maximumPoolSize限制,可以理解拿到了一块可以执行 创建线程,添加线程池,执行线程令牌

c = ctl.get(); if (runStateOf(c) != rs) : 目的是为了确保当前线程池的状态是否被修改。若被修改,则需要进入外自旋重新根据线程池状态来决定是否可以向线程池添加线程。若没有被修改,则继续内自旋去与其它提交任务的线程来抢令牌。

小结

第一部分的代码主要是判断当前线程池是否可以添加线程。 那么允许添加线程的情况有以下几种:

  • 当前线程池的状态是RUNNING, 且当前线程池中的线程数量 小于corePoolSizemaximumPoolSize
  • 当前线程池的状态是SHUTDOWN, 没有初始任务(firstTask ), 任务队列(workQueue)中有任务, 且当前线程池中的线程数量 小于corePoolSizemaximumPoolSize

第二部分

    private boolean addWorker(Runnable firstTask, boolean core) {

		//第一部分代码省略...
        retry:
        for (;;) {...}


		//第二部分 
        
        //线程worker是否启动
        boolean workerStarted = false;
        
        //线程worker是否添加
        boolean workerAdded = false;
        
        //线程worker的引用 w
        Worker w = null;
        try {
            
            //创建worker
            w = new Worker(firstTask);
            
            // 将worker的thread 赋值给t
            final Thread t = w.thread;
            
            //判断worker内的thread是否为空,这里主要是为了防止自定义的ThreadFactory有BUG
            if (t != null) {
                
                //赋值全局锁
                final ReentrantLock mainLock = this.mainLock;
                
                /**
                   获取全局锁 
                   主要目的: 1.保证锁内的操作是原子操作,不会受其它外界影响
                   			2.在线程池内部 对线程状态状态修改都需要获取这把全局锁
                **/
                mainLock.lock();
                try {
              
                    //再次校验,为了防止在上锁前,有外部线程修改了线程池的状态。
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        
                        //这里也是为了防止自定义的ThreadFactory有BUG
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        
                        //将线程worker添加进线程池 (HashSet workers)
                        //由于前面获取到了令牌,因此这里一定会添加成功
                        workers.add(w);
                        
                        //更新线程池声明周期内的最大线程数量值
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        
                        //设置worker添加成功
                        workerAdded = true;
                    }
                } finally {
                    
                    //释放全局锁
                    mainLock.unlock();
                }
                
                /**
                	条件成立: 启动线程,设置启动线程成功
                	条件不成立: 说明在上全局锁前,有外部线程修改了线程池的状态
                **/
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            
            //条件成立: 说明启动失败
            if (! workerStarted)
                
                /**
                	需要做清理工作:
                		1.释放令牌  ctl-1      
                		2.workers.remove(w)
                **/
                addWorkerFailed(w);
        }
        //返回最终的添加结果
        return workerStarted;
    }

第二部分是addWorker() 的核心部分,虽然是核心但理解起来并不难。其主要工作就是以下三步:

  • 创建线程 w = new Worker(firstTask);
  • 添加线程池 workers.add(w);
  • 执行线程 t.start();
这篇关于ThreadPoolExecutor添加线程源码解析——addWorker的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!