Java教程

Java 线程池 ThreadPoolExecutor -01

本文主要是介绍Java 线程池 ThreadPoolExecutor -01,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

感谢 参考:
http://www.cnblogs.com/trust-freedom/p/6681948.html
https://www.jianshu.com/p/ae67972d1156

一、为什么使用线程池

  1. 创建和销毁线程伴随着系统的开销,过于频繁的创建/销毁x线程 会很大程度上影响处理效率
  2. 线程并发数过多,抢占系统资源可能会导致阻塞
  3. 想对线程进行简单的管理

二、线程池作用

线程池作用是针对于为什么使用线程池来说的:

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度,当任务到达时,任务可以不需要创建新的线程就能立即执行
  3. 提高线程的可管理性

三、使用 Executor ThreadPoolExecutor

3.1

Java中 ,线程池的概念是Executorz这个接口,具体实现是ThreadPollExecutor

3.2 线程池构造函数简介
  1. 5个参数
 public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue)

  1. 6个参数
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory)

  1. 6个参数2
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  RejectedExecutionHandler handler)

  1. 7个参数
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory,  RejectedExecutionHandler handler)

参数解释:
int corePoolSize
(1)核心线程池中核心线程的最大数
(2)核心线程默认情况下会一直存活在线程池中,即使闲置好长时间
(3) 设置allowCoreThreadTimeOut = true 闲置线程到达keepAliveTime时间就会被销毁

int maximumPoolSize
(1)线程总数 线程池中线程的最大数
(2)线程总数(maximumPoolSize) = 核心线程数(corePoolSize)+ 非核心线程数

long keepAliveTime
(1)闲置线程保留时长 当线程数超过核心线程数(corePoolSize) ,如果有闲置的线程,机会被保留keepAliveTime时长 如果超过这个时长还没有任务执行 那就销毁

TimeUnit unit
(1)keepAliveTime的单位
(2)

		TimeUnit.HOURS; //小时
		TimeUnit.MINUTES;//分钟
		TimeUnit.SECONDS;//秒//等等

BlockingQueue workQueue
(1)队列 存储将被执行的任务
(2)它只保存通过execute()方法提交的任务
(3)workQueue 常见队列类型

  • SynchronousQueue (newCachedThreadPool 使用)
    点击查看推荐文章
    这个队列接受到任务时,会直接提交给线程处理,而不保留,如果所有线程在工作的话,那就新建一个线程来执行,所以为了保证线程数不达到线程总数(maximumPoolSize) ,maximumPoolSize的值被设置为Integer.MAX_VALUE
//new ThreadPoolExecutor( 第二个参数 设置为了Integer.MAX_VALUEpublic static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,  new SynchronousQueue<Runnable>());}

  • LinkedBlockingQueue (newFixedThreadPool ,newSingleThreadExecutor 使用 )

  • DelayedWorkQueue (newScheduledThreadPool 使用 )
    队列接收到任务时先入队,只有达到指定延时时间,才会执行任务

ThreadFactory threadFactory
(1)线程池内线程的创建方式 (这个工厂类),一般不自定义 使用默认DefaultThreadFactory
(2) 如果自定义需要实现ThreadFactory 重写newThread方法

RejectedExecutionHandler handler
(1)发生异常的时候使用

3.3 ThreadPoolExecutor添加任务

execute方法

public static void main(String[] args) {
	Executors.newFixedThreadPool(10).execute(new MyRunnablee(1));}class MyRunnablee implements Runnable {int flag ;public MyRunnablee(int flag) {this.flag = flag;}public void run() {
		System.out.println(Thread.currentThread().getName() + ":执行run--"+flag);}}

