线程池核心参数以及工作原理参考第六节
线程池简单原理见1.7
/* * 线程池核心属性之一 ctl。 * 高三位表示当前线程池运行状态,低29位表示当前线程池中所拥有的线程数量。 * 是一个原子类 AtomicInteger。 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); /* * Integer.SIZE = 32 * 32 - 3 = 29 表示低29位用来存放当前线程数量的位 */ private static final int COUNT_BITS = Integer.SIZE - 3; /* * 表示低29位能表示的最大的线程数 就是 1 << 29 - 1 (大概是5亿多) * CAPACITY = 000 11111111111111111111111111111 */ private static final int CAPACITY = (1 << COUNT_BITS) - 1; /* * 下面的表示线程池的5种状态 * 状态从上到下依次递增。 */ //111 00000000000000000000000000000 (转换为二进制) 转换成10进制是一个负数 private static final int RUNNING = -1 << COUNT_BITS; //000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; /* * 获取当前线程池的运行状态。 * ~CAPACITY = 111 00000000000000000000000000000 * 因为要进行一个&运算,而~CAPACITY的值是固定的,根据这个值并且我们知道ctl的高三位 * 表示线程池的运行状态,所以进行&运算后就能获取到ctl的高三位的状态,即线程池的状态。 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /* * 获取当前线程池的线程数量。 * CAPACITY = 000 11111111111111111111111111111 * 这个值跟ctl进行&运算,取出ctl的低29位的值,即表示获取线程池中的线程数量。 */ private static int workerCountOf(int c) { return c & CAPACITY; } /* * 用在重置当前线程池ctl值时会用到 * rs 表示线程池状态, wc表示当前线程池中worker(线程)数量 * |表示的就是不进位加法 表示的就是通过rs 和 wc重新构建一个ctl。 */ private static int ctlOf(int rs, int wc) { return rs | wc; } /* * 表示当前线程池ctl所表示的状态是否小于某个状态s * RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED */ private static boolean runStateLessThan(int c, int s) { return c < s; } /* * 表示当前线程池ctl所表示的状态是否大于等于某个状态s。 * */ private static boolean runStateAtLeast(int c, int s) { return c >= s; } /* * 判断线程池是否处于RUNNING状态 * 小于SHUTDOWN的状态一定是RUNNING状态 */ private static boolean isRunning(int c) { return c < SHUTDOWN; }
/* * 使用CAS的方式让 ctl值+1,成功返回true失败返回false。 * 即尝试添加一个线程。(Worker实际上就是工作者线程) */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /* * 使用CAS的方式让 ctl值-1,成功返回true失败返回false。 * 即尝试干掉一个线程。(Worker实际上就是工作者线程) */ private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } /** * 将ctl的值-1, 这个方法一定成功,使用的是 自旋 + CAS的方式保证。 */ private void decrementWorkerCount() { do {} while (!compareAndDecrementWorkerCount(ctl.get())); }
/* * 增加worker(线程) 减少worker 时需要持有mainLock,修改线程池运行状态时也需要。 */ private final ReentrantLock mainLock = new ReentrantLock(); //线程池中真正存到 worker(Thread)的地方 工作者集合。 private final HashSet<Worker> workers = new HashSet<Worker>(); /* * 当外部线程调用awaitTermination()方法时,外部线程会阻塞等待当前线程池状态为Termination为止 * * 底层类似AQS的原理,等待就是将当前线程封装成一个Node,然后进入Condition的等待队列 中,当线程池状态变为termination时, * 会通过调用termination.signalAll()方法会将这 些线程全部唤醒,进入到阻塞队列中(AQS),继续去争抢锁(每次只有头节点可以获得锁) */ private final Condition termination = mainLock.newCondition(); //记录线程池生命周期内,线程数的最大值 private int largestPoolSize; //记录线程池所完成的任务总数,当一个worker退出时,会将worker完成的任务累加到这个属性中 private long completedTaskCount; /* * 线程池7大核心参数之一:任务队列:BlockingQueue(阻塞队列)是一个接口。 * 当线程池中的正在工作的线程达到核心线程数时,这时再提交的任务会直接放到workQueue中。 * 常用的实现类有基于数组的阻塞队列 ArrayBlockingQueue * 基于链表的阻塞队列 LinkedBlockingQueue */ private final BlockingQueue<Runnable> workQueue; /* * 线程池的7大参数之一,线程的创建工厂,是一个接口 * 一般不推荐使用默认的实现类DefaultThreadFactory。 */ private volatile ThreadFactory threadFactory; /* * 线程池7大核心参数之一,拒绝策略,是一个接口,有四种实现,默认是直接丢弃并抛出异常。 * DiscardOldestPolicy --->丢弃队列中最老(最先入队)的任务 * AbortPolicy --->直接丢弃新来的任务 抛出异常 (默认的) * CallerRunsPolicy --->直接调用run方法,相当于同步方法 * DiscardPolicy --->直接丢弃新来的任务 不抛出异常 */ private volatile RejectedExecutionHandler handler; /* * 线程池7大核心参数之一: 空闲线程存活时间 * 当allowCoreThreadTimeOut为false时,只有当非核心线程空闲时间达到指定时间时才会被 * 回收。 * 当allowCoreThreadTimeOut为true时,线程池内所有的线程到达指定的时间均会被回收。 * 此参数常常和 TimeUnit一起使用,指定超时时间的单位(也是线程池的7大核心参数之一) */ private volatile long keepAliveTime; /* * 线程池7大核心参数之一: 核心线程数 */ private volatile int corePoolSize; /* * 线程池7大核心参数之一: 最大线程数 */ private volatile int maximumPoolSize; //控制线程池内核心线程空闲时间达到指定时间时能否被回收 private volatile boolean allowCoreThreadTimeOut;
private final class Worker extends AbstractQueuedSynchronizer //是AQS的子类 implements Runnable //实现了Runnable接口 { /* * Worker采用了AQS的 独占 模式 * 独占模式:两个重要属性 state 和 ExclusiveOwnerThread * state: 0时表示表示未被占用,> 0时表示被占用 < 0时表示初始状态。 * ExclusiveOwnerThread表示抢占到锁的线程。 */ private static final long serialVersionUID = 6138294804551838833L; //worker内部封装的工作线程 final Thread thread; //假设firstTask不为NULL,那么当worker启动后(内部的线程启动)会优先执行firstTask,当执行完firstTask后,会到队列中去获取下一个任务。 Runnable firstTask; //记录当前worker所完成的任务数量 volatile long completedTasks; /* * 构造器 传来的Runnable任务可以为NULL,firstTask为NULL的线程启动后会去队列中 * 获取任务 */ Worker(Runnable firstTask) { //设置AQS独占模式为初始化中状态,这个时候不能被抢占锁 setState(-1); //为内部的firstTask赋值 this.firstTask = firstTask; /* * 使用线程工厂创建了一个线程,并且将当前worker指定为Runnable,也就是说当 * thread启动的时候会议worker.run为入口 */ this.thread = getThreadFactory().newThread(this); } /* * 当worker启动时,会执行run()方法。当前的这个Worker就是一个任务(Runnable) * 底层调用runWorker()直接将this传入了。 */ public void run() { //直接将当前对象传入进行执行。 runWorker(this); } /* * 判断当前worker的锁是否被占用 * state为0 表示为被占用 * state为1 表示被占用 */ protected boolean isHeldExclusively() { return getState() != 0; } //尝试去占用worker的独占锁 protected boolean tryAcquire(int unused) { //CAS的方式,将state设置为1 if (compareAndSetState(0, 1)) { //CAS成功,则将exclusiveOwnerThread设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /* 尝试释放锁 * 外部不会直接调用这个方法,这个方法时AQS内调用的 * 外部调用unlock时,unlock -> AQS.release ->tryRelease (模板方法模式) */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } //加锁,加锁失败时会阻塞当前线程。(类似ReentarntLock) public void lock() { acquire(1); } /* * 尝试去加锁,如果锁是未被持有状态,那么加锁成功后会返回true * 否则 不会阻塞当前线程会返回false。 */ public boolean tryLock() { return tryAcquire(1); } //启动worker之前会先调用unlock(),强制将独占线程置为NULL,将state变为0. public void unlock() { release(1); } //返回当前worker的lock被占用 public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
/* * 七个参数的构造方法。传入七大核心参数,为内部属性赋值。 */ public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //空闲线程存活时间 TimeUnit unit, //时间单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler) { //拒绝策略 //判断参数是否合法 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); //workQueue threadFactory handler不能为NULL。 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); //为属性赋值 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }