线程池具体使用细节不再本篇文章的讨论范围,想了解用法的请自行百度,这里仅展示一个线程池小Demo
该线程池的作用为:使用线程池执行100000个线程,线程任务实现的是callable接口,每个线程打印相应的逻辑,然后返回(此处线程池并没有对返回的结果进行接收)
public class TestThreadPool { public static void main(String[] args) throws ExecutionException, InterruptedException { CopyOnWriteArrayList<Future> retList = new CopyOnWriteArrayList<>(); List<Task> taskList = new ArrayList<>(); for (int i = 0; i < 100000; i++) { taskList.add(new Task(i)); } ExecutorService threadPool = new ThreadPoolExecutor( 1,//核心的线程数量 3,//最大的线程数量 10,//等待一定事件后关闭最大线程 TimeUnit.MILLISECONDS,//等待时间的单位 new LinkedBlockingQueue<>(10),//创建一个队列 Executors.defaultThreadFactory(),//创建线程的线程工厂 new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略 for (int i = 0; i < taskList.size(); i++) { threadPool.submit(taskList.get(i)); } } } class Task implements Callable<Integer> { private Integer num; public Task(Integer num) { this.num = num; } @Override public Integer call() throws Exception { System.out.println("---uuu--" + num); return num; } }
三大方法
//1.创建一个只有一个线程的线程池 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //2.创建一个可伸缩的线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); //3.创建一个指定最大数量的线程池 ExecutorService threadPool = Executors.newFixedThreadPool(3);
七大参数
ExecutorService threadPool = new ThreadPoolExecutor( 1,//核心的线程数量 3,//最大的线程数量 10,//等待一定事件后关闭最大线程 TimeUnit.MILLISECONDS,//等待时间的单位 new LinkedBlockingQueue<>(10),//创建一个队列,存放任务 Executors.defaultThreadFactory(),//创建线程的线程工厂 new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略
四种拒绝策略
//多出来的线程,直接抛出异常 new ThreadPoolExecutor.AbortPolicy() //谁开启的这个线程,就让这个线程返回给谁执行。比如main线程开启的,那就返回给main线程执行 new ThreadPoolExecutor.CallerRunsPolicy() //如果队列线程数量满了以后,直接丢弃,不抛出异常 new ThreadPoolExecutor.DiscardPolicy() //队列满了以后,尝试去和最早的线程竞争,也不会抛出异常 new ThreadPoolExecutor.DiscardOldestPolicy
1、调用execute方法,submit就是在execute的方法上套了一层RunnableFuture。(下面代码中的workQueue就是我们初始化线程池的时候传入的队列)
2、addWorker方法。该方法主要看关注传入的参数firskTask,即我们传入的线程任务,也就是execute方法的参数
3、它将我们传入的线程封装为一个worker对象。将线程任务firskTask赋值给firstTask属性,将worker对象赋值给thread属性。(对应的getThreadFactory方法就是调用我们初始化线程池的时候的线程工厂去创建)
4、回到第二步的代码中,除了将我们的线程任务添加到workers集合中,还调用了t变量的start方法。t变量就是第三步的thread变量,即使我们的worker对象
5、调用worker对象的start方法。由于Worker实现了Runnable接口,所以start方法对应的就是调用Runnable接口的run方法
6、然后调用对应的runWorker方法。在该方法中,拿到worker对象的firstTask属性,即我们前文execute方法的参数,也即是我们的线程任务。然后调用线程任务的run 方法
简单理一下这个流程,我们可以得到如下结论:
1、还是以execute方法为入口,然后直接跳转到上面第二小节的第六点,进入runWorker方法。run方法发生异常,会进入进入后面的processWorkerExit方法。传入的参数w是worker对象
2、在该方法中,首先将worker对象从workers队列中移除(前面有说到workers这个队列中存放的是所有任务),然后添加一个firstTask为null的任务。这里又会调用addWorker方法,但是由于firstTask是null,所以就没有对应的线程任务可以执行
总结:即如果使用线程池在执行多个线程任务的时候,其中一个线程发生异常。那么线程池会捕获这个发生异常的线程,然后将这个线程从线程任务队列中移除(workers)。再添加一个任务为null的线程任务,再执行这个null任务。也可以变相的理解为我们要执行的线程任务,被丢弃了。
我相信你在看了源码之后,就能够很简单的记住,它们每种拒绝策略的作用
使用拒绝策略的时候会调用这个reject方法,对应的handler在初始化线程池的时候会传入具体的实现
AbortPolicy:直接抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
CallerRunsPolicy:交由调用它的线程执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
DiscardOldestPolicy:弹出队列(poll)中的其他线程任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
DiscardPolicy:什么操作都不执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
自定义拒绝策略
自定义拒绝策略只需要实现RejectedExecutionHandler接口,再重写rejectedExecution方法即可。我们可以将任务写到类似mq的中间件,或者持久化到数据库。再编写一个线程去不断监控线程池的队列情况,如果发现队列中任务下降到一定的阈值,那么我们就可以读取之前任务,将他们再次存放到队列中。