package com.dong.mytest.demo.client; import cn.hutool.extra.spring.SpringUtil; import com.dong.mytest.demo.common.dto.DelayMessage; import com.dong.mytest.demo.common.util.DateUtil; import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author dong */ @Slf4j @Component public class RedissonDelayQueueClient implements InitializingBean { @Resource private RedissonClient redissonClient; private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16); public void addDelayMessage(DelayMessage delayMessage) { log.info("delayMessage={}", delayMessage); if (delayQueueMap.get(delayMessage.getQueueName()) == null) { log.warn("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName()); return; } delayMessage.setCreateTime(DateUtil.getNowFormatStr()); RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName()); rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit()); } @Override public void afterPropertiesSet() throws Exception { // 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumer,并且service名称为 ${queueName}Consumer List<String> queueNameList = Lists.newArrayList("orderAutoCancelDelayQueue"); // 加载延迟队列 for (String queueName : queueNameList) { DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueName + "Consumer"); if (delayQueueConsumer == null) { throw new RuntimeException("queueName=" + queueName + ",delayQueueConsumer=null,请检查配置..."); } // Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。 RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueName); RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue); delayQueueMap.put(queueName, rDelayedQueue); // 订阅新元素的到来,调用的是takeAsync(),异步执行 rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute); } } }
package com.dong.mytest.demo.common.dto; import com.alibaba.fastjson.JSON; import lombok.Data; import java.io.Serializable; import java.util.concurrent.TimeUnit; /** * @author dong */ @Data public class DelayMessage implements Serializable { private String queueName; private Long delayTime; private TimeUnit timeUnit; private String msgBody; private String createTime; @Override public String toString() { return JSON.toJSONString(this); } }
package com.dong.mytest.demo.service.delayqueue; import com.dong.mytest.demo.common.dto.DelayMessage; /** * @author dong */ public interface DelayQueueConsumer { /** * 执行延迟消息 * * @param delayMessage delayMessage */ void execute(DelayMessage delayMessage); }
package com.dong.mytest.demo.service.delayqueue.impl; import com.dong.mytest.demo.common.dto.DelayMessage; import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * @author dong */ @Service("orderAutoCancelDelayQueueConsumer") @Slf4j public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer { @Override public void execute(DelayMessage delayMessage) { log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage); } }