Java教程

并发编程六、线程池原理解析

本文主要是介绍并发编程六、线程池原理解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言:

  1. 文章内容:线程与进程、线程生命周期、线程中断、线程常见问题总结
  2. 本文章内容来源于笔者学习笔记,内容可能与相关书籍内容重合
  3. 偏向于知识核心总结,非零基础学习文章,可用于知识的体系建立,核心内容复习,如有帮助,十分荣幸
  4. 相关文献:并发编程实战、计算机原理

 为何要用线程池?

  1. 创建/销毁线程,是个重资源的操作,为了避免频繁的创建和销毁线程,让创建的线程进行复用,就用了线程池
  2. 在一些需要多线程和缓冲任务的场景下,线程池能够提供缓冲和线程管理机制

在实际项目中使用线程池的场景?

  1. 根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下
  2. 定时任务系统里用线程池:在可以预计任务执行时间的情况下,可以设置每个任务错开时间执行,保证多个线程池高峰期不重合。
  3. 业务用线程池:根据内存大小和任务的大小,合理的给一个阻塞队列值,然后根据业务类型来走拒绝策略,回到正常的业务系统

线程池的七大核心参数:

  • 核心线程数:默认情况下一直存在于线程池中,即使没有任务执行
  • 线程池最大线程数:非核心线程+核心线程数总和。如果队列已经满了,则会创建新的线程,如果线程数超过了该值,会执行拒绝策略(如果是无界队列,该参数就失效了)。注意:在ThreadPoolExecutor内部,并不区分哪个线程是核心,哪个是非核心,超过了核心线程数量的线程会被销毁,那么剩下的就是核心线程了。
  • 非核心线程闲置超时时长:非核心线程处于闲置状态时间超过该值,就被销毁。设置allowCoreThreadTimeOut(true)也可作用于核心线程
  • 超时时长单位:微毫秒(NANOSECONDS)、微秒(MICROSECONDS)、毫秒(MILLSECONDS)、秒(MINUTES)
  • 阻塞队列:存放等待执行的线程任务对象
  • 线程工厂:创建线程的工厂,批量创建线程。统一在创建线程时设置一些参数。如果不指定,会新建一个默认的线程工厂
  • 拒绝处理策略:线程数量大于最大线程数就会采用拒绝处理策略
    • AbortPolicy:默认策略,丢弃任务并抛出异常
    • DiscardPolicy:丢弃新任务,但不抛异常
    • DiscardOlderstPolicy:丢弃队列中最旧的任务,在执行execute方法
    • CallerRunsPolicy:直接用当前业务线程来执行任务

线程池状态:

  • RUNNING:线程池创建后处于RUNNING状态
  • SHUTDOWN:调用shutdown方法进入该状态,线程池不接受新任务,清除空闲工作线程,等待阻塞队列中的任务完成
  • STOP:调用shutdownNow方法进入该状态,线程池不接受新任务,中断所有线程,阻塞队列中未执行的任务全部丢弃
  • TIDYING:所有任务已终止,ctl记录的任务数量为0,进入该状态,然后执行terminated函数
  • TERMINATED:执行完terminated函数后进入该状态

线程池execute执行过程:

四种常用线程池:

  • newCachedThreadPool:核心池大小为0,最大线程数是Integer.MAX_VALUE。任务创建就进入SynchronousQueue中。超时设置60s,适用于执行短时间任务,线程复用率较高,不会占用很多资源。
  • newFixedThreadPool:核心池与最大线程数大小一致,创建的都是核心线程,采用LinkedBlockingQueue大小Integer.MAX_VALUE。由于创建核心线程,没有任务时在getTask方法中会阻塞在take方法上,线程不会被回收,这时占用资源也更多。几乎不会触发拒绝策略,阻塞队列很大。CachedThreadPool因为最大线程数超大,也几乎不会触发拒绝策略。
  • newSingleThreadExecutor:有且仅有一个核心线程,使用LinkedBlockingQueue,不会创建非核心线程,任务按照先来先执行顺序,唯一线程不空闲,任务在队列中等待。
  • newScheduledThreadPool:定长线程池,支持定时及周期性任务执行。

阻塞队列:

  • ArrayBlockingQueue:数组结构有界队列,先进先出原则,新元素插入到队列尾部,获取从队列头开始获取元素
  • LinkedBlockingQueue:链表构成无界队列,默认大小Integer.MAX_VALUE,先进先出
  • DelayQueue:只有当元素指定延迟时间到了,才能从队列中获取元素,无大小限制
  • PriorityBlockingQueue:基于优先级的无界阻塞队列,生产不阻塞,消费阻塞。使用时生产数据速度必能快于消费速度,时间长会耗尽堆空间。放入元素必须实现Comparable接口,排序值高的永远排在前面
  • SynchronousQueue:没有内部容量,put一个元素必须等待take取数据,然后生产者会阻塞

