电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,比如15分钟,若时间到了之后,还未支付的,订单将被关闭,库存将被释放。
这种业务就需要用到延迟队列的功能,将任务丢到延迟队列、设置一个延迟时间、回调函数,到了时间之后,延迟队列将回调指定的函数消费指定的任务。
下面代码是一个通用的延迟队列的实现,大家可以直接拿去用。
代码还是比较简单的,技术要点:
import java.util.concurrent.*;import java.util.function.Consumer;import java.util.logging.Logger;public class DelayQueueService<T> { Logger logger = Logger.getLogger(DelayQueueService.class.getName()); //延迟队列名称 private String delayQueueName; private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>(); //处理队列中任务的线程池 private ExecutorService executorService; public DelayQueueService(String delayQueueName) { this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4)); } public DelayQueueService(String delayQueueName, ExecutorService executorService) { this.delayQueueName = delayQueueName; this.executorService = executorService; //启动队列消费 this.start(); } /** * 添加任务 * * @param delayedTimeUnit 延迟时间单位 * @param delayedTime 延迟时间 * @param task 任务 * @param consumer 任务消费者(到期了会回调) */ public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) { this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer)); } private void start() { //轮询从延迟队列中拉取任务,然后调用线程池进行处理 Thread pollThread = new Thread(() -> { while (true) { try { DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS); if (this.executorService.isShutdown()) { break; } if (delayedTask != null) { executorService.submit(() -> { delayedTask.consumer.accept(delayedTask.task); }); } } catch (InterruptedException e) { logger.warning(e.getMessage()); } } }); pollThread.setDaemon(Thread.currentThread().isDaemon()); pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName); pollThread.start(); } public void close() { if (!this.executorService.isShutdown()) { this.executorService.shutdown(); } } public class DelayedTask implements Delayed { //延迟时间单位 private TimeUnit delayedTimeUnit; //延迟时间 private long delayedTime; //到期时间(毫秒) private long endTime; //延迟任务信息 private T task; //消费者 private Consumer<T> consumer; public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) { this.delayedTimeUnit = delayedTimeUnit; this.delayedTime = delayedTime; this.task = task; this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime); this.consumer = consumer; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayedTask task = (DelayedTask) o; return Long.compare(this.endTime, task.endTime); } } public static void main(String[] args) { //创建一个延迟队列:用来对超过支付日期的订单进行关闭 String delayQueueName = "orderCloseDelayQueue"; //1、创建延迟队列 DelayQueueService<String> orderCloseDelayQueue = new DelayQueueService<String>(delayQueueName); for (int i = 1; i <= 10; i++) { //2、调用addTask将延迟任务加入延迟队列 orderCloseDelayQueue.addTask(TimeUnit.SECONDS, i, "订单" + i, new Consumer<String>() { @Override public void accept(String s) { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s); } }); } //3、系统关闭的时候,调用延迟队列的close方法 //orderCloseDelayQueue.close(); }}
main方法中模拟了10个延迟任务,运行看看效果,输出
1614346780438,Thread[pool-1-thread-1,5,main],关闭订单:订单11614346781437,Thread[pool-1-thread-2,5,main],关闭订单:订单21614346782436,Thread[pool-1-thread-3,5,main],关闭订单:订单31614346783437,Thread[pool-1-thread-4,5,main],关闭订单:订单41614346784437,Thread[pool-1-thread-5,5,main],关闭订单:订单51614346785437,Thread[pool-1-thread-6,5,main],关闭订单:订单61614346786437,Thread[pool-1-thread-7,5,main],关闭订单:订单71614346787436,Thread[pool-1-thread-8,5,main],关闭订单:订单81614346788437,Thread[pool-1-thread-9,5,main],关闭订单:订单91614346789437,Thread[pool-1-thread-10,5,main],关闭订单:订单10来源:http://itsoku.com/course/1/205