C/C++教程

并发编程-Executor具体实现之ThreadPoolExecutor

本文主要是介绍并发编程-Executor具体实现之ThreadPoolExecutor,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

并发编程

Executor线程池原理和源码解析

ThreadPoolExecutor 默认线程池

线程池的创建

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)                        
参数解释
corePoolSize

线程池中的核心线程数量,当提交一个任务时,就会创建一个新的线程执行当前任务。知道线程数量等于corePoolSize

maximumPoolSize

线程池允许的最大线程数量。如果当前阻塞队列满了,且继续提交任务,线程池会继续创建线程去执行新提交的任务,知道当前线程池的线程数量等于maximumPoolSize

keepAliveTime

线程池非核心线程所允许的最大空闲时间。当前线程池的线程数量大于corePoolSize时,如果没有新的任务提交,非核心线程不会立刻销毁,而是会等待keepAliveTime的时间

unit

keepAliveTime的单位

workQueue

用来保存等待被执行的任务的队列,任务必须实现Runable接口。JDK中提供如下几种阻塞队列

  1. ArrayBlockingQueue :基于数组结构的有界阻塞队列,按FIFO排序任务;
  2. LinkedBlockingQueue: 基于列表结构的阻塞队列,按FIFO排序任务,吞吐量高于ArrayBlockingQueue
  3. SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否正插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
  4. priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory

线程工厂,用来创建线程。Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程
时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler

拒绝策略。当阻塞队列满了,线程池的数量等于maximumPoolSize,并且没有空闲的线程时,如果继续提交任务。线程池会就会启动拒绝策略,线程池提供了四种策略:

  1. AbortPolicy:默认策略,直接抛出异常
  2. CallerRunsPolicy: 将任务抛给提交任务的线程执行
  3. DiscardOldestPolicy:新的任务将被执行。丢弃阻塞队列中最先提交的任务
  4. DiscardPolicy:直接丢弃新提交的任务

上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

线程池监控

// 线程池已执行与未执行的任务总数
public long getTaskCount()

// 已完成的任务数
public long getCompletedTaskCount()

// 线程池当前线程数
public int getPoolSize()

//线程池中正在执行任务的线程数量
public int getActiveCount()

线程池原理

在这里插入图片描述

源码分析

execute 方法
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // ctl记录着runState和workerCount
        int c = ctl.get();
        /**
        workerCountOf方法取出低29位的值,表示当前线程数;如果当前线程数
       小于corePoolSize,则新建一个线程放入线程池中,并把任务由该线程执行
        */
        if (workerCountOf(c) < corePoolSize) {
        /**
         addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
         如果为true,根据corePoolSize来判断;
         如果为false,则根据maximumPoolSize来判断
        */
            if (addWorker(command, true))
                return;
           // 添加失败,重新获取ctl的值
            c = ctl.get();
        }
        // 判断当前线程状态,如果当前线程是允许状态并且任务添加到队列成功
        if (isRunning(c) && workQueue.offer(command)) {
        // 重新获取ctl的值
            int recheck = ctl.get();
            // 再次判断当前线程状态,如果不是running,移除上一步添加的任务
            if (! isRunning(recheck) && remove(command))
            // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
                reject(command);
            else if (workerCountOf(recheck) == 0)
             /**
              获取线程池中的有效线程数,如果数量是0,则执行addWork方法
              这里传入的参数表示
              1.null,表示在线程池中创建一个线程,但不去启动
              2.false,将线程池的有限线程数量的上限设置为maximumPoolSize,
                添加线程时根据maximumPoolSize来判断;
            */  
                addWorker(null, false);
        }
          /**
              如果执行到这里,有两种情况:
             1. 线程池已经不是RUNNING状态;
             2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
              这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
              如果失败则拒绝该任务
         */
        else if (!addWorker(command, false))
            reject(command);
    }

简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
execute方法执行流程如下:
在这里插入图片描述

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true 表示在新增线程时会判断当前活动线程数是否少于corePoolSize。false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。代码如下

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);

            /* 如果rs>=shutdown,则表示此时不再接收新任务
              接着判断以下3个条件,只要一个不满足,就返回false
              1.rs==SHUDOWN ,这时表示关闭状态,不在接收新任务提交,阻塞队列中已经保存的任务可以继续执行
              2.当前提交的任务为空
              3.阻塞队列不为空
            */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                /** wc表示当前线程数
                wc >= CAPACITY,也就是ctl的低29位的最大值(二进制是29个一),返回false
                这里的core是addWorker方法的第二个参数,如果是true,则用corePoolSize来进行比较
                false则用maximumPoolSize来进行比较
                */
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                    // 尝试增加workerCount,如果成功,则跳出第一个for循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果当前的允许状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        // 根据firstTask来创建worker对象
            w = new Worker(firstTask);
            // 每一个firstTask对象都会创建一个线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态
                //如果is是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程
                // 因为在SHUTDOWN时不会添加新的任务,但是还会执行workQueue在的任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                            // workers是一个hashSet
                        workers.add(w);
                        int s = workers.size();
                        // largestPoolSize记录着线程池中出现过的最大线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程池中每一个线程都被封装成一个worker对象,ThreadPool维护的其实就是一组Worker对象。
