众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.
但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。
redis 通常作为缓存服务引入,因此大部分系统都会有 redis
redis 本身的资源消耗是极小的,符合我们的轻量要求
redis 速度很快,几乎不会出现速度瓶颈
redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]
利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2*32-1 条数据,对于大部分应用是完全够用的。
简单来说就是:
每个 topic 对应一条队列
从队列一段写入数据,从另一端读取数据
消费失败,重新将消息放入队列
注意:代码仅供个人尝鲜使用,请勿用于真实生产环境
代码仅可在 springboot 环境中使用
注解代码如下:
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Componentpublic @interface MqConsumer { /** * 队列主题 */ String topic() default "default_es_topic"; }
被该注解修饰的类,将会接收 topic 下的消息。
接口代码如下:
public interface RedisConsumer { /** * 功能描述: 消费方法,消费者类必须继承此方法 * * @param message 数据载体 * @author 123 * @date 2020/3/28 22:41 */ void deal(String message); }
本接口用于定于接受消息的处理方法。
本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。
扫描部分代码如下:
/** * MqConfiguration.java */@Overridepublic void run(ApplicationArguments args) { Map
run 方法在 spring 扫描完毕后调用,通过实现ApplicationRunner
接口实现,通过 spring 的方法来获取所有被MqConsumer
接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。
执行线程部分代码如下:
private Runnable loop() { return () -> { while (true) { AtomicInteger count = new AtomicInteger(0); topicMap.forEach((k, v) -> { try { String message = mqUtil.getRedisTemplate().opsForList().rightPop(k); if (message == null) { count.getAndIncrement(); } else { pushTask(v, message, k); } } catch (RedisConnectionFailureException connException) { log.error("redis无法连接,10s后重试", connException); sleep(10); } catch (Exception e) { log.error("redis消息队列异常", e); } }); if (count.get() == topicMap.keySet().size()) { //当所有的队列都为空时休眠1s sleep(1); } } }; }private void pushTask(RedisConsumer item, String value, String key) { threadPoolExecutor.execute(() -> { try { item.deal(value); } catch (Exception e) { log.error("执行消费任务出错", e); //非广播消息进行数据回补 mqUtil.getRedisTemplate().opsForList().rightPush(key, value); } }); }
loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。
完整代码见本文结尾
运行项目后调用,MainController
中的接口即可测试。
完整代码:点击下方阅读原文,在文章底部就有了哦!