手写阻塞队列:

private List<Integer> container = new ArrayList<>();
    private volatile int size;
    private volatile int capacity;
    private Lock lock = new ReentrantLock();
    //condition
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    MyBlockQueue(int cap){
        this.capacity = cap;
    }
    public void put(int data){
        try{
            lock.lock();
            try {
                while (size > capacity){
                    System.out.println("队列满了");
                    notEmpty.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ++size;
            container.add(data);
            notFull.signal();
        }finally {
            lock.unlock();
        }
    }
    public int take(){
        try{
            lock.lock();
            try {
                while (size==0){
                    System.out.println("队列空了");
                    notFull.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            --size;
            Integer ret = container.get(0);
            container.remove(0);
            notEmpty.signal();
            return ret;
        }finally {
            lock.unlock();
        }
    }

如何针对不同类型的任务来使用线程池呢?

  1. 高并发、任务执行时间短:任务耗时短,要求线程尽量少,如果线程太多,有可能出现线程切换和管理的时间,大于任务执行的时间,那效率就低了
  2. 并发不高,任务执行时间长:如果业务时间长集中在IO操作,那就是IO密集型任务,IO操作不占用CPU,所以不要让所有CPU闲下来,可以加大线程池中线程数目,让CPU处理更多业务。如果业务时间长集中在计算上,那就是计算密集型任务,线程池中线程数设置少一点,减少线程上下文切换
  3. 并发高,任务执行时间长:考虑具体任务类型,是IO密集还是CPU密集,线程池这块要控制好线程数和队列容量,不然线程池很容易满。改善任务,对任务执行时间进行优化,考虑某些数据是否可以走缓存,不重要的业务是否可以用中间件拆分解耦,异步处理+回调降低执行时间。

源码分析


带着问题看源码:

  如何在线程执行前后额外操作、线程池如何复用线程,线程池如何保证线程不会销毁,超过核心线程数的线程是如何销毁,能够实现创建的线程数达到最大线程数再将任务放入队列吗

 核心属性ctl与线程池状态

//int类型,前三位标识线程池状态,后29位标识有效线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29位,方便ctl进行位运算使用的常量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//线程池运行状态 前三位
private static final int RUNNING    = -1 << COUNT_BITS;//运行中:111
private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程被shutdown,继续执行完剩下的任务 000
private static final int STOP       =  1 << COUNT_BITS;//线程被shutdownNow,线程池停止,中断所有任务 001
private static final int TIDYING    =  2 << COUNT_BITS;//shutdown或shutdownNow后,任务都被处理完,到这个过渡状态 010
private static final int TERMINATED =  3 << COUNT_BITS;//线程池停止 011

execute:执行方法

public void execute(Runnable command) {
    //非空判断
    if (command == null)
        throw new NullPointerException();
    //获取ctl变量    
    int c = ctl.get();
    //workerCountOf():获取线程池中正在工作的线程数  判断是否小于核心数
    if (workerCountOf(c) < corePoolSize) {
        //核心线程还有,创建核心线程,并执行任务。传入true代表是核心线程
        if (addWorker(command, true))
            return;
        //添加核心线程数失败,重新获取ctl
        c = ctl.get();
    }
    //如果线程池运行状态是运行中,将任务追加到阻塞队列
    if (isRunning(c) && workQu eue.offer(command)) {
        //添加阻塞队列成功
        int recheck = ctl.get();
        //二次校验线程池状态 如果不是运行中,并移除成功,就执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
        //如果工作线程为0,创建一个工作线程
            addWorker(null, false);
    }
    //没有核心线程且阻塞队列满了 添加有一个非核心线程 创建成功返回true,失败返回false
    else if (!addWorker(command, false))
        //添加最大线程数失败,执行拒绝策略
        reject(command);
}

addWorker():创建工作线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {//外循环判断线程池状态
        //获取ctl 和runStateOf():获取当前线程池运行状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //rs >= SHUTDOWN:说明线程池执行了shutdown或shutdownNow
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && //线程池停了
               firstTask == null && //任务为null
               ! workQueue.isEmpty())) //队列空了
            return false;//返回false,不创建工作线程
        for (;;) {//内存循环判断线程池工作线程数量
            //获取当前线程池中正在工作的线程数
            int wc = workerCountOf(c);
            //工作线程大于核心线程 或 工作线程大于最大线程 返回false,不创建工作线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //满足可以创建工作线程的条件,CAS操作:正在工作的线程数+1。这里没有真正的创建工作线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //重新获取ctl
            c = ctl.get();
            //CAS失败,就要判断是重新执行内循环,还是外循环。如果线程池状态改变,则执行外循环。否则执行内循环  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    //声明两个标识 workerStarted:工作线程启动了吗  workerAdded:工作线程添加了吗
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //new一个worker,并传入任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //创建工作线程时,通过ReentrantLock来保证多线程并发创建,获取了线程池的全局锁
            final ReentrantLock mainLock = this.mainLock;
            //上锁,shutdown、shutdownNow也需要获取全局锁
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //线程池状态是否是运行状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {//或者线程池为shutdown并且任务是null
                    if (t.isAlive()) //线程正在运行,抛出异常。这里工作线程还没有创建和启动,不会是运行。
                        throw new IllegalThreadStateException();
                     //将工作线程追加到workers,worker是一个HaseSet<Worker>。  
                     workers.add(w);
                    int s = workers.size();
                    //如果现在工作线程数大于历史最大值,替换掉历史最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //修改工作线程添加标记为true,标识工作线程创建好了
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //如果工作线程添加成功率
            if (workerAdded) {
                //启动线程
                t.start();
                //修改工作线程启动标识为true
                workerStarted = true;
            }
        }
    } finally {
        //如果工作线程启动失败
        if (! workerStarted)
            //补救操作
            addWorkerFailed(w);
    }
    return workerStarted;//返回工作线程的启动结果
}

Work对象:封装工作线程

  Worker继承了AQS,实现了Runnable。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    //存放线程对象
    final Thread thread;
    //存放任务
    Runnable firstTask;
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); //设置state 在runWorker之前禁止中断worker创建过程。
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    //调用worker的start方法,实际上执行的worker中的run方法,即runWorker方法
    public void run() {
        runWorker(this);
    }

