线程池,将线程资源当做一个池子,里面维护着一些保持活跃的线程来执行任务,避免了线程的频繁创建、销毁带来的资源损耗。
线程池带来的好处:
降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗。
提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
提高线程的可管理性:线程池可以统一管理、分配、调优和监控。
//单线程线程池,保证任务按指定顺序执行 ExecutorService service1 = Executors.newSingleThreadExecutor(); //创建一个固定大小的线程池,可以控制线程的最大并发数 ExecutorService service2 = Executors.newFixedThreadPool(5); //创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,否则新建线程 ExecutorService service3 = Executors.newCachedThreadPool (); //创建一个定时线程池,支持定时及周期性的执行任务 ExecutorService service4 = Executors.newScheduledThreadPool(5);
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors 返回的线程池对象的弊端如下:
1 FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
2 CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。
Executor接口:只有一个execute方法,定义提交任务的方法
ExecutorService:是Executor的子接口,增加了一些常用的对线程的控制方法。
AbstractExecutorService: 是一个抽象类,ThreadPoolExecutor其实就是实现了这个抽象类。
提交单个任务:void execute(Runnable command)、Future
多个任务批量执行:
invokeAll(Collection<? extends Callable
invokeAny(Collection<? extends Callable
修改线程池状态:
shutdown(): 将线程池状态置为SHUTDOWN,停止接收任务,但会把现有工作线程和任务等待队列里的任务跑完
shutdownNow():将线程池状态置为STOP,停止接收任务,忽略任务等待队列,尝试中断工作线程。
ThreadPoolExecutor的一些关键属性
corePoolSize 核心线程数,即空闲时保留的线程数
maximumPoolSize 最大线程数,代表该线程池能同时执行任务的最大线程数,当队列无界时其实不生效
keepAliveTime 线程的存活时间,一般指超过corePoolSize数的线程最久能保留的时间
线程池的阻塞队列(WorkQueue): 任务提交的等待队列
注:阻塞队列的有界无界会关系到最大线程数属性是否生效,和线程池添加任务逻辑有关
注:其实利用阻塞队列的offer添加、poll提取,可以控制队列中任务执行的顺序
ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。 LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。 SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。 PriorityBlockingQueue:具有优先级别的阻塞队列。
threadFactory 线程的制造工厂
线程池通过他来创建线程,可以自定义一些线程的基本属性。
线程池的拒绝策略(RejectedExecutionHandler )
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
线程池提交任务的流程图
ctl是一个32位数,高3位保存线程池的状态,后29位保存此时线程池中的Woker类线程数量
ctl源码:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//0^32 private static final int COUNT_BITS = Integer.SIZE - 3;//29 private static final int CAPACITY = (1 << COUNT_BITS) - 1;//最大线程容量 //线程池有以下五种状态 private static final int RUNNING = -1 << COUNT_BITS; \\接受新execute的task, 执行已入队的task private static final int SHUTDOWN = 0 << COUNT_BITS; \\不接受新execute的task, 但执行已入队的task, 中断所有空闲的线程 private static final int STOP = 1 << COUNT_BITS; \\不接受新execute的task, 不执行已入队的task, 中断所有的线程 private static final int TIDYING = 2 << COUNT_BITS; \\所有线程停止, workerCount数量为0, 将执行钩子方法: terminated() private static final int TERMINATED = 3 << COUNT_BITS; \\terminated()方法执行完毕
workers源码:
private final HashSet<Worker> workers = new HashSet<Worker>(); \\存储线程池的工作线程,Worker是ThreadPoolExecutor的内部类,也是ThreadPoolExecutor的工作线程。
Worker源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //实际执行线程 final Thread thread; //提交的任务 Runnable firstTask; //完成的任务数统计 volatile long completedTasks; // 构造函数,setState(-1),使线程不会被打断 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //重写Runnable接口的run方法,实际调用runWorker方法,后面讲解runWorker方法 public void run() { runWorker(this); }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取ctl的值 int c = ctl.get(); // wokerCountOf(c)计算工作线程的数量 // 1、如果工作线程数小于核心线程数,创建一个工作线程到workers。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))//添加任务到workers工作线程set return; c = ctl.get(); } //2、 当前线程池工作线程数大于cordPoolSize,尝试添加任务到等待队列workQueue if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //对线程池状态做再一步检查 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3\任务添加等待队列失败,会尝试新建一个非核心的任务Worker工作线程 else if (!addWorker(command, false)) //新建工作线程失败,执行拒绝策略 reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断当前线程池状态,及SHUTDOWN状态可以执行等待队列里的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 判断工作线程数量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // ctl中工作线程数加一,跳出当前循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 增加工作线程失败,ctl状态变更,再来一次 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 可重入锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 工作线程set中增加一个Work int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
runWorker():注意while循环,当while循环条件不满足时,这个工作线程就执行完了,应该销毁。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // getTask()持续从人物等待队列workQueue获取任务 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 实际执行我们提交任务的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask()方法源码:工作线程从任务队列中获取任务执行的方法体,关键的地方在workQueue.poll和workQueue.take这两个方法,poll会超过等待时间返回null,take则会一直阻塞,实现了空闲时间核心线程保留,非核心线程销毁的逻辑。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 自循环保证池状态的一致性 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果等待队列为空,则工作线程数减1,并返回null;注意该线程是否被销毁的逻辑在runWorker(Worker)方法内,此处不用处理 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 获取工作线程数 // 工作线程是否被销毁? // 如果allowCoreThreadTimeOut设置为false,表明当该线程不是核心线程时就会被销毁,否则核心线程不会被销毁; // 如果allowCoreThreadTimeOut设置为true,表明即使是核心线程也会被销毁; boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果工作线程数大于maximumPoolSize且工作线程数大于1或者等待队列为空,则工作线程数减1且返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果timed为true,则执行poll(Long,TimeUnit),否则执行take() // poll(Long,TimeUnit)会限制线程等待指定时间,如果等待超时则返回null;take()则不会限制等待时间,阻塞直到返回一个任务; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; // 等待超时了 } catch (InterruptedException retry) { timedOut = false; } } }
使用例子: ``` newThreadPoolExecutor(...new PriorityBlockingQueue(6, new MyRunnableComparator<MyRunnable>()\\MyRunnableComparator自定义的一个比较器) static class MyRunnableComparator<M extends BaseNumber> implements Comparator<MyRunnable> { @Override public int compare(MyRunnable r1, MyRunnable r2) { if (r1.getNum() > r2.getNum()) { return -1; } else if (r1.getNum().equals(r2.getNum())) { return 0; } else { return 1; } } } ``` 元素采用二叉堆数组形式实现: https://images0.cnblogs.com/i/497634/201403/182345301461858.jpg
CAS:compareAndSwamp,比较并交换,CAS有3个操作数,内存值V,预期值A,更新值B。只有当V=A时,才把V更新为B。
具体实现例子为为UNSAFE类的compareAndSwapInt方法:unsafe.compareAndSwapInt(this, valueOffset, expect, update);\\实际调用CPU底层指令实现
缺点:会出现ABA问题