前言:
private List<Integer> container = new ArrayList<>(); private volatile int size; private volatile int capacity; private Lock lock = new ReentrantLock(); //condition private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); MyBlockQueue(int cap){ this.capacity = cap; } public void put(int data){ try{ lock.lock(); try { while (size > capacity){ System.out.println("队列满了"); notEmpty.await(); } } catch (InterruptedException e) { e.printStackTrace(); } ++size; container.add(data); notFull.signal(); }finally { lock.unlock(); } } public int take(){ try{ lock.lock(); try { while (size==0){ System.out.println("队列空了"); notFull.await(); } } catch (InterruptedException e) { e.printStackTrace(); } --size; Integer ret = container.get(0); container.remove(0); notEmpty.signal(); return ret; }finally { lock.unlock(); } }
//int类型,前三位标识线程池状态,后29位标识有效线程的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //29位,方便ctl进行位运算使用的常量 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池的线程最大容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程池运行状态 前三位 private static final int RUNNING = -1 << COUNT_BITS;//运行中:111 private static final int SHUTDOWN = 0 << COUNT_BITS;//线程被shutdown,继续执行完剩下的任务 000 private static final int STOP = 1 << COUNT_BITS;//线程被shutdownNow,线程池停止,中断所有任务 001 private static final int TIDYING = 2 << COUNT_BITS;//shutdown或shutdownNow后,任务都被处理完,到这个过渡状态 010 private static final int TERMINATED = 3 << COUNT_BITS;//线程池停止 011
public void execute(Runnable command) { //非空判断 if (command == null) throw new NullPointerException(); //获取ctl变量 int c = ctl.get(); //workerCountOf():获取线程池中正在工作的线程数 判断是否小于核心数 if (workerCountOf(c) < corePoolSize) { //核心线程还有,创建核心线程,并执行任务。传入true代表是核心线程 if (addWorker(command, true)) return; //添加核心线程数失败,重新获取ctl c = ctl.get(); } //如果线程池运行状态是运行中,将任务追加到阻塞队列 if (isRunning(c) && workQu eue.offer(command)) { //添加阻塞队列成功 int recheck = ctl.get(); //二次校验线程池状态 如果不是运行中,并移除成功,就执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) //如果工作线程为0,创建一个工作线程 addWorker(null, false); } //没有核心线程且阻塞队列满了 添加有一个非核心线程 创建成功返回true,失败返回false else if (!addWorker(command, false)) //添加最大线程数失败,执行拒绝策略 reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {//外循环判断线程池状态 //获取ctl 和runStateOf():获取当前线程池运行状态 int c = ctl.get(); int rs = runStateOf(c); //rs >= SHUTDOWN:说明线程池执行了shutdown或shutdownNow if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && //线程池停了 firstTask == null && //任务为null ! workQueue.isEmpty())) //队列空了 return false;//返回false,不创建工作线程 for (;;) {//内存循环判断线程池工作线程数量 //获取当前线程池中正在工作的线程数 int wc = workerCountOf(c); //工作线程大于核心线程 或 工作线程大于最大线程 返回false,不创建工作线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //满足可以创建工作线程的条件,CAS操作:正在工作的线程数+1。这里没有真正的创建工作线程 if (compareAndIncrementWorkerCount(c)) break retry; //重新获取ctl c = ctl.get(); //CAS失败,就要判断是重新执行内循环,还是外循环。如果线程池状态改变,则执行外循环。否则执行内循环 if (runStateOf(c) != rs) continue retry; } } //声明两个标识 workerStarted:工作线程启动了吗 workerAdded:工作线程添加了吗 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //new一个worker,并传入任务 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //创建工作线程时,通过ReentrantLock来保证多线程并发创建,获取了线程池的全局锁 final ReentrantLock mainLock = this.mainLock; //上锁,shutdown、shutdownNow也需要获取全局锁 mainLock.lock(); try { int rs = runStateOf(ctl.get()); //线程池状态是否是运行状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {//或者线程池为shutdown并且任务是null if (t.isAlive()) //线程正在运行,抛出异常。这里工作线程还没有创建和启动,不会是运行。 throw new IllegalThreadStateException(); //将工作线程追加到workers,worker是一个HaseSet<Worker>。 workers.add(w); int s = workers.size(); //如果现在工作线程数大于历史最大值,替换掉历史最大值 if (s > largestPoolSize) largestPoolSize = s; //修改工作线程添加标记为true,标识工作线程创建好了 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果工作线程添加成功率 if (workerAdded) { //启动线程 t.start(); //修改工作线程启动标识为true workerStarted = true; } } } finally { //如果工作线程启动失败 if (! workerStarted) //补救操作 addWorkerFailed(w); } return workerStarted;//返回工作线程的启动结果 }
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; //存放线程对象 final Thread thread; //存放任务 Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); //设置state 在runWorker之前禁止中断worker创建过程。 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //调用worker的start方法,实际上执行的worker中的run方法,即runWorker方法 public void run() { runWorker(this); }
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取worker中具体任务 Runnable task = w.firstTask; //将worker中任务设置为null w.firstTask = null; //这里unlock实际调用时aqs的release(1),释放构造方法设置的标识。代表当前线程可以被打断 w.unlock(); //设置标记为true 最后都为true表示:线程执行或获取任务过程发生异常 boolean completedAbruptly = true; try { //如果worker中task有任务,直接执行。如果没有任务,就从队列中取任务getTask() while (task != null || (task = getTask()) != null) { //要执行任务,添加一个标记。即便shutdown也不会打断任务执行 w.lock(); //判断当前线程池状态以及线程状态,是否需要被中断。这时候线程池都不能执行任务了,就会执行中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行任务之前执行beforeExecute:前置增强 beforeExecute(wt, task); Throwable thrown = null; try {//这个的try是让线程执行出异常后,项目代码依旧会往后走 //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行任务之后执行afterExecute:后置增强,这样可以让开发者可以在线程执行之前和之后添加额外处理。 afterExecute(task, thrown); } } finally { //线程执行完了,任务设置为null,任务处理完毕0 task = null; //当前worker处理的任务数+1 w.completedTasks++; //去掉标记 w.unlock(); } } //没有任务进来了,队列任务也执行完了,就是false completedAbruptly = false; } finally { //当任务没有了或队列中也没有任务了执行processWorkerExit。销毁线程 processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { //设置超时标记 boolean timedOut = false; for (;;) { //获取运行状态 int c = ctl.get(); int rs = runStateOf(c); //如果线程池非运行状态 并且 (线程池已经停止或工作队列是空的) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //CAS的减少工作线程数 decrementWorkerCount(); return null;//返回null } //获取工作中的线程数 int wc = workerCountOf(c); //allowCoreThreadTimeOut:false核心线程空闲也存活,ture核心线程使用keepAliveTime超时等待工作 //工作线程数大于核心线程数或核心线程可销毁,timed就是true boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果工作线程大于最大线程数 或者 工作线程数大于最大线程数了,或核心线程可销毁 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //工作线程数至少大于1,是因为timed为true代表工作队列可能还有任务,所以至少要保留一条线程执行任务 // 或者 工作队列空了那可以直接销毁 cas减少工作线程数,成功返回null if (compareAndDecrementWorkerCount(c)) return null; continue;//CAS失败,再次循环 } try { //如果timed为true 则超时的去队列获取任务,否则阻塞获取 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) //如果返回不是null,说明队列有任务 return r; timedOut = true;//超时标记为true,表示超时时间到了,队列中还没有任务。继续执行循环 } catch (InterruptedException retry) { timedOut = false; } } }
private void processWorkerExit(Worker w, boolean completedAbruptly) { //为true 表示线程池在执行任务或取队列任务执行过程发生了异常 if (completedAbruptly) //CAS的把工作线程数减为0 decrementWorkerCount(); //获取全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //线程池完成的任务数进行叠加 completedTaskCount += w.completedTasks; //将线程移除出worker的set workers.remove(w); } finally { //释放全局锁 mainLock.unlock(); } //校验线程池的状态,试图终止线程池 tryTerminate(); int c = ctl.get(); //线程池状态小于stop 即运行或shutdown状态 if (runStateLessThan(c, STOP)) { //线程池是正常退出,即队列没有任务,且任务执行完了 if (!completedAbruptly) { //如果设置了核心线程可销毁 则最新线程为0,否则是核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果min是0 并且队列不为空 if (min == 0 && ! workQueue.isEmpty()) //保留一个线程执行任务 min = 1; if (workerCountOf(c) >= min) return;//如果工作线程数大于等于最小线程数,返回 } //如果是异常退出走这里,创建一个线程 //如果当前线程数量<最小最小线程数,也要创建一个线程 addWorker(null, false); } }