runWorker方法:

  • 实现线程执行前后额外增强:beforeExecute和afterExecute
  • 线程复用:runWorker方法中,有个while循环,会一直判断任务不为null或队列中是否有任务。有新任务就调用task.run执行。不会再去创建新线程,来达到复用的目的。

final void runWorker(Worker w) {
    //获取当前线程
    Thread wt = Thread.currentThread();
    //获取worker中具体任务
    Runnable task = w.firstTask;
    //将worker中任务设置为null
    w.firstTask = null;
    //这里unlock实际调用时aqs的release(1),释放构造方法设置的标识。代表当前线程可以被打断
    w.unlock(); 
    //设置标记为true 最后都为true表示:线程执行或获取任务过程发生异常
    boolean completedAbruptly = true;
    try {
        //如果worker中task有任务,直接执行。如果没有任务,就从队列中取任务getTask()
        while (task != null || (task = getTask()) != null) {
            //要执行任务,添加一个标记。即便shutdown也不会打断任务执行
            w.lock();
            //判断当前线程池状态以及线程状态,是否需要被中断。这时候线程池都不能执行任务了,就会执行中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行任务之前执行beforeExecute:前置增强
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {//这个的try是让线程执行出异常后,项目代码依旧会往后走
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务之后执行afterExecute:后置增强,这样可以让开发者可以在线程执行之前和之后添加额外处理。
                    afterExecute(task, thrown);
                }
            } finally {
                //线程执行完了,任务设置为null,任务处理完毕0
                task = null;
                //当前worker处理的任务数+1
                w.completedTasks++;
                //去掉标记
                w.unlock();
            }
        }
        //没有任务进来了,队列任务也执行完了,就是false
        completedAbruptly = false;
    } finally {
        //当任务没有了或队列中也没有任务了执行processWorkerExit。销毁线程
        processWorkerExit(w, completedAbruptly);
    }
}

getTask:

  在getTsak时,会判断工作线程数是否大于最大线程数、核心线程是否设置了可销毁、队列是否为空。来判断是否要CAS的减少工作线程数。如果线程池是非运行状态并且工作队列是空的会CAS的减少工作线程数,并销毁线程。

