系统或者项目中难免会遇到各种需要自动去执行的任务,实现这些任务的手段也多种多样,如操作系统的crontab,spring框架的quartz,java的Timer和ScheduledThreadPool都是定时任务中的典型手段。
Timer是java中最典型的基于优先级队列+最小堆实现的定时器,内部维护一个存放定时任务的优先级队列,该优先级队列使用了最小堆排序。当我们调用schedule方法的时候,一个新的任务被加入queue,堆重排,始终保持堆顶是执行时间最小(即最近马上要执行)的。同时,内部相当于起了一个线程不断扫描队列,从队列中依次获取堆顶元素执行,任务得到调度。
下面以Timer为例,介绍优先级队列+最小堆算法的实现原理:
package com.oldlu.timer; import java.util.Timer; import java.util.TimerTask; class Task extends TimerTask { @Override public void run() { System.out.println("running..."); } } public class TimerDemo { public static void main(String[] args) { Timer t=new Timer(); //在1秒后执行,以后每2秒跑一次 t.schedule(new Task(), 1000,2000); } }
新加任务时,t.schedule方法会add到队列
void add(TimerTask task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); }
add实现了容量维护,不足时扩容,同时将新任务追加到队列队尾,触发堆排序,始终保持堆顶元素最小
//最小堆排序 private void fixUp(int k) { while (k > 1) { //k指针指向当前新加入的节点,也就是队列的末尾节点,j为其父节点 int j = k >> 1; //如果新加入的执行时间比父节点晚,那不需要动 if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; //如果大于其父节点,父子交换 TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; //交换后,当前指针继续指向新加入的节点,继续循环,知道堆重排合格 k = j; } }
线程调度中的run,主要调用内部mainLoop()方法,使用while循环
private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { //... // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { //... //当前时间 currentTime = System.currentTimeMillis(); //要执行的时间 executionTime = task.nextExecutionTime; //判断是否到了执行时间 if (taskFired = (executionTime<=currentTime)) { //判断下一次执行时间,单次的执行完移除 //循环的修改下次执行时间 if (task.period == 0) { // Non‐repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule //下次时间的计算有两种策略 //1.period是负数,那下一次的执行时间就是当前时间‐period //2.period是正数,那下一次就是该任务本次的执行时间+period //注意!这两种策略大不相同。因为Timer是单线程的 //如果是1,那么currentTime是当前时间,就受任务执行长短影响 //如果是2,那么executionTime是绝对时间戳,与任务长短无关 queue.rescheduleMin( task.period<0 ? currentTime ‐ task.period : executionTime + task.period); } } } //不到执行时间,等待 if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime ‐ currentTime); } //到达执行时间,run! if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }
本节使用Timer为了介绍算法原理,但是Timer已过时,实际应用中推荐使用
ScheduledThreadPoolExecutor(同样内部使用DelayedWorkQueue和最小堆排序)
Timer是单线程,一旦一个失败或出现异常,将打断全部任务队列,线程池不会
Timer在jdk1.3+,而线程池需要jdk1.5+
时间轮是一种更为常见的定时调度算法,各种操作系统的定时任务调度,linux crontab,基于java的通信框架
Netty等。其灵感来源于我们生活中的时钟。
轮盘实际上是一个头尾相接的环状数组,数组的个数即是插槽数,每个插槽中可以放置任务。
以1天为例,将任务的执行时间%12,根据得到的数值,放置在时间轮上,小时指针沿着轮盘扫描,扫到的点取出
任务执行:
问题:比如3点钟,有多个任务执行怎么办?
答案:在每个槽上设置一个队列,队列可以无限追加,解决时间点冲突问题(类似HashMap结构)
问题:每个轮盘的时间有限,比如1个月后的第3天的5点怎么办?
方案一:加长时间刻度,扩充到1年
优缺点:简单,占据大量内存,即使插槽没有任务也要空轮询,白白的资源浪费,时间、空间复杂度都高
方案二:每个任务记录一个计数器,表示转多少圈后才会执行。没当指针过来后,计数器减1,减到0的再执行
优缺点:每到一个指针都需要取出链表遍历判断,时间复杂度高,但是空间复杂度低
方案三:设置多个时间轮,年轮,月轮,天轮。1天内的放入天轮,1年后的则放入年轮,当年轮指针读到后,将任
务取出,放入下一级的月轮对应的插槽,月轮再到天轮,直到最小精度取到,任务被执行。
优缺点:不需要额外的遍历时间,但是占据了多个轮的空间。空间复杂度升高,但是时间复杂度降低
定义Task类
package com.oldlu.timer; public class RoundTask { //延迟多少秒后执行 int delay; //加入的序列号,只是标记一下加入的顺序 int index; public RoundTask(int index, int delay) { this.index = index; this.delay = delay; } void run() { System.out.println("task " + index + " start , delay = "+delay); } @Override public String toString() { return String.valueOf(index+"="+delay); } }
时间轮算法:
package com.oldlu.timer; import java.util.LinkedList; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class RoundDemo { //小轮槽数 int size1=10; //大轮槽数 int size2=5; //小轮,数组,每个元素是一个链表 LinkedList<RoundTask>[] t1 = new LinkedList[size1]; //大轮 LinkedList<RoundTask>[] t2 = new LinkedList[size2]; //小轮计数器,指针跳动的格数,每秒加1 final AtomicInteger flag1=new AtomicInteger(0); //大轮计数器,指针跳动个格数,即每10s加1 final AtomicInteger flag2=new AtomicInteger(0); //调度器,拖动指针跳动 ScheduledExecutorService service = Executors.newScheduledThreadPool(2); public RoundDemo(){ //初始化时间轮 for (int i = 0; i < size1; i++) { t1[i]=new LinkedList<>(); } for (int i = 0; i < size2; i++) { t2[i]=new LinkedList<>(); } } //打印时间轮的结构,数组+链表 void print(){ System.out.println("t1:"); for (int i = 0; i < t1.length; i++) { System.out.println(t1[i]); } System.out.println("t2:"); for (int i = 0; i < t2.length; i++) { System.out.println(t2[i]); } } //添加任务到时间轮 void add(RoundTask task){ int delay = task.delay; if (delay < size1){ //10以内的,在小轮 t1[delay].addLast(task); }else { //超过小轮的放入大轮,槽除以小轮的长度 t2[delay/size1].addLast(task); } } void startT1(){ //每秒执行一次,推动时间轮旋转,取到任务立马执行 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { int point = flag1.getAndIncrement()%size1; System.out.println("t1 ‐‐‐‐‐> slot "+point); LinkedList<RoundTask> list = t1[point]; if (!list.isEmpty()){ //如果当前槽内有任务,取出来,依次执行,执行完移除 while (list.size() != 0){ list.getFirst().run(); list.removeFirst(); } } } },0,1, TimeUnit.SECONDS); } void startT2(){ //每10秒执行一次,推动时间轮旋转,取到任务下方到t1 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { int point = flag2.getAndIncrement()%size2; System.out.println("t2 =====> slot "+point); LinkedList<RoundTask> list = t2[point]; if (!list.isEmpty()){ //如果当前槽内有任务,取出,放到定义的小轮 while (list.size() != 0){ RoundTask task = list.getFirst(); //放入小轮哪个槽呢?小轮的槽按10取余数 t1[task.delay % size1].addLast(task); //从大轮中移除 list.removeFirst(); } } } },0,10, TimeUnit.SECONDS); } public static void main(String[] args) { RoundDemo roundDemo = new RoundDemo(); //生成100个任务,每个任务的延迟时间随机 for (int i = 0; i < 100; i++) { roundDemo.add(new RoundTask(i,new Random().nextInt(50))); } //打印,查看时间轮任务布局 roundDemo.print(); //启动大轮 roundDemo.startT2(); //小轮启动 roundDemo.startT1(); } }
输出结果严格按delay顺序执行,而不管index是何时被提交的
t1为小轮,10个槽,每个1s,10s一轮回
t2为大轮,5个槽,每个10s,50s一轮回
t1循环到每个槽时,打印槽内的任务数据,如 t1–>slot9 , 打印了3个9s执行的数据
t2循环到每个槽时,将槽内的任务delay时间取余10后,放入对应的t1槽中,如 t2==>slot1
那么t1旋转对应的圈数后,可以取到t2下放过来的任务并执行,如10,11…