一、需求切入点
在公司做的一个系统业务需要有个定时提醒的功能(数据在mysql中),要求提醒的时间差精准到分钟
解决方案有:
public class MessageProvider { // 延时队列的服务 通过这个bean来统一管理数据 private final DelayingQueueService delayingQueueService; private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND"; /** * 往队列中添加消息 * @param messageContent */ public void sendMessage(String messageContent, long delay) { try { //业务代码 …… // 将分装好的数据写进队列 delayingQueueService.push(queueMessage); } } catch (Exception e) { e.printStackTrace(); } } /** * 撤回消息 业务延伸 * @param members */ public void withdrawMessage(Long members){ delayingQueueService.removeByMembersId(members); } }
消费者,只关心需要消费的数据
// 从延伸队列拉取符合消费的数据 List<QueueMessage> msgList = delayingQueueService.pull(); ``` msgList.stream().forEach(msg -> { // 拿出已经到期的预约提示 发起提醒 if (current >= msg.getDelayTime()) { try { // 进行业务消费 …… //成功消费后移除消息 delayingQueueService.remove(msg); } ```
延时队列实现
public class DelayingQueueService { private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build(); //key:membersId value:message private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>(); private final StringRedisTemplate redisTemplate; /** * 可以不同业务用不同的key */ public static final String QUEUE_NAME = "message:queue"; /** * 锁key */ public static final String LOCK_KEY="message_lock_key"; /** * 插入消息 * * @param queueMessage * @return */ @SneakyThrows public Boolean push(QueueMessage queueMessage) { String messageStr = mapper.writeValueAsString(queueMessage); Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime()); membersMap.put(membersId,messageStr); return addFlag; } /** * 移除消息 * * @param queueMessage * @return */ @SneakyThrows public Boolean remove(QueueMessage queueMessage) { Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage)); if(remove>0){ membersMap.remove(membersId); } return remove > 0 ? true : false; } /** * 拉取最新需要 * 被消费的消息 * rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息 * * @return */ public List<QueueMessage> pull() { List<QueueMessage> msgList =new ArrayList<>(); try { Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis()); if (strings == null) { return null; } msgList = strings.stream().map(msg -> { QueueMessage message = null; try { message = mapper.readValue(msg, QueueMessage.class); } catch (JsonProcessingException e) { e.printStackTrace(); } return message; }).collect(Collectors.toList()); } catch (Exception e) { log.error(e.toString()); } return msgList; } //获得锁 public Boolean getLock(){ boolean lock = false; //获得锁 lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS); return lock; } public void releaseLock(){ redisTemplate.delete(LOCK_KEY); } }
三、结束语
本文所述的方法也是存在一些小的缺点,比如,数据的正常操作依赖于第三方组件,如果redis挂掉了,这个服务就down掉了,实现延时队列的方法有很多种,基于业务与系统本身的情况,兼容利弊去做一些取舍,以达到最好的效果