在使用线程时,需要new一个,用完了又要销毁,这样频繁的创建和销毁很耗资源,所以就提供了线程池。道理和连接池差不多,连接池是为了避免频繁的创建和释放连接,所以在连
接池中就有一定数量的连接,要用时从连接池拿出,用完归还给连接池,线程池也一样。
线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多
个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池用法很简单, 分为三步。首先用工具类Executors创建线程池,然后给线程池分配任务,最后关闭线程池就行了。
1 public class ThreadPoolTest { 2 public static void main(String[] args) throws Exception { 3 4 // 1.创建一个 10 个线程数的线程池 5 ExecutorService service = Executors.newFixedThreadPool(10); 6 7 // 2.执行一个Runnable 8 service.execute(new Number1()); 9 10 // 2.提交一个Callable 11 Future<Integer> future = service.submit(new Number2()); 12 Integer integer = future.get(); 13 System.out.println("result = " + integer); 14 15 // 关闭线程池 16 service.shutdown(); 17 } 18 } 19 20 class Number1 implements Runnable { 21 22 @Override 23 public void run() { 24 System.out.println("----打印Runnable----"); 25 } 26 } 27 28 // 10以内数求和 29 class Number2 implements Callable<Integer> { 30 private int sum = 0; 31 32 @Override 33 public Integer call() { 34 for (int i = 0; i <= 10; i++) { 35 if (i % 2 == 0) { 36 sum += i; 37 } 38 } 39 return sum; 40 } 41 }
注意:线程用完,要关闭线程池,否则程序依然在运行中。
JDK 5.0 起提供了线程池相关API:顶级接口Executor,及子接口 ExecutorService 和工具类Executors。
JUC包描述:图片来源API文档
Executors:工具类,线程池的工厂类,用于创建并返回不同类型的线程池。
1 // 一池N线程:创建一个固定(可重用)线程数的线程池。 2 ExecutorService Executors.newFixedThreadPool(int nThreads) 3 4 // 一池一线程:创建一个只有一个线程的线程池。 5 ExecutorService Executors.newSingleThreadExecutor() 6 7 // 可扩容:创建一个可根据需要线程数,创建新的线程的线程池。 8 ExecutorService Executors.newCachedThreadPool() 9 10 // 可用于调度:创建一个线程池,它可安排在给定延迟后运行命令或者定期的执行。 11 ScheduledExecutorService Executors.newScheduledThreadPool(int corePoolSize)
ExecutorService:
1 // 执行任务/命令,没有返回值,一般用来执行Runnable 2 void execute(Runnable command) 3 4 // 可用于提交一个Runnable,但没有返回值 5 Future<?> submit(Runnable task) 6 7 // 执行任务,有返回值,一般用来执行Callable 8 <T> Future<T> submit(Callable<T> task) 9 10 // 关闭连接池 11 void shutdown()
代码示例:创建固定 5 个线程的线程池为 10 个任务服务。
1 public class ThreadPoolTest { 2 public static void main(String[] args) { 3 // 1.创建一个 10 个线程数的线程池 4 ExecutorService service = Executors.newFixedThreadPool(5); 5 6 try { 7 for (int i = 0; i < 10; i++) { 8 int finalI = i; 9 service.execute(() -> { 10 11 System.out.println(Thread.currentThread().getName() + " 为客户 " + finalI + " 办理业务~"); 12 13 // try { 14 // Thread.sleep(1000_000); 15 // } catch (InterruptedException e) { 16 // e.printStackTrace(); 17 // } 18 19 }); 20 } 21 } finally { 22 service.shutdown(); 23 } 24 } 25 } 26 27 // 可能的一种结果 28 pool-1-thread-1 为客户 0 办理业务~ 29 pool-1-thread-2 为客户 1 办理业务~ 30 pool-1-thread-2 为客户 6 办理业务~ 31 pool-1-thread-2 为客户 7 办理业务~ 32 pool-1-thread-2 为客户 8 办理业务~ 33 pool-1-thread-1 为客户 5 办理业务~ 34 pool-1-thread-2 为客户 9 办理业务~ 35 pool-1-thread-3 为客户 2 办理业务~ 36 pool-1-thread-4 为客户 3 办理业务~ 37 pool-1-thread-5 为客户 4 办理业务~
可以看到,银行 5 个窗口为 10 个客户相继服务。若前面服务时间长(打开注释),线程池便没有新的线程来执行任务了。程序会陷入等待中。
代码示例:创建单个线程的线程池为 10 个线程服务。代码同上,只修改:
1 ExecutorService service = Executors.newSingleThreadExecutor();
代码示例:创建可扩容线程的线程池为 10 个线程服务。代码同上,只修改:
1 ExecutorService service = Executors.newCachedThreadPool();
为什么要用线程池管理线程呢?当然是为了线程复用。
背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁的创建和销毁,实现重复利用。类似生活中的公共交通工具。
好处:提高响应速度(减少了创建新线程的时间);降低资源消耗(重复利用线程池中线程,不需要每次都创建);便于线程管理。
前面介绍了三种(固定数、单一的、可变的)创建线程池的方式,实际工作用哪一个呢?都不使用!为什么呢?
《阿里巴巴Java开发手册》明确规定:线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,规避资源耗尽风险。
查看源码,可以看到,用Executors创建线程池的三种方式中,都 new 了一个 ThreadPoolExecutor。所以,实际生产一般通过 ThreadPoolExecutor 的 7 个参数,自定义线程池。
源码示例:7 个参数的构造器。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
介绍线程池之前,先来看一个生活中的案例。银行业务办理流程,如图:
某银行一共有 5 个服务窗口,但平时一般只开放两个,另外三个不开放。大厅中还有 10 个等待服务的座位。某天:
(1)客人1(用Thread1表示)来办理业务,他就直接去开放的窗口1办理(假设他需要服务的时间很长,一直在服务中,后面的也一样)。
(2)Thread2来办理业务,由于窗口1在服务中,所以他去了开放的窗口2办理。
(3)Thread3来办理业务,由于窗口1和窗口2都在服务中,所以他去了大厅的等待服务座位上排队等待。
(4)Thread4~Thread12 同理Thread3。
(5)Thread13来办理业务,由于窗口1和窗口2都在服务中,且此时大厅的等待座位上也已满。银行经理便将关闭的窗口3打开来为Thread13服务。注意:这里并不是Thread13去大厅排队,然后队列中队头元素Thread3出队接受服务。而是直接为Thread13服务。
(6)Thread14,Thread15来办理业务,会开放窗口4为Thread14服务,开放窗口5为Thread15服务。
(7)Thread16来办理业务,此时,已无可用窗口,且大厅的等待座位上也已满。银行便拒绝再为 Thread16 服务。
说明:若 Thread13、Thread14、Thread15 业务办理完毕后,没有新的客人来银行办理业务。那么窗口3、窗口4、窗口5会在一定时间后又关闭起来。
下面介绍 ThreadPoolExecutor 构造器中的 7 个核心参数。
corePoolSize:线程池的核心线程数。
maximumPoolSize:线程池的最大线程数,要大于corePoolSize。
keepAliveTime:非核心线程闲置下来最多存活的时间。
unit:线程池中非核心线程保持存活的时间单位,与keepAliveTime一起使用。
workQueue:用来保存提交后,等待执行任务的阻塞队列。
threadFactory:创建线程的工厂类。
handler:拒绝策略。
在理解上一节"银行服务"的过程后,就不难理解上面 7 个参数的含义。
corePoolSize = 2:窗口1 + 窗口2。
maximumPoolSize = 5:窗口1 + 窗口2 + 窗口3 + 窗口4 + 窗口5。
workQueue = 10:银行大厅排队队列的大小。关于阻塞队列 BlockingQueue<Runnable> workQueue 请看这篇。
keepAliveTime + unit:"窗口3、窗口4、窗口5会在一定时间后又关闭起来"的时间。
handler:"银行便拒绝再为 Thread16 服务"的拒绝方式。
在了解 ThreadPoolExecutor 7个核心参数的作用后,再看Executors创建的三种线程池的源码,就不难理解他们的作用。也就明白为什么《阿里巴巴Java开发手册》中禁止使用Executors创建,而要使用ThreadPoolExecutor自定义线程池。
源码示例:
一池N线程:创建一个固定(可重用)线程数的线程池。
1 ExecutorService Executors.newFixedThreadPool(int nThreads); 2 3 public static ExecutorService newFixedThreadPool(int nThreads) { 4 return new ThreadPoolExecutor(nThreads, nThreads, 5 0L, TimeUnit.MILLISECONDS, 6 new LinkedBlockingQueue<Runnable>()); 7 } 8 9 public LinkedBlockingQueue() { 10 this(Integer.MAX_VALUE); 11 }
一池一线程:创建一个只有一个线程的线程池。
1 ExecutorService Executors.newSingleThreadExecutor(); 2 3 public static ExecutorService newSingleThreadExecutor() { 4 return new FinalizableDelegatedExecutorService 5 (new ThreadPoolExecutor(1, 1, 6 0L, TimeUnit.MILLISECONDS, 7 new LinkedBlockingQueue<Runnable>())); 8 } 9 10 public LinkedBlockingQueue() { 11 this(Integer.MAX_VALUE); 12 }
可扩容:创建一个可根据需要线程数,创建新的线程的线程池。
1 ExecutorService Executors.newCachedThreadPool(); 2 3 public static ExecutorService newCachedThreadPool() { 4 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5 60L, TimeUnit.SECONDS, 6 new SynchronousQueue<Runnable>()); 7 }
在理解"银行服务"的过程后,其实也就说清楚了线程池的工作流程。只是一些细节没有说,比如:
(1)窗口3为Thread13服务完成后,Thread14才来,情况如何?
(2)……
代码示例:银行 2+3 个窗口为陆续来的 16 个客户服务。
1 public class ThreadPoolDemo { 2 public static void main(String[] args) throws Exception { 3 // 模拟上图中 2 + 3 + 10(大厅排队长度) 的线程池 4 ThreadPoolExecutor executor = new ThreadPoolExecutor( 5 2, 5 6 , 6, TimeUnit.SECONDS 7 , new ArrayBlockingQueue<>(10) 8 , Executors.defaultThreadFactory() 9 , new ThreadPoolExecutor.AbortPolicy()); 10 11 // 创建16个线程模拟16个客户 12 final int num = 16; 13 for (int i = 1; i <= num; i++) { 14 int finalI = i; 15 16 // 这里主要为了给线程起名字. 17 Thread thread = new Thread(() -> { 18 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 号客人开始服务"); 19 20 // 假设 13 和 16 服务很快. 21 if (finalI != 16 && finalI != 13) { 22 try { 23 // 正在为 finalI 号客户服务中 24 Thread.sleep(1000_000); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 } 29 30 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 号客人服务结束"); 31 }, "------" + i); 32 33 System.out.println(thread.getName() + " 号客人来了"); 34 executor.execute(thread); 35 36 // 让主线程休息一下,保证上面开启的线程先执行. 37 Thread.sleep(200); 38 } 39 40 // 保证上面的线程执行完 41 Thread.sleep(1_000); 42 43 System.out.println("===核心线程数===" + executor.getCorePoolSize()); 44 System.out.println("===总任务数=====" + executor.getTaskCount()); 45 46 final BlockingQueue<Runnable> queue = executor.getQueue(); 47 System.out.println("===正在排队====" + queue.size()); 48 49 System.out.println("===最大线程数===" + executor.getMaximumPoolSize()); 50 System.out.println("==============" + executor.getPoolSize()); 51 System.out.println("===完成任务数===" + executor.getCompletedTaskCount()); 52 System.out.println("==============" + executor.getLargestPoolSize()); 53 54 executor.shutdown(); 55 } 56 } 57 58 // 结果(程序未停止) 59 ------1 号客人来了 60 pool-1-thread-1=====1 号客人开始服务 61 ------2 号客人来了 62 pool-1-thread-2=====2 号客人开始服务 63 ------3 号客人来了 64 ------4 号客人来了 65 ------5 号客人来了 66 ------6 号客人来了 67 ------7 号客人来了 68 ------8 号客人来了 69 ------9 号客人来了 70 ------10 号客人来了 71 ------11 号客人来了 72 ------12 号客人来了 // 到这里都不难理解 73 ------13 号客人来了 74 pool-1-thread-3=====13 号客人开始服务 75 pool-1-thread-3=====13 号客人服务结束 // 开放窗口3为客户13立刻服务完毕. 76 pool-1-thread-3=====3 号客人开始服务 // 阻塞队列,队头 客户3 出队接受窗口3的服务 77 ------14 号客人来了 // 加入阻塞队列队尾 78 ------15 号客人来了 79 pool-1-thread-4=====15 号客人开始服务 80 ------16 号客人来了 81 pool-1-thread-5=====16 号客人开始服务 82 pool-1-thread-5=====16 号客人服务结束 83 pool-1-thread-5=====4 号客人开始服务 84 ===核心线程数===2 85 ===总任务数=====16 86 ===正在排队====9 87 ===最大线程数===5 88 ==============5 89 ===完成任务数===2 90 ==============5
其他的情况,可通过修改代码示例中相关参数进行测试,自然就理解。
线程在Java中属于稀缺资源,线程池不是越大越好,也不是越小越好。那么,线程池的参数要如何设置才合理呢?
任务分为CPU密集型、IO密集型、混合型。
CPU密集型:大部分都在用CPU跟内存,加密,逻辑操作,业务处理等。
IO密集型:数据库链接,网络通讯传输等。
CPU密集型:一般推荐线程池不要过大,一般是CPU数 + 1,+1是因为可能存在页缺失(就是可能存在有些数据在硬盘中需要多来一个线程将数据读入内存)。如果线程池数太大,可能会频繁的进行线程上下文切换跟任务调度。
获得当前CPU核心数代码如下:
1 Runtime.getRuntime().availableProcessors();
IO密集型:线程数适当大一点,机器的CPU核心数*2。
混合型:可以考虑根绝情况将它拆分成CPU密集型和IO密集型任务,如果执行时间相差不大,拆分可以提升吞吐量,反之没有必要。
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,就会调用这个接口里的这个方法。也就是"银行便拒绝再为 Thread16 服务"的拒绝方式。
1 public interface RejectedExecutionHandler { 2 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 3 }
ThreadPoolExecutor 提供了四种拒绝策略,分别是
AbortPolicy:直接抛出异常,这也是默认策略。
CallerRunsPolicy:返回给调用者处理。用调用者所在线程来运行任务。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
DiscardPolicy:不处理,直接丢弃当前任务。
代码示例:4种拒绝策略,代码同上,只需修改:
1 // 1.去掉这个if条件 2 if (finalI != 16 && finalI != 13) {} 3 4 5 6 // 2.1 线程池创建时拒绝策略为: 7 new ThreadPoolExecutor.AbortPolicy() 8 // 2.1 结果,直接抛出了异常.(只截取了最后一点) 9 ------16 号客人来了 10 Exception in thread "main" java.util.concurrent.RejectedExecutionException 11 12 13 14 // 2.2 线程池创建时拒绝策略为: 15 new ThreadPoolExecutor.CallerRunsPolicy() 16 // 2.2 结果,返回给调用者main处理了.(只截取了最后一点) 17 ------16 号客人来了 18 main=====16 号客人开始服务 19 20 21 22 // 2.3 线程池创建时拒绝策略为: 23 new ThreadPoolExecutor.DiscardOldestPolicy() 24 // 2.3 结果,见下图 25 26 27 28 // 2.4 线程池创建时拒绝策略为: 29 new ThreadPoolExecutor.DiscardPolicy() 30 // 2.4 结果(丢弃了任务,没有任何处理)
通过debug断点的方式,可以查看到:DiscardOldestPolicy策略中,此时阻塞队列中是客户4~客户16。也就是客户3 出队,被抛弃,客户16入队等待。
如果不使用线程池提供的4种拒绝策略,也可以自己实现拒绝策略的接口,实现对这些超出数量的任务的处理。比如:为被拒绝的任务开启一个新的线程执行,如下。
1 // 线程池创建时拒绝策略为: 2 new MyRejectedExecutionHandler() 3 // 结果 4 ------16 号客人来了 5 ---开启新线程处理任务---=====16 号客人开始服务 6 7 8 // 自定义的策略拒绝 9 class MyRejectedExecutionHandler implements RejectedExecutionHandler { 10 11 @Override 12 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 13 new Thread(r, "---开启新线程处理任务---").start(); 14 } 15 }
参考文档:https://www.matools.com/api/java8
百度网盘:https://pan.baidu.com/s/1FZ9DNr0sF1mAc6Nq_6QZmg 密码: 4sw1