Java教程

Java高并发专题之37、如何实现一个通用的延迟队列?

本文主要是介绍Java高并发专题之37、如何实现一个通用的延迟队列?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,比如15分钟,若时间到了之后,还未支付的,订单将被关闭,库存将被释放。

这种业务就需要用到延迟队列的功能,将任务丢到延迟队列、设置一个延迟时间、回调函数,到了时间之后,延迟队列将回调指定的函数消费指定的任务。

下面代码是一个通用的延迟队列的实现,大家可以直接拿去用。

代码还是比较简单的,技术要点:

  • 调用addTask方法将任务丢到延迟队列中,主要参数(延迟时间、任务信息、回调【任务到期后会进行回调】)
  • 使用到了java中的延迟队列DelayQueue来存放延迟任务
  • 下面的构造方法会自动调用一个start方法,start方法中会自动启动一个线程,线程轮询从延迟队列中拉取到期的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建延迟任务中指定的回调函数
  • main方法中有使用步骤
import java.util.concurrent.*;import java.util.function.Consumer;import java.util.logging.Logger;public class DelayQueueService<T> {    Logger logger = Logger.getLogger(DelayQueueService.class.getName());    //延迟队列名称    private String delayQueueName;    private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();    //处理队列中任务的线程池    private ExecutorService executorService;    public DelayQueueService(String delayQueueName) {        this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4));    }    public DelayQueueService(String delayQueueName, ExecutorService executorService) {        this.delayQueueName = delayQueueName;        this.executorService = executorService;        //启动队列消费        this.start();    }    /**     * 添加任务     *     * @param delayedTimeUnit 延迟时间单位     * @param delayedTime     延迟时间     * @param task            任务     * @param consumer        任务消费者(到期了会回调)     */    public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {        this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer));    }    private void start() {        //轮询从延迟队列中拉取任务,然后调用线程池进行处理        Thread pollThread = new Thread(() -> {            while (true) {                try {                    DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS);                    if (this.executorService.isShutdown()) {                        break;                    }                    if (delayedTask != null) {                        executorService.submit(() -> {                            delayedTask.consumer.accept(delayedTask.task);                        });                    }                } catch (InterruptedException e) {                    logger.warning(e.getMessage());                }            }        });        pollThread.setDaemon(Thread.currentThread().isDaemon());        pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName);        pollThread.start();    }    public void close() {        if (!this.executorService.isShutdown()) {            this.executorService.shutdown();        }    }    public class DelayedTask implements Delayed {        //延迟时间单位        private TimeUnit delayedTimeUnit;        //延迟时间        private long delayedTime;        //到期时间(毫秒)        private long endTime;        //延迟任务信息        private T task;        //消费者        private Consumer<T> consumer;        public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer<T> consumer) {            this.delayedTimeUnit = delayedTimeUnit;            this.delayedTime = delayedTime;            this.task = task;            this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime);            this.consumer = consumer;        }        @Override        public long getDelay(TimeUnit unit) {            return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);        }        @Override        public int compareTo(Delayed o) {            DelayedTask task = (DelayedTask) o;            return Long.compare(this.endTime, task.endTime);        }    }    public static void main(String[] args) {        //创建一个延迟队列:用来对超过支付日期的订单进行关闭        String delayQueueName = "orderCloseDelayQueue";        //1、创建延迟队列        DelayQueueService<String> orderCloseDelayQueue = new DelayQueueService<String>(delayQueueName);        for (int i = 1; i <= 10; i++) {            //2、调用addTask将延迟任务加入延迟队列            orderCloseDelayQueue.addTask(TimeUnit.SECONDS, i, "订单" + i, new Consumer<String>() {                @Override                public void accept(String s) {                    System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s);                }            });        }        //3、系统关闭的时候,调用延迟队列的close方法        //orderCloseDelayQueue.close();    }}

main方法中模拟了10个延迟任务,运行看看效果,输出

1614346780438,Thread[pool-1-thread-1,5,main],关闭订单:订单11614346781437,Thread[pool-1-thread-2,5,main],关闭订单:订单21614346782436,Thread[pool-1-thread-3,5,main],关闭订单:订单31614346783437,Thread[pool-1-thread-4,5,main],关闭订单:订单41614346784437,Thread[pool-1-thread-5,5,main],关闭订单:订单51614346785437,Thread[pool-1-thread-6,5,main],关闭订单:订单61614346786437,Thread[pool-1-thread-7,5,main],关闭订单:订单71614346787436,Thread[pool-1-thread-8,5,main],关闭订单:订单81614346788437,Thread[pool-1-thread-9,5,main],关闭订单:订单91614346789437,Thread[pool-1-thread-10,5,main],关闭订单:订单10
来源:http://itsoku.com/course/1/205
这篇关于Java高并发专题之37、如何实现一个通用的延迟队列?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!