Worker类继承了AQS,并实现了Runnable接口。注意其中的firstTask和Thread属性:

  1. firstTask用来保存传入的任务;
  2. Thread是在调用构造方法时通过ThreadFactory来创建的线程。是用来处理任务的线程

在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this)来新建一个线程。newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程。所以一个Worker对象在启动的时候会调用Worker类中的run方法
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReenTrantLock来实现呢?可以看到tryAcquire方法,他是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有任务处理,这时可以对该线程进行中断
  4. 线程池在执行shutdown方法或tryTerminate方法时,会调用interruptldWorkers方法来中断空闲的线程,interruptldWorkers方法会使用tryLocj方法来判断线程池是否时空闲状态
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReenTrantLock,它是可重入的,这样如果在任务调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?
是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时
就不应该被中断,看一下tryAquire方法:

protected boolean tryAcquire(int unused) {
 //cas修改state,不可重入
 if (compareAndSetState(0, 1)) {
     setExclusiveOwnerThread(Thread.currentThread());
    return true;
   }
 return false;
 }

tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0

runWorker方法

在worker类中的run方法调用了runWorker方法来执行任务,runWoker方法的代码如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // 是否因为异常退出循环
        boolean completedAbruptly = true;
        try {
        // 如果task为空,则通过getTask来获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
             
                //如果线程池正在停止,那么要保证当前线程是中断状态;
              // 如果不是的话,则要保证当前线程不是中断状态;
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法的执行过程

  1. while循环不断的通过getTask()方法来获取任务;
  2. getTask()方法从阻塞队列中获取任务;
  3. 如果线程正在停止,那么要保证当前线程是中断状态,否则要保证当当前线程不是中断状态
  4. 调用task.run()执行实际的任务逻辑
  5. 如果task为null则跳出循环,执行processWorkerExit()方法
  6. runWoker方法执行完毕,也代表着Worker中的run方法执行完毕,线程销毁

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断

getTask方法

getTask方法是用来从阻塞队列中获取任务,代码如下

 private Runnable getTask() {
       // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            /**
              如果线程池状态rs>=SHUTDOWN,也就是非RUNNING状态,再进行一下判断:
              1. rs >=SOP,线程池是否正在STOP
              2. 阻塞队列是否为空
              如果以上条件都满足,则将workerCount减1并返回null
            */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /**
              timed变量用于判断是否需要进行超时控制。
              allowCoreThreadTimeOut 默认为false,也就是核心线程不允许进行超时
              wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量,对于超过核心线程数量的这些
              线程,需要进行超时控制
            */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /**
              wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时的执行了setMaximumPoolSize方法;
              timed和timeOut如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
              接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1
              如果减1失败,则返回重试
              如果wc == 1 时,也就是说明当前线程是线程池中唯一的一个线程了
            */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
     
            try {
            //根据timed来判断,如果为true,则通过阻塞队列的pol方法进行超时控制,
            //如果keepAliveTime时间内没有获取到任务。则返回null;
            // 否则通过take方法,如果这时队列为空,则take方法会阻塞知道队列不为空
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                    // 如果r == null 说明已经超时,timedOut设置为true
                timedOut = true;
            } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timeOut为false并返回,循环重试
                timedOut = false;
            }
        }
    }

这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。getTask 方法返回null 时, 在runWorker 方法中会跳出while 循环, 然后会执行processWorkerExit方法

processWokerExit方法
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 // 如果completedAbruptly为true,则说明线程之执行出现了异常,需要将workerCount减1
 // 如果没有异常,说明getTask()方法已经对workerCount进行了减1的操作,这里就没必要再减了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计完成的任务数
            completedTaskCount += w.completedTasks;
            // 从workers中移除,也就表示着从线程池中移除了一个工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
       // 根据线程池状态进行判断是否结束线程池
        tryTerminate();

        int c = ctl.get();
        /*
            当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
            如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
            如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
 */
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
在这里插入图片描述
这个小节我们讲解了ThreadPoolExecutor 的创建方式以及通过解读源码的方式了解了ThreadPoolExecutor 的工作原理,下一小节我们将讲解Executor的另一个实现类ScheduledThreadPoolExecutor 定时线程池

这篇关于并发编程-Executor具体实现之ThreadPoolExecutor的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!