这一篇,我们来聊一聊线程池,和他的设计原理,以及一些关键参数和如何动态设计线程参数,很多东西咱们在之前的文章中已经讨论过,这一篇就相对轻松点,run。
创建线程遇见的麻烦
- 线程的频繁创建和销毁需要消耗cpu的资源
- 线程数量越多,cpu的资源的开销就越大
- 如果线程的数量,远远大于cpu的核心数量的话,那么cpu就需要不断的进行上下文切换(比如线程从A切换到B那程序要记住上次执行的线程A执行的位置),而如果他不断的进行切换的话,那从某个方面就耗费了性能,因为他就没有空做正事了,而全身心的都在线程切换上了。
那我们就自然而然的想到了线程的复用,这里金引入了【池化技术】:连接池、对象池、内存池、线程池这些都是池化技术的体现,那池化技术的核心就是复用,把一些复杂的对象,创建过程中需要消耗很多资源的对象,放置在一个池子中。
【线程池】:我可以提前创建一系列的现场保存在池子中,有任务要执行的时候,就从池子中获取一个线程去执行
java中提供一个Executors去操纵线程,其中提供这么几个方法进行线程池的构建(他们都是使用的是【ThreadPoolExecutor】):
- 【newFixedThreadPool】:固定线程数量,根据你传递的数构建相应的线程数
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 【newSingleThreadExecutor】:只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 【newCachedThreadPool】:可以缓存的线程池(有多少请求,此线程即可以创建多少个线程来处理请求)
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 【newScheduledThreadPool】:提供了按照周期执行的线程池(只是顶层对定时器做了一层封装,底层还是调用的)
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
【需求】:
实现线程的重复利用
【问题】:
- 【如何实现线程的复用】:让线程实现复用的唯一办法就是让线程不结束,结束的点有:interrupt方法,stop方法,以及程序的正常运行,那可把他们放在一个循环中
- 【如何让线程执行指派的新任务】:使用共享内存可以做到(多线程访问一个公共的资源),比如定义一个全局的list,线程去不断的从这个里面取数据(其实就是一个生产者,和消费者的模型,类似于阻塞队列)
- 【如果队列满了怎么办】:究其根本就是数据量太多,线程太少,消费的速度比不上生产的速度,那我们就需要增加线程,增减消费的速度
- 【如果扩容后,依然无法消费庞大的数据怎么办】:【拒绝策略】我们都知道线程的数量是由cpu决定的,如果你创建了很多线程那从某种情况下就导致cpu负荷,降低了效率,所以我们做相应的处理(报错、直接丢弃、直接调用run方法、或者把队列中等待时间最久的线程抛弃、也可以让用户自定义)
- 【请求峰过去了如何销毁线程】:前面我们介绍阻塞队列常用方法的时候有一个【poll方法】(如果超时还没有产生数据,那就返回null)我们就可以使用这个方法去判断,然后把当前的自旋结束
- 【线程一直处于运行状态是否合理】:有任务来的时候运行,没有任务的时候对线程进行阻塞。
【需要的参数】:
线程数量、最大线程数量、选择哪种拒绝策略、任务队列(阻塞队列)、扩容后的线程需要的存活时间和存活单位这个为pool(因为一旦请求峰过去 了,那就不需要这么多线程去处理任务了,就需要设置他们可以存活多久)
public ThreadPoolExecutor(int corePoolSize, // 核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime,//存活时间 TimeUnit unit,//存活单位 BlockingQueue<Runnable> workQueue,//阻塞队列 ThreadFactory threadFactory //线程工厂(如果想自定义线程池中线程的名字), RejectedExecutionHandler handler) //拒绝策略{ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }【conclusion】:那基于上面的猜想,我们可以使用阻塞队列让其在整个线程池可见(公共资源),并且可以让线程一直处于活着的状态。当有任务的时候,每个线程自己进行自旋,充当一个消费者,对任务进行执行 ,并且前面也说了阻塞队列是线程安全的,如果到达数据高峰期则进行扩容(线程增加),请求峰期过去后,对某些线程进行释放
!!! 线程池中的线程是延迟加载的,
- 那首先肯定就是初始化核心线程 调用阻塞队列的方法,
- 把任务传递进去(offer()会返回是否存储成功和失败)
- true:说明当前核心线程可以支持当前的访问量
false:增减工作线程
- 如果添加失败,则说明当前的工作线程达到了你传递的最大的线程数,直接调用你传递的拒绝策略
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断当前工作的线程是否小于核心线程数(延迟初始化) if (workerCountOf(c) < corePoolSize) { //添加工作线程,并且执行任务 if (addWorker(command, true)) return; c = ctl.get(); } //添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果阻塞队列满了,则添加工作线程 else if (!addWorker(command, false)) //调用拒绝策略 reject(command); }添加任务到阻塞队列中,因为是延迟初始化,需要创建线程
private boolean addWorker(Runnable firstTask, boolean core) { retry: //通过原子操作增加线程数量 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //判断是否可以进行创建(是否是最大数) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通过原子操作增加线程数量 if (compareAndIncrementWorkerCount(c)) //直接走到最上面向下执行 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //初始化工作线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //构建一个工作线程,并传入任务(但是这里还没有启动) w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //如果不是中断状态则添加他到容器中这是存储工作线程的线程容器(HashSet<Worker> workers = new HashSet<Worker>();) workers.add(w); int s = workers.size(); //重新更新最大线程数量 if (s > largestPoolSize) largestPoolSize = s; //这里添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { //如果失败,把添加的线程释放,然后回归元素 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }运行线程去获取任务
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //保证当前线程不结束,直到task为空 while (task != null || (task = getTask()) != null) { //worker继承了Aqs 这里实现了一个互斥锁,表示当前线程正在运行一个任务,如果其他地方呀shutdown则必须保证线程执行完成 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //这里是空的实现,我们可以自定义自己的扩展 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //这里是空的实现,我们可以自定义自己的扩展 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }进行任务获取
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //如果线程池已经是结束状态,,则清理所有的线程 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //是否允许超时 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果当前线程大于核心线程数 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //通过cas减少工作线程数量 if (compareAndDecrementWorkerCount(c)) //表示要销毁当前工作线程 return null; continue; } //进行任务的获取 try { //如果当前阻塞没有任务,当前线程会阻塞在这里 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
线程池提供了修改线程的数量,我们可以通过配置中心来动态配置,然而我们需要设置多少线程呢,其实也没有一个准确的数字,因为系统中可能不止你一个线程池。常见的计算线程数的算法
IO密集型 = 2Ncpu(数据库数据交互、文件上传下载、网络数据传输等等)
计算密集型 = Ncpu(复杂算法)
下面给出动态改变线程数和队列数的代码。队列数是final修饰的,我们只有复制原来的队列出来,给里面加一个方法
public class DynamicChangeNumber { static ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ReSizeLinkedBlockingQueue<>(30)); public static void main(String[] args) { for (int i = 0; i < 10; i++) { executorService.execute(() -> { try { //模拟任务的执行 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } printThreadStatus("改变之前"); executorService.setCorePoolSize(4); executorService.setMaximumPoolSize(10); ReSizeLinkedBlockingQueue reSizeLinkedBlockingQueue = (ReSizeLinkedBlockingQueue) executorService.getQueue(); reSizeLinkedBlockingQueue.setCapacity(100); printThreadStatus("改变之后"); } public static void printThreadStatus(String name) { ReSizeLinkedBlockingQueue reSizeLinkedBlockingQueue = (ReSizeLinkedBlockingQueue) executorService.getQueue(); System.out.println(name+"------>核心线程数" + executorService.getPoolSize() + "最大线程数" + executorService.getMaximumPoolSize() + "队列容量" + (reSizeLinkedBlockingQueue.size() + reSizeLinkedBlockingQueue.remainingCapacity())); } }结果