Executor 框架是在 Java5 中引入的,通过该框架来控制线程的启动,执行,关闭,简化并发编程。
Executor 基于生产者-消费者模式,提交任务的线程相当于生产者,执行任务的线程相当于消费者。同时,Executor 的实现还提供了对任务执行的生命周期管理的支持。
Executor
接口。Eexcutor 框架的基础,将任务提交和任务执行解耦。
ExecutorService
接口。继承自 Executor 接口,具备管理执行器和任务生命周期的方法,提交任务机制更完善。
ThreadPoolExecutor
核心实现类。执行被提交的任务。
ScheduledThreadPoolExecutor
实现类。支持定时或者延迟执行任务。
Executors
提供了一系列静态工厂用于创建各种线程池。返回的线程池都实现了 ExecutorService 接口。
execute(Runnable) 和 submit(Runnable)
execute(Runnable) 接受一个 java.lang.Runnable 对象作为参数,并且以异步的方式去执行。
submit(Runnable) 同样接受一个 java.lang.Runnable 对象作为参数,但是会返回一个 Future 对象。该Future 对象可以用于判断 Runnable 是否结束执行。
Eexcutor 框架由 3 大部分组成:
任务
每个任务都是用 Runnable/Callable 接口的实现类表示。
任务执行
任务执行的核心是采用 Executor 框架,核心类是 ThreadPoolExecutor。
异步计算的结果
异步任务需要返回结果,提交任务后需要返回 Future, FutureTask 实现。
运行过程:
线程池是指管理一组同构工作线程的资源池。
使用线程池的好处:
线程复用
可以重用线程池中的线程,减少因对象创建、销毁所带来的性能开销。
可控制最大并发数
提高系统资源利用率,同时避免过多的资源竞争,避免堵塞。
线程管理
能够对多线程进行简单的管理,使线程的使用简单、高效。
提高响应性
不会因为等待创建线程而延迟任务执行,从而提高了响应性。
Executors 提供了一系列静态工厂用于创建各种线程池:
newFixedThreadPool
创建一个固定长度的线程池。每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
newSingleThreadPool
创建单个工作线程来执行任务,如果某个线程异常,则会创建另一个线程来替代,能确保依照任务在队列中的顺序来串行执行。
newScheduledThreadPool
创建一个固定长度的线程池。可以定时或者延迟执行任务。
创建线程池主要是 ThreadPoolExecutor 类来完成。ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数说明:
Executors.defaultThreadFactory()
Executors 创建线程池会发生 OOM(Out Of Memory)
Java中的 BlockingQueue 主要有两种:
ArrayBlockingQueue
用数组实现的有界阻塞队列,必须设置容量。
LinkedBlockingQueue
一个用链表实现的有界阻塞队列,容量可以选择进行设置,如果不设置的话,将是一个无边界的阻塞队列,最大长度为 Integer.MAX_VALUE。
看 Executors.newFixedThreadPool 源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
LinkedBlockingQueue 并没有设置容量,LinkedBlockingQueue 就是一个无边界队列,对于一个无边界队列来说,是可不断的向队列中加入任务,这种情况下就有可能因为任务过多而导致内存溢出问题。
创建线程池的正确姿势
避免使用 Executors 创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor 的构造函数来自己创建线程池。在创建的同时,给 BlockQueue 指定容量即可。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));
(1)线程池刚创建时,里面没有一个线程,任务队列作为参数传入。此时,即使任务队列中有任务,线程池也不会马上执行他们。
(2)当调用 execute() 方法添加一个任务时,线程池会做如下判断:
RejectExecutionExcaption
(3)当一个线程完成任务后,在从队列中取出一个任务来执行
(4)如果一个线程空闲超过一定时间(keepAlivTime),线程池会判断,如果正在运行的线程数 > corePoolSize,则回收该线程。线程池在任务执行完后,线程数会维持在 corePoolSize 的大小。
状态转换图:
shutdown() 和 shutdownNow() 这两个方法的原理都是遍历线程池中所有的线程,然后依次中断线程。 shutdown() 和 shutdownNow() 还是有不一样的地方:
可以看出 shutdown() 方法会将正在执行的任务继续执行完,而 shutdownNow() 会直接中断正在执行的任务。
要想合理配置线程池的大小,首先分析任务的特性,可以从以下几个角度分析:
CPU 密集型任务(指利用 CPU 计算能力的任务,比如在内存中对大量数据进行排序)配置尽可能少的线程数量,如配置 (N cpu) + 1 个线程的线程池。
IO 密集型任务(比如网络读取、文件读取)则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如 2 * (N cpu) +1。
混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。
注意:
可通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数。
最佳线程数 = (线程等待时间/线程 CPU 时间 + 1) (N cpu)*
阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。