线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间,在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源,增加并发编程的风险;另外,线程自身无法解决在服务器负载过大的时候让新的线程等待或友好的拒绝服务的问题。所以需要通过线程池协调多个线程,并实现类似主次线程隔离、定时执行、周期执行等任务。
线程池的作用包括:
ThreadPoolExecutor顶层接口是Executor,ExecutorService接口继承了Executor接口,定义了 管理线程任务的方法;
ExecutorService的抽象类AbstractExecutorService提供了submit()、invokeAll()等部分方法的实现;
ThreadPoolExecutor实现了最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); //其余代码 }
corePoolSize 核心线程数。如果等于0,任务执行完之后,没有任何请求进入时销毁线程池的线程;如果大于0,即使本地任务执行完毕,核心线程也不会被销毁。这个值设置过大会浪费资源,设置过小会导致线程频繁的创建销毁线程
maximumPoolSize 最大线程数。从构造函数中第一个if可以看出,该值必须大于或等于1。如果待执行的线程数大于此值,则需要借助第五个参数的帮助,缓存在队列中。如果maximumPoolSize 和corePoolSize 相等,即是固定大小线程池
keepAliveTime 线程空闲时间。当空闲时间达到keepAliveTime值时,线程会被销毁,知道只剩下corePoolSize个线程为止,避免浪费内存和句柄资源。默认情况下,当线程数大于corePoolSize时,keepAliveTime才会起作用。但是当ThreadPoolExecutor的allowCoreThreadTimeOut变量设置为True时,核心线程超时后也会被回收
TimeUnit keepAliveTime的时间单位,通常时间单位是TimeUnit.SECONDS。
workQueue 缓存队列。当请求的线程数大于corePoolSize时,线程进入BlockingQueue阻塞队列。
threadFactory 线程工厂。用来生产一组相同任务的线程。线程池的命名时通过给这个factory增加组名前缀来实现的。
handler 拒绝策略。当workQueue 的任务缓存区到达上限,并且活动线程数大于maximumPoolSize的时候,线程池通过该策略处理请求。
友好的拒绝策略可以是以下三种:
1、保存到数据库进行削峰填谷。在空闲时再提取出来执行 2、转向某个提示页面 3、打印日志
线程池的5种状态:Running、ShutDown、Stop、Tidying、Terminated。
线程池状态源码:
// Integer共有32位,最右边29位标识工作线程数,最左边三位表示线程池状态 // 三个二进制位可以表示从0到7的8个不同数值 private static final int COUNT_BITS = Integer.SIZE - 3; // 类似于子网掩码,用于位的与运算 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池的状态用高三位表示,其中包括了符号位。五种 状态的十进制按从小到大的顺序依次为:RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,这样可以根据值的大小来确定线程池的状态,例如isRunning的判断:
private static boolean isRunning(int c) { return c < SHUTDOWN; }
五种状态介绍:
所有的任务都是由execute方法完成的
execute源码分析:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 包含线程数以及线程池状态的值 int c = ctl.get(); // 如果工作线程数小于核心线程数,则创建线程任务并执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 如果创建失败,防止外部已经在线程池中加入新的任务,重新获取 c = ctl.get(); } // 只有线程处于RUNNING状态,才执行 置入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 这里做了double check,如果此时线程池不是RUNNING状态,将加入队列的任务一处 if (! isRunning(recheck) && remove(command)) reject(command); // 如果之前的线程已经被消费完,新建一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 核心池和队列都满了,尝试创建一个新的线程 else if (!addWorker(command, false)) // 如果创建失败,则进行拒绝策略 reject(command); }
JDK 提供了四种工作队列
public class Test { private static ExecutorService pool; public static void main(String[] args) { //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<3;i++) { pool.execute(new ThreadTask()); } } } class ThreadTask implements Runnable{ public ThreadTask() {} public void run() { System.out.println(Thread.currentThread().getName()); } }
执行结果:
public class Test { private static ExecutorService pool; public static void main(String[] args) { //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<8;i++) { pool.execute(new ThreadTask()); } } } class ThreadTask implements Runnable{ public ThreadTask() {} public void run() { System.out.println(Thread.currentThread().getName()); } }
执行结果:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
执行结果:
public class Test { private static ExecutorService pool; public static void main(String[] args) { //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<8;i++) { pool.execute(new ThreadTask(i)); } } } class ThreadTask implements Runnable,Comparable<ThreadTask>{ private int priority; public ThreadTask() {} public ThreadTask(int priority) { this.priority = priority; } public void run() { System.out.println(Thread.currentThread().getName() + "===> priority is : " + this.priority); } @Override public int compareTo(ThreadTask o) { return this.priority > o.priority ? -1 : 1; } }
执行结果:
除此之外还有一些队列:
线程池中线程就是通过 ThreadPoolExecutor 中的 ThreadFactory 线程工厂创建的。通过自定义 ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,例如设置线程命名:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "testThread-" + new Date()); } }, new ThreadPoolExecutor.AbortPolicy());
运行效果:
在ThreadPoolExecutor中提供了四个公开的静态内部类:
拿CallerRunsPolicy来分析一下:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
可以看到有两个线程直接走的main
看下源码:
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
可以看出内置的拒绝策略实现了 RejectedExecutionHandler,我们也可以自己定义一个拒绝策略:
pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "进入个性化拒绝策略"); r.run(); } });
运行效果:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool将创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,那么就会根据keepAliveTime来回收部分空闲线程,默认是60秒;如果任务数增加,再次创建出新线程处理任务。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就会创建一个线程,直到达线程池的最大数量;输入的参数即是固定线程数,即是核心线程数也是最大线程数,不存在空闲线程,所以keepAliveTime等于0,该线程池配置的缓存队列是无界的,也就是说,如果我的线程都阻塞后,会有大量请求在缓存队列中不会被执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newSingleThreadExecutor是一个单线程线程池,相当于单线程串行执行所有任务,保证按任务的提交顺序依次执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
newScheduledThreadPool创建了一个固定长度的线程池,而且使用DelayedWorkQueue以延迟或定时或周期的方式来执行任务,类似于Timer。可应用于重发机制。
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
newWorkStealingPool是jdk8引入,创建持有足够线程的线程池支持给定的并发量,并通过多个队列减少竞争,该方法将cpu数量设置为默认并发量。
可以分析下ForkJoinPool的构造函数:
/** * Creates a {@code ForkJoinPool} with the given parameters. * * @param parallelism the parallelism level. For default value, * use {@link java.lang.Runtime#availableProcessors}. * @param factory the factory for creating new threads. For default value, * use {@link #defaultForkJoinWorkerThreadFactory}. * @param handler the handler for internal worker threads that * terminate due to unrecoverable errors encountered while executing * tasks. For default value, use {@code null}. * @param asyncMode if true, * establishes local first-in-first-out scheduling mode for forked * tasks that are never joined. This mode may be more appropriate * than default locally stack-based mode in applications in which * worker threads only process event-style asynchronous tasks. * For default value, use {@code false}. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; }
构造函数上也有大段的注释解释了每个参数的意思,其中
上述几种线程池的相关类图:
线程池尽力不要使用Executors,而是通过ThreadPoolExecutor的方式创建,这样的处理方式能更加明确线程池的运行规则,规避资源耗尽的风险
本篇文章参考资料:
《码出高效-Java开发手册》7.4章节
作者:日常更新——Java线程池ThreadPoolExecutor类
作者:杨七 ——Java线程池总结