pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020
## 2.线程池原理分析 承接上一节,我们通过代码输出结果可以看出:**线程首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。** 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会) 现在,我们就分析上面的输出内容来简单分析一下线程池原理。 **为了搞懂线程池的原理,我们需要首先分析一下`execute`方法。**在上一节中的Demo中我们使用`executor.execute(worker)`来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码: ```java // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } //任务队列 private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面会涉及到 3 步 操作 // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添 加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于RUNNING状态才会被并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是RUNNING状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 如果当前线程池为空就新创建一个线程并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false)) reject(command); }
通过下图可以更好的对上面这 3 步做一个展示
addWorker
这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回的就是false。
// 全局锁,并发操作必备 private final ReentrantLock mainLock = new ReentrantLock(); // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合 private int largestPoolSize; // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才 能访问此集合 private final HashSet<Worker> workers = new HashSet<>(); //获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //判断线程池的状态是否为 Running private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * 添加新的工作线程到线程池 * @param firstTask 要执行 * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小 * @return 添加成功就返回true否则返回false */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //这两句用来获取线程池的状态 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取线程池中线程的数量 int wc = workerCountOf(c); // core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //原子操作将workcount的数量加1 if (compareAndIncrementWorkerCount(c)) break retry; // 如果线程的状态改变了就再次执行上述操作 c = ctl.get(); if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池状态 int rs = runStateOf(ctl.get()); //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中 //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是 RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启 动新的Worker // firstTask == null证明只新建线程而不执行任务 if (rs < SHUTDOWN || ( rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //更新当前工作线程的最大容量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 工作线程是否启动成功 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方 法启动真实的线程实例 if (workerAdded) { t.start(); /// 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程中移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
现在,让我们在回到上一节我们写的Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?
没搞懂的话,也没关系,可以看看我的分析:
我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
Runnable
自Java 1.0以来一直存在,但Callable
仅在 Java1.5中引入,目的就是为了来处理Runnable
不支持的用例。 Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用Runnable
接口,这样代码看起来会更加简洁。
工具类Executors
可以实现Runnable
对象和Callable
对象之间的相互转换。(Executors.callable
(Runnable task
)或Executors.callable
(Runnable task
,Object resule
) )。
Runnable.java
@FunctionalInterface public interface Runnable { /** * 被线程执行,没有返回值也无法抛出异常 */ public abstract void run(); }
Callable.java
@FunctionalInterface public interface Callable<V> { /** * 计算结果,或在无法这样做时抛出异常。 * @return 计算得出的结果 * @throws 如果无法计算结果,则抛出异常 */ V call() throws Exception; }
1.execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
2.submit()
方法用于提交需要返回值的任务。线程池会返回一个Future
类型的对象,通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
我们以AbstractExecutorService
接口中的一个 submit 方法为例子来看看源代码:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
上面方法调用的newTaskFor
方法返回了一个FutureTask
对象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
我们再来看看execute()
方法:
public void execute(Runnable command) { ... }
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow()
:关闭线程池,线程的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的List。isShutDown
当调用shutdown()
方法后返回为 true。isTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true4.加餐: Callable + ThreadPoolExecutor 示例代码
MyCallable.java
import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(1000); //返回执行当前 Callable 的线程名字 return Thread.currentThread().getName(); } }
CallableDemo.java
import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class CallableDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); List<Future<String>> futureList = new ArrayList<>(); Callable<String> callable = new MyCallable(); for (int i = 0; i < 10; i++) { //提交任务到线程池 Future<String> future = executor.submit(callable); //将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到 的返回值 futureList.add(future); } for (Future<String> fut : futureList) { try { System.out.println(new Date() + "::" + fut.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } //关闭线程池 executor.shutdown(); } }
Output:
Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1 Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2 Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3 Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4 Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5 Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3 Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2 Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1 Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4 Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5
FixedThreadPool
被称为可重用固定线程数的线程池。通过Executors类中的相关源代码来看一下相关实现:
/** * 创建一个可重用固定数量线程的线程池 */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
另外还有一个FixedThreadPool
的实现方法,和上面的类似,所以这里不多做阐述:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
从上面源代码可以看出新创建的FixedThreadPool
的corePoolSize
和maximumPoolSize
都被设置为nThreads
,这个nThreads
参数是我们使用的时候自己传递的。
FixedThreadPool
的execute()
方法运行示意图
上图说明:
LinkedBlockingQueue
;LinkedBlockingQueue
中获取任务来执行;FixedThreadPool
使用无界队列LinkedBlockingQueue
(队列的容量为 Intger.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:
corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize
;maximumPoolSize
将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建FixedThreadPool
的源码可以看出创建的FixedThreadPool
的 corePoolSize
和maximumPoolSize
被设置为同一个值。keepAliveTime
将是一个无效参数;FixedThreadPool
(未执行shutdown()
或shutdownNow()
)不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。SingleThreadExecutor
是只有一个线程的线程池。下面看SingleThreadExecutor的实现:
/** *返回只有一个线程的线程池 */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
从上面源代码可以看出新创建的SingleThreadExecutor
的corePoolSize
和maximumPoolSize
都被设置为 1.其他参数和FixedThreadPool
相同。
SingleThreadExecutor
的运行示意图
上图说明;
1.如果当前运行的线程数少于corePoolSize,则创建一个新的线程执行任务;
2.当前线程池中有一个运行的线程后,将任务加入LinkedBlockingQueue
3.线程执行完当前的任务后,会在循环中反复从LinkedBlockingQueue
中获取任务来执行;
SingleThreadExecutor
使用无界队列LinkedBlockingQueue
作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与FixedThreadPool
相同。说简单点就是可能会导致OOM,
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看
CachedThreadPool
的实现:
/** * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。 */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽cpu 和内存资源。
CachedThreadPool 的 execute()方法的执行示意图
上图说明:
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行offer操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;CachedThreadPool
允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM。
ScheduledThreadPoolExecutor
主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目中基本不会被用到,因为有其他方案选择比如quartz
。大家只需要简单了解一下它的思想。
ScheduledThreadPoolExecutor
使用的任务队列DelayQueue
封装了一个PriorityQueue
, PriorityQueue
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行( ScheduledFutureTask
的time
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行( ScheduledFutureTask
的squenceNumber
变量小的先执行)。
ScheduledThreadPoolExecutor
和Timer
的比较:
Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,如果你想(通过提供ThreadFactory),你可以完全控制创建的线程;TimerTask
中抛出的运行时异常会杀死一个线程,从而导致 Timer
死机:-( …即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。
备注: Quartz 是一个由 java 编写的任务调度库,由OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。
ScheduledThreadPoolExecutor
的执行主要分为两大部分:
ScheduledThreadPoolExecutor
的scheduleAtFixedRate()
方法或者scheduleWirhFixedDelay()
方法时,会向ScheduledThreadPoolExecutor
的DelayQueue
添加一个实现了RunnableScheduledFuture
接口ScheduledFutureTask
。这个月马上就又要过去了,还在找工作的小伙伴要做好准备了,小编整理了大厂java程序员面试涉及到的绝大部分面试题及答案,希望能帮助到大家
CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】
scheduleAtFixedRate()
方法或者scheduleWirhFixedDelay()
方法时,会向ScheduledThreadPoolExecutor
的DelayQueue
添加一个实现了RunnableScheduledFuture
接口ScheduledFutureTask
。
这个月马上就又要过去了,还在找工作的小伙伴要做好准备了,小编整理了大厂java程序员面试涉及到的绝大部分面试题及答案,希望能帮助到大家
CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】
[外链图片转存中…(img-omkPZrmg-1630820838033)]
[外链图片转存中…(img-KgZiPFGD-1630820838033)]