/**
      执行被提交的任务 可能会创建一个新线程或者是使用已有的线程进行执行
      也可能被拒绝,如果线程池已经关闭shutdown 或者是线程池内线任务数达到最大值
     */public void execute(Runnable command) {//如果任务为null 抛出异常if (command == null)throw new NullPointerException();/*
         * 三个步骤:
         *1. 如果现有线程少于核心线程数(corePoolSize) 会试着去创建一个新的线程,
         并把当前人任务作为新线程的第一个任务去执行  
         它会调用addWorker方法 addWorker方法内会进行运行状态和线程总数的校验  
         防止在不能添加的时候添加了线程     
         * 2.如果第1步成功 还需要再次检测是否应该添加线程,因为上次添加后 现在可能已经有死亡的了 或者进入方法后线程池关闭了,所以我们重新检测状态 如果需要回滚队列,如果没有线程的话 新建一个线程 

		*3. 如果没有添加成功,我们试着去创建一个新线程 如果失败了,可能是线程池已经关闭或者已经饱和 我们就拒绝新任务的添加
         *  
         *///获取当前线程数int c = ctl.get();//如果当前线程数< 核心线程数  添加新的线程 把任务作为线程的第一个任务执行if (workerCountOf(c) < corePoolSize) {//成功if (addWorker(command, true))return;//不成功,重新获取 每次使用ctl的时候都需要重新获取 //不成功可能是因为://1. 线程池关闭了//2. 并发情况下别的线程 优先创建了worker   导致  workerCountOf(c) > corePoolSizec = ctl.get();}//第一步失败&&如果是运行状态 //把任务添加任务队列中if (isRunning(c) && workQueue.offer(command)) {//添加成功之后 再进行一次判断  int recheck = ctl.get();//如果线程不是运行状态了 需要从workQueue中删除添加的任务 人后拒绝任务if (! isRunning(recheck) && remove(command))reject(command);//如果是运行状态 或删除失败的话(有线程在执行要删除的任务)  //如果没有线程执行任务了(worker数量为0) 那么新建一个新的线程(addWorker(null, false)),任务为null  确保有线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果线程池不是runing状态 或者入队失败 尝试开启新线程  扩容到maxPoolSize else if (!addWorker(command, false))reject(command);}

流程:
借用别人一张图(自己还没学会怎么画)
在这里插入图片描述

  1. 如果当前线程数少于核心线程数(corePoolSize),就addWorker(command, true),如果创建成功就返回,否则执行后续
    创建失败的原因可能有:
    (1) 线程池已经关闭(shutdown) 不能再接受任务
    (2)workerCountOf© > corePoolSize ,在进行了workerCountOf© < corePoolSize 判断之后,犹豫并发原因,别的线程优先创建了worker 导致workerCountOf© > corePoolSize

  2. 如果线程是running状态将task加入workQueue队列中,如果成功进行双重校验,如果失败可能是队列已满 则执行后续步骤
    为什么要进行双重校验呢:主要是判断刚加入的task是否有线程进行执行
    (1)如果线程不是running状态,应该拒绝添加任务,把刚添加的任务从workQueue中移除
    (2)如果是running状态,或者从workQueue中移除失败(刚好有一个执行完的线程接受了这个新任务),要确保还有线程执行任务(创建一个不带任务的worker)

3.如果线程池不是running状态,或者无法入队列,尝试开启新的线程,扩容至maxPoolSize 如果添加失败了 那就拒绝任务

3.4 ThreadPoolExecutor addWorker方法
/**
*1. 检查根据给定的边界(corePoolSize 或maximumPoolSize  )
*core==true 的时候是 corePoolSize   否则是maximumPoolSize  
* 2. 如果条件符合创建一个新的workder并把任务作为线程的第一个任务执行
*/
 private boolean addWorker(Runnable firstTask, boolean core) {
		   //外层循环 判断线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//1.如果状态大于 SHUTDOWN 也就是STOP  TIDYING  TERMINATED 不能添加worker//2.如果rs== SHUTDOWN  firstTask不为空  不能添加新的worker 因为SHUTDOWN的线程不能接受新任务//3.workQueue为空 不用添加新worker 因为这个新worker就是为了处理task 如果没有task 那添加有啥意义if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&   firstTask == null &&   ! workQueue.isEmpty()))return false;//内层循环 负责worker+1  +1成功之后 才会真正的new Worker然后添加到wokers中for (;;) {int wc = workerCountOf(c);//判断长度是否大于最大要求长度 如果core是true 就用corePoolSize  如果是false 就用maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//+1 如果成功 结束循环 如果成功 结束外层循环   if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl//如果+1  不成功 并且状态改变不等于之前获取的状态 继续外层循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}
	 
	 
	 		//========+1成功 开始创建新的Worker
	 		//worker开启状态boolean workerStarted = false;//worker添加状态boolean workerAdded = false;Worker w = null;try {//新建worker//1.设置worker  AQS同步状态state = -1//2.设置成员变量firstTask的值 第一个任务//3.利用ThreadFactory 创建一个线程  把当前worker传入构造函数 因为worker本身就继承了Runnable 我们在worker的run方法中执行runWorker() //runWorker方法也是传递当前对象进去 因为什么呢? 因为任务在当前对象的firstTask属性种存储着 到哪儿都要带着  厉害了/*Worker(Runnable firstTask) {
	                 setState(-1); // inhibit interrupts until runWorker
	                 this.firstTask = firstTask;
	                 this.thread = getThreadFactory().newThread(this);
	            }*/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//========================获取锁==================mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//添加之前 还要进行判断 //1.rs 状态是小于SHUTDOWN 也就是线程池没有关闭呢  //2.状态是SHUTDOWN 并且 firstTask==null 因为SHUTDOWN的线程池不能添加新workerif (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果线程已经开启 就不能再添加worker了 胡闹吗不是 已经开启了 咋还能再添加 再开启if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//设置添加成功状态为trueworkerAdded = true;}} finally {//依旧是这样 在finally中释放锁mainLock.unlock();//=========================释放锁===============================}//如果添加成功了 那就是风风雨雨都过去了 开启线程  线程会在还行worker的run worker的run中会调用runWorkerif (workerAdded) {t.start();workerStarted = true;}}} finally {//如果没有开启成功   移除worker  worker总数-1   判断如果线程池能终止的话就终止if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

执行流程:

  1. 判断当前线程池是否可以添加worker
    1.1 线程池状态大于SHUTDOWN 可能为STOP TIDYING TERMINATED 不能添加worker
    1.2 如果线程池状态为SHUTDOWN 并且任务不为null 不能添加worker 因为关闭的线程不能添加新任务
    1.3 如果workQueue为空 不用添加worker 因为添加worker是为了处理workQueue中的task 它都没有了了处理个啥

  2. worker数量+1

  3. 添加新的worker 把任务添加到worker属性中

  4. 开启woker的thread 线程
    这就有意思了 worker本身就是一个Runnable子类
    worker有一个属性是thread 而他的runnable参数用的是this 也就是worker本身 这里启动thread 实质就是开始执行worker的run方法 而run方法中调用了runWorker() runWorker方法的参数也是this 因为实际任务时worker的一个属性 所以必须传入worker

3.5 ThreadPoolExecutor Worker内部类
//内部类 worker 继承AbstractQueuedSynchronizer  Runnable//继承AbstractQueuedSynchronizer是简化执行任务时获取和释放锁 //在这里看到一个问题:为什么不直接执行execute(commond) 提交的commond 而是用worker包一下呢?//答:为了控制中断//用什么控制呢?//用AQS 锁 运行时上锁就不能中断  //worker实现了一个简单的不可重入锁  不是用ReentrantLock 可重入锁 //这里有很多东西不懂 可能需要以后回过头来 才会领悟private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */Worker(Runnable firstTask) {setState(-1); // 设置状态 大于0表示锁已经被获取 this.firstTask = firstTask;//把任务给这个属性this.thread = getThreadFactory().newThread(this);//创建一个线程}//run方法public void run() {runWorker(this);}//返回是否被锁 0表示没被锁  1表示被锁protected boolean isHeldExclusively() {return getState() != 0;}//尝试获取锁 protected boolean tryAcquire(int unused) {//尝试将状态从0改变为1 每次都是由0到1不是+1  那么说明是不可 重入锁if (compareAndSetState(0, 1)) {//如果获取成功 设置exclusiveOwnerThread为当前线程 setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {//设置exclusiveOwnerThread=nullsetExclusiveOwnerThread(null);//设置状态为0    state=0setState(0);return true;}//这几个方法是AbstractQueuedSynchronizer的抽象方法 需要实现  就是用这几个方法来实现AQS 不可重入锁public void lock()        { acquire(1); }//尝试获取锁public boolean tryLock()  { return tryAcquire(1); }//尝试释放锁public void unlock()      { release(1); }//判断是否被锁public boolean isLocked() { return isHeldExclusively(); }//中断 void interruptIfStarted() {Thread t;//符合 : state>0  t!=null  && t没有被中断 //worker刚创建的时候state给了-1 就是为了不让中断 机智if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

worker类说明:

  1. new Worker
    1.1 设置state = -1 不让中断
    1.2 设置firstTask 为execute(任务) 传入的任务
    1.3 创建 线程getThreadFactory().newThread(this)

worker控制中断:
2. 初始化state = -1 此时不允许中断 只有在runWoker中将state设置为0是 才能中断
2.1 线程池shutdown的时候回获取锁tryLock 如果当前线程worker在执行 不能被中断
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2.2 shutdownNow线程池时 不用获取锁 但是shutdownNow-》interruptIfStarted方法也有判断getState() >= 0 才能中断

  1. 为了防止在某种情况下worker被中断 runWorker每次运行任务的时候都会获取锁 这样防止其他中断获取锁而中断当前worker 使任务丢失

这里用不可重入锁 是为了在worker获取锁的情况下 不再进入一些其他需要加锁的方法

3.6 ThreadPoolExecutor Worker内部类 的 runWorker 方法

盗图:
在这里插入图片描述
这里需要注意 没有任务->processWorkerExit 不一定是说没有任务就马上会执行processWorkerExit 这就说到getTask获取任务这个方法了 如果worker总数小于核心线程数 没有指定线程闲置超时时间的话 队列会调用take 阻塞方法 也就是说worker会一直等待有任务进来 如果worker总数超过核心线程数,或者指定了allowCoreThreadTimeOut 那就会调用poll 方法 会在指定时间后返回null(如果没有获取任务)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//先解锁  解锁了 这个时候是可以被中断的 w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//无限循环 //如果firstTask为null 获取任务 getTask方法 //不为的话空先执行firstTask//如果getTask 也为null 结束循环 销毁worker while (task != null || (task = getTask()) != null) {//获取锁 获取锁后可就不能被中断了 w.lock();//1. ctl.get() > STOP  不是 RUNNING/SHUTDOWN/STOP 中断 //或者 线程中断&&ctl.get() > STOP	             //并且 2. 线程不被中断 //中断!!! 线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&  runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行任务之前beforeExecute(wt, task);Throwable thrown = null;try {//神奇了  在这里手动调用run    这才是真正的业务逻辑 //我理解  就是为了顺序执行 执行完了 我就知道是完了 如果放到线程 让线程直接start()  这个run是否执行完很难控制 判断 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务之后  可以看到 这里用了thrown  这可是在catch中初始化的哦  所以说额  catch是优先于finally 执行的 哈哈 课外话了 afterExecute(task, thrown);}} finally {//任务设置为nulltask = null;//完成线程数+1w.completedTasks++;//解锁 这时候可以中断了w.unlock();}}//是不是因为用户异常终止  true是  false不是   如果是用户异常那么就在while中异常了 直接走finnaly了 就不会走completedAbruptly = false 所以completedAbruptly = false就表示 没有异常  completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

3.7 ThreadPoolExecutor 的 getTask 方法

盗图:
在这里插入图片描述

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//for (;;) {int c = ctl.get();int rs = runStateOf(c);// shutdown && workQueue 为null  //stop状态 (sutdownNow 会导致stop)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// worker-1 decrementWorkerCount();//成功返回null return null;}int wc = workerCountOf(c);//allowCoreThreadTimeOut  允许闲置线程销毁 也就是说没有task后 不会阻塞  超时会返回null  然后worker就会被销毁 //wc > corePoolSize 大于核心线程数 跟上边逻辑一样 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(worker总数大于最大线程数 或者 需要超时并且从队列获取已经是null)//并且(wc>1  或者 worker没有任务)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//根据是否需要超时 获取任务  Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); //如果不为空 返回rif (r != null)return r;//如果是null  设置标志位timedOut = true timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

3.8 ThreadPoolExecutor processWorkerExit方法

worker线程退出 销毁

 private void processWorkerExit(Worker w, boolean completedAbruptly) {   	//如果是突然中止 也就是异常了 需要这里-1    	//如果不是突然终止(没有异常) 就不需要-1了  getTask()已经-1 了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//worker完成的任务数 +到线程池完成的总任务数中 completedTaskCount += w.completedTasks;//从workers中移除workers.remove(w);} finally {mainLock.unlock();}//判断线程是否满足终止条件 然后尝试终止 tryTerminate();//是否需要增加worker int c = ctl.get();//状态是 running、shutdownif (runStateLessThan(c, STOP)) {//如果是突然终止 那就可能还有任务没有被完成 if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min==0 就是allowCoreThreadTimeOut =true 就是不需要维护核心线程池//不需要维护核心线程池 并且任务队列不是空 if (min == 0 && ! workQueue.isEmpty())min = 1;//如果总的worker大于最小min  返回  否则创建workerif (workerCountOf(c) >= min)return; // replacement not needed}//增加新的workeraddWorker(null, false);}}

processWorkerExit流程:

  1. worker数量-1
    A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
    B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了

  2. 从Workers Set中移除worker,删除时需要上锁mainlock

  3. tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
    判断线程池是否满足终止的状态
    A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
    B、没有线程了,更新状态为tidying->terminated

  4. 是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
    线程池状态是running 或 shutdown
    A、如果当前线程是突然终止的,addWorker()
    B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
    故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

这篇关于Java 线程池 ThreadPoolExecutor -01的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!