在ThreadPoolExecutor类的文档注释中有这么一句话:An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.
也就是说,已经提供了通过Executors获取线程池的工厂方法,也就是常说的五种线程池:
Executors中的方法:
/** * @since 1.5 * @author Doug Lea */ public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } }
可以看到,这些方法最终都是实例化一个ThreadPoolExecutor对象。
ThreadPoolExecutor类中一共提供了四个构造方法:
这几个方法只有入参不同而已,也就是说,我们可以根据所需要的自定义线程池配置,选择合适的方法;而前面三个最终都是调用第四个方法实例化ThreadPoolExecutor的对象。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
通过构造方法public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
我们可以知晓线程池可自定义的参数。
通过API可知,各参数意义如下:
用于指定线程池中处于就绪状态的线程最小数,也就是说,当没有任务提交时,线程池中保持多少个工作线程。如果设置了工作线程的超时时间,则此值可能为0。注:当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于corePoolSize。
/** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize;
用于指定线程池可容纳的最大工作线程数,线程池根据corePoolSize和maximumPoolSize来自动调节容量大小:
A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.
/** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize;
指定空闲线程等待的超时时间(默认是纳秒)。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
/** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime;
指定keepAliveTime的单位,默认使用纳秒。TimeUnit是一个枚举,定义的值如下:
TimeUnit.NANOSECONDS; //纳秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.SECONDS; //秒 TimeUnit.MINUTES; //分钟 TimeUnit.HOURS; //小时 TimeUnit.DAYS; //天 public enum TimeUnit { /** * Time unit representing one thousandth of a microsecond */ NANOSECONDS { public long toNanos(long d) { return d; } public long toMicros(long d) { return d/(C1/C0); } public long toMillis(long d) { return d/(C2/C0); } public long toSeconds(long d) { return d/(C3/C0); } public long toMinutes(long d) { return d/(C4/C0); } public long toHours(long d) { return d/(C5/C0); } public long toDays(long d) { return d/(C6/C0); } public long convert(long d, TimeUnit u) { return u.toNanos(d); } int excessNanos(long d, long m) { return (int)(d - (m*C2)); } }, ... }
用于存储等待中的任务的队列。
/** * The queue used for holding tasks and handing off to worker * threads. We do not require that workQueue.poll() returning * null necessarily means that workQueue.isEmpty(), so rely * solely on isEmpty to see if the queue is empty (which we must * do for example when deciding whether to transition from * SHUTDOWN to TIDYING). This accommodates special-purpose * queues such as DelayQueues for which poll() is allowed to * return null even if it may later return non-null when delays * expire. */ private final BlockingQueue<Runnable> workQueue;
而BlockingQueue的实现类如下:
而Executors工厂方法中使用的是LinkedBlockingQueue
、DelayedWorkQueue
和SynchronousQueue
。
用于创建线程。
/** * Factory for new threads. All threads are created using this * factory (via method addWorker). All callers must be prepared * for addWorker to fail, which may reflect a system or user's * policy limiting the number of threads. Even though it is not * treated as an error, failure to create threads may result in * new tasks being rejected or existing ones remaining stuck in * the queue. * * We go further and preserve pool invariants even in the face of * errors such as OutOfMemoryError, that might be thrown while * trying to create threads. Such errors are rather common due to * the need to allocate a native stack in Thread.start, and users * will want to perform clean pool shutdown to clean up. There * will likely be enough memory available for the cleanup code to * complete without encountering yet another OutOfMemoryError. */ private volatile ThreadFactory threadFactory;
用于指定当线程池和工作队列都处于饱和状态时,新任务的处理策略。
/** * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler;
AbortPolicy:终止策略
通过抛出一个RejectedExecutionException
异常而拒绝此任务。
** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler {...}
CallerRunsPolicy:调用者执行策略
直接让调用线程执行此任务,若调用线程已经停止,则丢弃此任务。
/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler {...}
DiscardOldestPolicy:丢弃最旧任务策略
丢弃处于等待队列头部的任务,并尝试执行此任务。
/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler {...}
DiscardPolicy:丢弃策略
静默拒绝任务,不抛出异常。
/** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler {...}
向线程池提交任务主要有两种方式
execute(Runnable command)
submit(Callable<T> task)
、submit(Runnable task, T result)
、submit(Runnable task)
传入一个Runnable对象,无返回值。无法知晓任务是否执行成功。
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void execute(Runnable command) { if (command == null) throw new NullPointerException(); ... } }
传入一个Callable或Runnable对象,返回一个Future对象。通过返回的Future对象可判断让任务是否执行成功。可通过future的get方法来获取任务执行结果,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。
public interface ExecutorService extends Executor { <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); ... }
ThreadPoolExecutor提供了两个关闭方法:shutdown()
和shutdownNow()
shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查终止权限 checkShutdownAccess(); //设置线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查终止权限 checkShutdownAccess(); //设置线程池状态为STOP advanceRunState(STOP); //中断线程 interruptWorkers(); //获取待执行任务的列表 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } }