感谢 参考:
http://www.cnblogs.com/trust-freedom/p/6681948.html
https://www.jianshu.com/p/ae67972d1156
线程池作用是针对于为什么使用线程池来说的:
Java中 ,线程池的概念是Executorz这个接口,具体实现是ThreadPollExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数解释:
int corePoolSize
(1)核心线程池中核心线程的最大数
(2)核心线程默认情况下会一直存活在线程池中,即使闲置好长时间
(3) 设置allowCoreThreadTimeOut = true 闲置线程到达keepAliveTime时间就会被销毁
int maximumPoolSize
(1)线程总数 线程池中线程的最大数
(2)线程总数(maximumPoolSize) = 核心线程数(corePoolSize)+ 非核心线程数
long keepAliveTime
(1)闲置线程保留时长 当线程数超过核心线程数(corePoolSize) ,如果有闲置的线程,机会被保留keepAliveTime时长 如果超过这个时长还没有任务执行 那就销毁
TimeUnit unit
(1)keepAliveTime的单位
(2)
TimeUnit.HOURS; //小时 TimeUnit.MINUTES;//分钟 TimeUnit.SECONDS;//秒//等等
BlockingQueue workQueue
(1)队列 存储将被执行的任务
(2)它只保存通过execute()方法提交的任务
(3)workQueue 常见队列类型
//new ThreadPoolExecutor( 第二个参数 设置为了Integer.MAX_VALUEpublic static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
LinkedBlockingQueue (newFixedThreadPool ,newSingleThreadExecutor 使用 )
DelayedWorkQueue (newScheduledThreadPool 使用 )
队列接收到任务时先入队,只有达到指定延时时间,才会执行任务
ThreadFactory threadFactory
(1)线程池内线程的创建方式 (这个工厂类),一般不自定义 使用默认DefaultThreadFactory
(2) 如果自定义需要实现ThreadFactory 重写newThread方法
RejectedExecutionHandler handler
(1)发生异常的时候使用
execute方法
public static void main(String[] args) { Executors.newFixedThreadPool(10).execute(new MyRunnablee(1));}class MyRunnablee implements Runnable {int flag ;public MyRunnablee(int flag) {this.flag = flag;}public void run() { System.out.println(Thread.currentThread().getName() + ":执行run--"+flag);}}
/** 执行被提交的任务 可能会创建一个新线程或者是使用已有的线程进行执行 也可能被拒绝,如果线程池已经关闭shutdown 或者是线程池内线任务数达到最大值 */public void execute(Runnable command) {//如果任务为null 抛出异常if (command == null)throw new NullPointerException();/* * 三个步骤: *1. 如果现有线程少于核心线程数(corePoolSize) 会试着去创建一个新的线程, 并把当前人任务作为新线程的第一个任务去执行 它会调用addWorker方法 addWorker方法内会进行运行状态和线程总数的校验 防止在不能添加的时候添加了线程 * 2.如果第1步成功 还需要再次检测是否应该添加线程,因为上次添加后 现在可能已经有死亡的了 或者进入方法后线程池关闭了,所以我们重新检测状态 如果需要回滚队列,如果没有线程的话 新建一个线程 *3. 如果没有添加成功,我们试着去创建一个新线程 如果失败了,可能是线程池已经关闭或者已经饱和 我们就拒绝新任务的添加 * *///获取当前线程数int c = ctl.get();//如果当前线程数< 核心线程数 添加新的线程 把任务作为线程的第一个任务执行if (workerCountOf(c) < corePoolSize) {//成功if (addWorker(command, true))return;//不成功,重新获取 每次使用ctl的时候都需要重新获取 //不成功可能是因为://1. 线程池关闭了//2. 并发情况下别的线程 优先创建了worker 导致 workerCountOf(c) > corePoolSizec = ctl.get();}//第一步失败&&如果是运行状态 //把任务添加任务队列中if (isRunning(c) && workQueue.offer(command)) {//添加成功之后 再进行一次判断 int recheck = ctl.get();//如果线程不是运行状态了 需要从workQueue中删除添加的任务 人后拒绝任务if (! isRunning(recheck) && remove(command))reject(command);//如果是运行状态 或删除失败的话(有线程在执行要删除的任务) //如果没有线程执行任务了(worker数量为0) 那么新建一个新的线程(addWorker(null, false)),任务为null 确保有线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果线程池不是runing状态 或者入队失败 尝试开启新线程 扩容到maxPoolSize else if (!addWorker(command, false))reject(command);}
流程:
借用别人一张图(自己还没学会怎么画)
如果当前线程数少于核心线程数(corePoolSize),就addWorker(command, true),如果创建成功就返回,否则执行后续
创建失败的原因可能有:
(1) 线程池已经关闭(shutdown) 不能再接受任务
(2)workerCountOf© > corePoolSize ,在进行了workerCountOf© < corePoolSize 判断之后,犹豫并发原因,别的线程优先创建了worker 导致workerCountOf© > corePoolSize
如果线程是running状态将task加入workQueue队列中,如果成功进行双重校验,如果失败可能是队列已满 则执行后续步骤
为什么要进行双重校验呢:主要是判断刚加入的task是否有线程进行执行
(1)如果线程不是running状态,应该拒绝添加任务,把刚添加的任务从workQueue中移除
(2)如果是running状态,或者从workQueue中移除失败(刚好有一个执行完的线程接受了这个新任务),要确保还有线程执行任务(创建一个不带任务的worker)
3.如果线程池不是running状态,或者无法入队列,尝试开启新的线程,扩容至maxPoolSize 如果添加失败了 那就拒绝任务
/** *1. 检查根据给定的边界(corePoolSize 或maximumPoolSize ) *core==true 的时候是 corePoolSize 否则是maximumPoolSize * 2. 如果条件符合创建一个新的workder并把任务作为线程的第一个任务执行 */ private boolean addWorker(Runnable firstTask, boolean core) { //外层循环 判断线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//1.如果状态大于 SHUTDOWN 也就是STOP TIDYING TERMINATED 不能添加worker//2.如果rs== SHUTDOWN firstTask不为空 不能添加新的worker 因为SHUTDOWN的线程不能接受新任务//3.workQueue为空 不用添加新worker 因为这个新worker就是为了处理task 如果没有task 那添加有啥意义if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//内层循环 负责worker+1 +1成功之后 才会真正的new Worker然后添加到wokers中for (;;) {int wc = workerCountOf(c);//判断长度是否大于最大要求长度 如果core是true 就用corePoolSize 如果是false 就用maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//+1 如果成功 结束循环 如果成功 结束外层循环 if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctl//如果+1 不成功 并且状态改变不等于之前获取的状态 继续外层循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}} //========+1成功 开始创建新的Worker //worker开启状态boolean workerStarted = false;//worker添加状态boolean workerAdded = false;Worker w = null;try {//新建worker//1.设置worker AQS同步状态state = -1//2.设置成员变量firstTask的值 第一个任务//3.利用ThreadFactory 创建一个线程 把当前worker传入构造函数 因为worker本身就继承了Runnable 我们在worker的run方法中执行runWorker() //runWorker方法也是传递当前对象进去 因为什么呢? 因为任务在当前对象的firstTask属性种存储着 到哪儿都要带着 厉害了/*Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }*/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//========================获取锁==================mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//添加之前 还要进行判断 //1.rs 状态是小于SHUTDOWN 也就是线程池没有关闭呢 //2.状态是SHUTDOWN 并且 firstTask==null 因为SHUTDOWN的线程池不能添加新workerif (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果线程已经开启 就不能再添加worker了 胡闹吗不是 已经开启了 咋还能再添加 再开启if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//设置添加成功状态为trueworkerAdded = true;}} finally {//依旧是这样 在finally中释放锁mainLock.unlock();//=========================释放锁===============================}//如果添加成功了 那就是风风雨雨都过去了 开启线程 线程会在还行worker的run worker的run中会调用runWorkerif (workerAdded) {t.start();workerStarted = true;}}} finally {//如果没有开启成功 移除worker worker总数-1 判断如果线程池能终止的话就终止if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
执行流程:
判断当前线程池是否可以添加worker
1.1 线程池状态大于SHUTDOWN 可能为STOP TIDYING TERMINATED 不能添加worker
1.2 如果线程池状态为SHUTDOWN 并且任务不为null 不能添加worker 因为关闭的线程不能添加新任务
1.3 如果workQueue为空 不用添加worker 因为添加worker是为了处理workQueue中的task 它都没有了了处理个啥
worker数量+1
添加新的worker 把任务添加到worker属性中
开启woker的thread 线程
这就有意思了 worker本身就是一个Runnable子类
worker有一个属性是thread 而他的runnable参数用的是this 也就是worker本身 这里启动thread 实质就是开始执行worker的run方法 而run方法中调用了runWorker() runWorker方法的参数也是this 因为实际任务时worker的一个属性 所以必须传入worker
//内部类 worker 继承AbstractQueuedSynchronizer Runnable//继承AbstractQueuedSynchronizer是简化执行任务时获取和释放锁 //在这里看到一个问题:为什么不直接执行execute(commond) 提交的commond 而是用worker包一下呢?//答:为了控制中断//用什么控制呢?//用AQS 锁 运行时上锁就不能中断 //worker实现了一个简单的不可重入锁 不是用ReentrantLock 可重入锁 //这里有很多东西不懂 可能需要以后回过头来 才会领悟private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */Worker(Runnable firstTask) {setState(-1); // 设置状态 大于0表示锁已经被获取 this.firstTask = firstTask;//把任务给这个属性this.thread = getThreadFactory().newThread(this);//创建一个线程}//run方法public void run() {runWorker(this);}//返回是否被锁 0表示没被锁 1表示被锁protected boolean isHeldExclusively() {return getState() != 0;}//尝试获取锁 protected boolean tryAcquire(int unused) {//尝试将状态从0改变为1 每次都是由0到1不是+1 那么说明是不可 重入锁if (compareAndSetState(0, 1)) {//如果获取成功 设置exclusiveOwnerThread为当前线程 setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {//设置exclusiveOwnerThread=nullsetExclusiveOwnerThread(null);//设置状态为0 state=0setState(0);return true;}//这几个方法是AbstractQueuedSynchronizer的抽象方法 需要实现 就是用这几个方法来实现AQS 不可重入锁public void lock() { acquire(1); }//尝试获取锁public boolean tryLock() { return tryAcquire(1); }//尝试释放锁public void unlock() { release(1); }//判断是否被锁public boolean isLocked() { return isHeldExclusively(); }//中断 void interruptIfStarted() {Thread t;//符合 : state>0 t!=null && t没有被中断 //worker刚创建的时候state给了-1 就是为了不让中断 机智if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
worker类说明:
worker控制中断:
2. 初始化state = -1 此时不允许中断 只有在runWoker中将state设置为0是 才能中断
2.1 线程池shutdown的时候回获取锁tryLock 如果当前线程worker在执行 不能被中断
2.2 shutdownNow线程池时 不用获取锁 但是shutdownNow-》interruptIfStarted方法也有判断getState() >= 0 才能中断
这里用不可重入锁 是为了在worker获取锁的情况下 不再进入一些其他需要加锁的方法
盗图:
这里需要注意 没有任务->processWorkerExit 不一定是说没有任务就马上会执行processWorkerExit 这就说到getTask获取任务这个方法了 如果worker总数小于核心线程数 没有指定线程闲置超时时间的话 队列会调用take 阻塞方法 也就是说worker会一直等待有任务进来 如果worker总数超过核心线程数,或者指定了allowCoreThreadTimeOut 那就会调用poll 方法 会在指定时间后返回null(如果没有获取任务)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//先解锁 解锁了 这个时候是可以被中断的 w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//无限循环 //如果firstTask为null 获取任务 getTask方法 //不为的话空先执行firstTask//如果getTask 也为null 结束循环 销毁worker while (task != null || (task = getTask()) != null) {//获取锁 获取锁后可就不能被中断了 w.lock();//1. ctl.get() > STOP 不是 RUNNING/SHUTDOWN/STOP 中断 //或者 线程中断&&ctl.get() > STOP //并且 2. 线程不被中断 //中断!!! 线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行任务之前beforeExecute(wt, task);Throwable thrown = null;try {//神奇了 在这里手动调用run 这才是真正的业务逻辑 //我理解 就是为了顺序执行 执行完了 我就知道是完了 如果放到线程 让线程直接start() 这个run是否执行完很难控制 判断 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务之后 可以看到 这里用了thrown 这可是在catch中初始化的哦 所以说额 catch是优先于finally 执行的 哈哈 课外话了 afterExecute(task, thrown);}} finally {//任务设置为nulltask = null;//完成线程数+1w.completedTasks++;//解锁 这时候可以中断了w.unlock();}}//是不是因为用户异常终止 true是 false不是 如果是用户异常那么就在while中异常了 直接走finnaly了 就不会走completedAbruptly = false 所以completedAbruptly = false就表示 没有异常 completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
盗图:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//for (;;) {int c = ctl.get();int rs = runStateOf(c);// shutdown && workQueue 为null //stop状态 (sutdownNow 会导致stop)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// worker-1 decrementWorkerCount();//成功返回null return null;}int wc = workerCountOf(c);//allowCoreThreadTimeOut 允许闲置线程销毁 也就是说没有task后 不会阻塞 超时会返回null 然后worker就会被销毁 //wc > corePoolSize 大于核心线程数 跟上边逻辑一样 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(worker总数大于最大线程数 或者 需要超时并且从队列获取已经是null)//并且(wc>1 或者 worker没有任务)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//根据是否需要超时 获取任务 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); //如果不为空 返回rif (r != null)return r;//如果是null 设置标志位timedOut = true timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
worker线程退出 销毁
private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果是突然中止 也就是异常了 需要这里-1 //如果不是突然终止(没有异常) 就不需要-1了 getTask()已经-1 了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//worker完成的任务数 +到线程池完成的总任务数中 completedTaskCount += w.completedTasks;//从workers中移除workers.remove(w);} finally {mainLock.unlock();}//判断线程是否满足终止条件 然后尝试终止 tryTerminate();//是否需要增加worker int c = ctl.get();//状态是 running、shutdownif (runStateLessThan(c, STOP)) {//如果是突然终止 那就可能还有任务没有被完成 if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min==0 就是allowCoreThreadTimeOut =true 就是不需要维护核心线程池//不需要维护核心线程池 并且任务队列不是空 if (min == 0 && ! workQueue.isEmpty())min = 1;//如果总的worker大于最小min 返回 否则创建workerif (workerCountOf(c) >= min)return; // replacement not needed}//增加新的workeraddWorker(null, false);}}
processWorkerExit流程:
worker数量-1
A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
从Workers Set中移除worker,删除时需要上锁mainlock
tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
判断线程池是否满足终止的状态
A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
B、没有线程了,更新状态为tidying->terminated
是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
线程池状态是running 或 shutdown
A、如果当前线程是突然终止的,addWorker()
B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程