Redis教程

Redis 消息队列

本文主要是介绍Redis 消息队列,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、消息队列

消息队列的基本需求

  • 消息保序:消息的产生与消费有先后顺序,消息队列需保证消费的有序性。
  • 重复消息处理:生产者可能会发送重复消息,需保证消费的幂等性。
  • 消息可靠性保证:消息不能丢失,消费失败后,需保证有机制可以重新消费。

消息队列可靠性的保证

  • 生产者丢数据
  • 消费者丢数据
  • 消息队列自身丢数据

二、基于list的消息队列

Redis 的sub/pub机制,可以实现部分的消息队列机制,但遇到网络连接中断,数据库宕机等情况,数据易丢失无法重复消费。基于list的消费队列可以简单的实现消费队列的部分功能。

实现指令

  • LPUSH:队列插入一条消息;
  • RPOP:队列消费一条消息;
  • BRPOP:阻塞式的消费一条消息;
  • BRPOPLPUSH:阻塞式的消费一条消息的同时将此消息插入另一个消费队列(用于消费失败时的可重复消费);

使用示例

# 消费队列mq中插入一条消息序列为00001,消息值为msg:5的数据
127.0.0.1:6379> lpush mq 00001:msg:5
(integer) 1
# 消费一条消息
127.0.0.1:6379> rpop mq 
"00001:msg:5"
127.0.0.1:6379> lpush mq 00002:msg:7 00003:msg:4
(integer) 2
# 阻塞式(3s 超时)消费一条消息
127.0.0.1:6379> brpop mq 3
1) "mq"
2) "00002:msg:7"
(2.51s)
127.0.0.1:6379> brpop mq 3
1) "mq"
2) "00003:msg:4"
127.0.0.1:6379> lpush mq 00004:msg:9
(integer) 1
# 阻塞式(3s 超时)消费一条消息,并将消费的数据放入mq2队列,预防消费失败时数据丢失。
127.0.0.1:6379> brpoplpush mq mq2 3
"00003:msg:9"
# 消费成功后再删除掉mq2中的消息
127.0.0.1:6379> rpop mq2
"00003:msg:9"

存在问题

  1. 业务需自行保证序列ID唯一,避免重复消费。
  2. BRPOPLPUSH为了防止消费失败后可重新消费,会多产生的一条消费队列,占用内存。
  3. 并不支持消费组概念,当消费速度较慢时,会导致消息队列数据积压。
  4. 消息队列自身基于redis,本身存在数据丢失风险。

三、基于streams的消息队列

Redis5.0之后新支持了streams数据结构用于支持完备功能的消息队列,新增消费组消费,ACK确认等完备机制。

实现指令

  • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
  • XREAD:用于读取消息,可以按 ID 读取数据;
  • XREADGROUP:按消费组形式读取消息;
  • XPENDING:用来查询每个消费组内所有消费者已读取但尚未确认的消息;
  • XACK:用于向消息队列确认消息处理已完成;

使用示例

  • xadd key ID field string [field string …]
    • ID :可自己指定唯一序列号,一般用* 指定系统自动生成,生成ID序列号:[毫秒时间戳-0开头序列],如"1633939083516-0"
127.0.0.1:6379> xadd mq * money -5
"1633939083516-0"
127.0.0.1:6379> xadd mq * money -5 money +10
"1633953837863-0"
  • xrange key start end [COUNT count]
127.0.0.1:6379> xrange mq - +
1) 1) "1633939083516-0"
   2) 1) "money"
      2) "-5"
2) 1) "1633953837863-0"
   2) 1) "money"
      2) "-5"
      3) "money"
      4) "+10"
  • xread [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
    • count :读取数目
    • block :阻塞毫秒数,不设置则代表非阻塞式读取
    • key :消息队列名称
    • id :消息序列号,0-0 代表最小开始,$ 代表从现在开始。
127.0.0.1:6379> xread count 2 block 5000 streams mq 0-0
1) 1) "mq"
   2) 1) 1) "1633939083516-0"
         2) 1) "money"
            2) "-5"
      2) 1) "1633953837863-0"
         2) 1) "money"
            2) "-5"
            3) "money"
            4) "+10"

  • xgroup [CREATE key groupname id-or-$ ] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
# 创建消费mq队列的group1组,从ID 0-0开始消费。
127.0.0.1:6379> xgroup create mq group1 0-0
OK
  • xreadgroup GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
# group1组的consumer1开始从0-0消费mq队列
127.0.0.1:6379> xreadgroup GROUP group1 consumer1 STREAMS mq 0-0
1) 1) "mq"
   2) 1) 1) "1633939083516-0"
         2) 1) "money"
            2) "-5"
      2) 1) "1633953837863-0"
         2) 1) "money"
            2) "-5"
            3) "money"
            4) "+10"
  • xpending key group [start end count] [consumer]
# 查询当前读取但为确认消费的消息
127.0.0.1:6379> xpending mq group1 - + 2 consumer1
1) 1) "1633939083516-0"
   2) "consumer1"
   3) (integer) 672137
   4) (integer) 1
2) 1) "1633953837863-0"
   2) "consumer1"
   3) (integer) 672137
   4) (integer) 1
  • xack key group ID [ID …]
# 确认1633939083516-0消息
127.0.0.1:6379> xack mq group1 1633939083516-0
(integer) 1
127.0.0.1:6379> xpending mq group1 - + 2 consumer1
1) 1) "1633953837863-0"
   2) "consumer1"
   3) (integer) 840477
   4) (integer) 1

存在问题

  1. Redis5.0之后版本开始支持。
  2. 消息队列自身基于redis,依然存在数据丢失风险。
  3. 轻量级消息队列,适用于丢失数据不敏感业务(发短信,发通知)。
这篇关于Redis 消息队列的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!