Redis 的sub/pub机制,可以实现部分的消息队列机制,但遇到网络连接中断,数据库宕机等情况,数据易丢失无法重复消费。基于list的消费队列可以简单的实现消费队列的部分功能。
# 消费队列mq中插入一条消息序列为00001,消息值为msg:5的数据 127.0.0.1:6379> lpush mq 00001:msg:5 (integer) 1 # 消费一条消息 127.0.0.1:6379> rpop mq "00001:msg:5"
127.0.0.1:6379> lpush mq 00002:msg:7 00003:msg:4 (integer) 2 # 阻塞式(3s 超时)消费一条消息 127.0.0.1:6379> brpop mq 3 1) "mq" 2) "00002:msg:7" (2.51s) 127.0.0.1:6379> brpop mq 3 1) "mq" 2) "00003:msg:4"
127.0.0.1:6379> lpush mq 00004:msg:9 (integer) 1 # 阻塞式(3s 超时)消费一条消息,并将消费的数据放入mq2队列,预防消费失败时数据丢失。 127.0.0.1:6379> brpoplpush mq mq2 3 "00003:msg:9" # 消费成功后再删除掉mq2中的消息 127.0.0.1:6379> rpop mq2 "00003:msg:9"
Redis5.0之后新支持了streams数据结构用于支持完备功能的消息队列,新增消费组消费,ACK确认等完备机制。
127.0.0.1:6379> xadd mq * money -5 "1633939083516-0" 127.0.0.1:6379> xadd mq * money -5 money +10 "1633953837863-0"
127.0.0.1:6379> xrange mq - + 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
127.0.0.1:6379> xread count 2 block 5000 streams mq 0-0 1) 1) "mq" 2) 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
# 创建消费mq队列的group1组,从ID 0-0开始消费。 127.0.0.1:6379> xgroup create mq group1 0-0 OK
# group1组的consumer1开始从0-0消费mq队列 127.0.0.1:6379> xreadgroup GROUP group1 consumer1 STREAMS mq 0-0 1) 1) "mq" 2) 1) 1) "1633939083516-0" 2) 1) "money" 2) "-5" 2) 1) "1633953837863-0" 2) 1) "money" 2) "-5" 3) "money" 4) "+10"
# 查询当前读取但为确认消费的消息 127.0.0.1:6379> xpending mq group1 - + 2 consumer1 1) 1) "1633939083516-0" 2) "consumer1" 3) (integer) 672137 4) (integer) 1 2) 1) "1633953837863-0" 2) "consumer1" 3) (integer) 672137 4) (integer) 1
# 确认1633939083516-0消息 127.0.0.1:6379> xack mq group1 1633939083516-0 (integer) 1 127.0.0.1:6379> xpending mq group1 - + 2 consumer1 1) 1) "1633953837863-0" 2) "consumer1" 3) (integer) 840477 4) (integer) 1