private Runnable getTask() {
    //设置超时标记
    boolean timedOut = false; 
    for (;;) {
        //获取运行状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //如果线程池非运行状态 并且 (线程池已经停止或工作队列是空的)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //CAS的减少工作线程数
            decrementWorkerCount();
            return null;//返回null
        }
        //获取工作中的线程数
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut:false核心线程空闲也存活,ture核心线程使用keepAliveTime超时等待工作
        //工作线程数大于核心线程数或核心线程可销毁,timed就是true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //如果工作线程大于最大线程数 或者 工作线程数大于最大线程数了,或核心线程可销毁
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //工作线程数至少大于1,是因为timed为true代表工作队列可能还有任务,所以至少要保留一条线程执行任务
            // 或者 工作队列空了那可以直接销毁 cas减少工作线程数,成功返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;//CAS失败,再次循环
        }

        try {
            //如果timed为true 则超时的去队列获取任务,否则阻塞获取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
            //如果返回不是null,说明队列有任务
                return r;
            timedOut = true;//超时标记为true,表示超时时间到了,队列中还没有任务。继续执行循环
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit:

  1. 线程销毁时:会校验线程池状态试图终止线程池,然后看是否是正常退出,如果异常退出新增一个工作线程继续处理。
  2. 如果是正常退出,代表队列没任务且任务执行完了,然后判断是否设置了核心线程可销毁,设置了最小线程数为0,没有就核心线程数。
  3. 如果最小线程数为0了,但队列不为空,就保留一个线程继续执行任务

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //为true 表示线程池在执行任务或取队列任务执行过程发生了异常
    if (completedAbruptly) 
        //CAS的把工作线程数减为0
        decrementWorkerCount();
    //获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //线程池完成的任务数进行叠加
        completedTaskCount += w.completedTasks;
        //将线程移除出worker的set
        workers.remove(w);
    } finally {
        //释放全局锁
        mainLock.unlock();
    }
    //校验线程池的状态,试图终止线程池
    tryTerminate();
    int c = ctl.get();
    //线程池状态小于stop 即运行或shutdown状态
    if (runStateLessThan(c, STOP)) {
        //线程池是正常退出,即队列没有任务,且任务执行完了
        if (!completedAbruptly) {
            //如果设置了核心线程可销毁 则最新线程为0,否则是核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果min是0 并且队列不为空
            if (min == 0 && ! workQueue.isEmpty())
                //保留一个线程执行任务
                min = 1;
            if (workerCountOf(c) >= min)
                return;//如果工作线程数大于等于最小线程数,返回
        }
        //如果是异常退出走这里,创建一个线程
        //如果当前线程数量<最小最小线程数,也要创建一个线程
        addWorker(null, false);
    }
}

线程池如何实现线程复用:

  线程池对Thread做了包装,不重复调用Thread.start。而是自己有一个Runnable.run,run方法在循环里跑。跑得过程中不断检查是否有新加入的子Runnable对象,有的话就调用自己的run。相当于把一个大run把其他小run串联起来。同一个Thread可以执行不同的runnable,线程池把线程和Runnable通过队列解耦了,线程从队列中不断获取新任务

线程池的线程是如何不保证不被销毁的:

  线程池的线程都被封装成了Worker,在Worker内部会循环的从队列获取任务,如果获取的任务为空,则销毁线程,如果任务不为空,则执行任务。从队列获取任务时,如果当前线程数量没有超过核心线程数量,会阻塞式的获取任务,超过了或者设置了核心线程可超过会超时时的获取任务,超时时间为KeepAliveTime,超时时间获取不到就会返回空个,则会结束线程。即通过一个循环保证线程不会结束运行,从而保证不被销毁。

线程池是如何动态调整参数的:

  • ThreadPoolExecutor.setCorePoolSize // 修改核心线程数
  • ThreadPoolExecutor.setMaximumPoolSize // 修改最大线程数
  • ThreadPoolExecutor.setKeepAliveTime // 修改空闲线程存活时间
  • ThreadPoolExecutor.setRejectedExecutionHandler // 修改拒绝策略
  • ThreadPoolExecutor.setThreadFactory // 修改线程工厂
  通过这些方法可以设置ThreadPoolExecutor的参数,要动态调整参数。可以将相关配置配置在配置中心,启动时从配置中心读,同时监听配置中心的配置变化。可以开放controller接口,在后台可以实时调用接口从而实现配置动态化

超过核心线程数的线程是如何销毁的:

  如果超过了核心线程数,在获取任务时会超时式的从队列获取任务,如果早keepAliveTime时间后没有任务返回,会返回null,会导致while循环退出,从而走到了销毁线程的逻辑。

能够实现创建的线程数达到最大线程数再将任务放入队列吗:

  在execute方法第二步,如果workQueue.offer返回了false,就会创建新线程来执行任务。只需要重写队列的offer方法。实现如果当前线程数量没有达到最大线程数量时,返回false,走创建线程逻辑,而不是将任务放入队列。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

这篇关于并发编程六、线程池原理解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!