消息队列要能支持组件通信消息的快速读写,而 Redis 本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。消息队列的特征和 Redis 提供的消息队列方案。把这两方面的知识和实践经验串连起来,理解基于 Redis 实现消息队列的技术实践。以后当需要为分布式系统组件做消息队列选型时,可以根据组件通信量和消息通信速度的要求,选择出适合的 Redis 消息队列方案。
消息队列存取消息的过程:在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。
假设组件 1 需要对采集到的数据进行求和计算,并写入数据库,消息到达的速度很快,组件 1 没有办法及时地既做采集,又做计算,并且写入数据库。所以使用基于消息队列的通信,让组件 1 把数据 x 和 y 保存为 JSON 格式的消息,再发到消息队列,它就可以继续接收新的数据了。组件 2 则异步地从消息队列中把数据读取出来,在服务器 2 上进行求和计算后,再写入数据库。如下图:
一般把消息队列中发送消息的组件称为生产者(例子中的组件 1),把接收消息的组件称为消费者(例子中的组件 2),通用的消息队列的架构模型:
使用消息队列时,消费者可以异步读取生产者消息,然后再进行处理。即使生产者发送消息的速度远远超过了消费者处理消息的速度,生产者已经发送的消息也可以缓存在消息队列中,避免阻塞生产者,这是消息队列作为分布式组件通信的一大优势。
不过消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
Redis 的 List 和 Streams 两种数据类型可以满足消息队列的这三个需求。
解决消息保序问题:
List 本身就是按先进先出的顺序对数据进行存取的,使用 List 作为消息队列保存消息,已经能满足消息保序的需求了。
生产者使用 LPUSH 命令把要发送的消息依次写入 List,消费者使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。
生产者先用 LPUSH 写入了两条库存消息,分别是 5 和 3,表示要把库存更新为 5 和 3;消费者则用 RPOP 把两条消息依次读出,然后进行相应的处理:
消费者读取数据时的性能风险:
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循 环)。如果有新消息写入,RPOP 命令就会返回结果,RPOP 命令返回空值,再继续循环。
即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。
Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。
解决重复消息处理的问题要求:消费者程序本身能对重复消息进行判断。
当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID, 来判断当前收到的消息有没有经过处理。如果已经处理过,消费者程序就不再进行处理了。这种处理特性称为幂等性:对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。
List 不会为每个消息生成 ID 号的,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。
执行以下命令,把一条全局 ID 为 101030001、库存量为 5 的消息插入了消息队列:
LPUSH mq "101030001:stock:5" (integer) 1
解决消息可靠性问题:
消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。消费者程序在处理消息的过程出现了故障或宕机,会导致消息没有处理完成,消费者程序再次启动后,就没法再次从 List 中读取消息了。
List 类型提供了 BRPOPLPUSH 命令:让消费者程序从 一个 List 中读取消息,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。消费者程序读了消息但没能正常处理,重启后可以从备份 List 中重新读取消息并进行处理了。
用 BRPOPLPUSH 命令留存消息,以及消费者再次读取消息的过程:
生产者先用 LPUSH 把消息“5”“3”插入到消息队列 mq 中。消费者程序使用 BRPOPLPUSH 命令读取消息“5”,同时消息“5”还会被 Redis 插入到 mqback 队列中。消费者程序处理消息“5”时宕机了,重启后可以从 mqback 中再次读取消息“5”,继续处理。
基于 List 类型满足分布式组件对消息队列的三大需求。用 List 做消息队列时可能遇到的一个问题:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。
启动多个消费者程序组成一个消费组,一起分担处理 List 中的消息。 但是List 类型并不支持消费组的实现。
Redis 从 5.0 版本开始提供的 Streams 数据类型了。
和 List 相比,Streams 同样能够满足消息队列的三大需求。还支持消费组形式的消息读取。
Streams 是 Redis 专门为消息队列设计的数据类型,提供了丰富的消息队列操作命令:
XADD 命令往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。
执行下面的命令可以往名称为 mqstream 的消息队列中插入一条消息,消息的键是 repo,值是 5。消息队列名称后面的*,表示让 Redis 为插入的数据自动生成一个全局唯一的 ID,例如“1599203861727-0”。也可以不用*,在消息队列名称后自行设定一个 ID 号,只要保证这个 ID 号是全局唯一的就行。相比自行设定 ID 号,使用*会更加方便高效。
XADD mqstream * repo 5 "1599203861727-0"
消息的全局唯一 ID 由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,从 0 开始编号的。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。
XREAD 命令从消息队列中读取:
XREAD 在读取消息时可以指定一个消息 ID,从这个消息 ID 的下一条消息开始进行读取。
执行下面的命令,从 ID 号为 1599203861727-0 的消息开始,读取后续的所有消息(示例中一共 3 条)。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0 1) 1) "mqstream" 2) 1) 1) "1599274912765-0" 2) 1) "repo" 2) "3" 2) 1) "1599274925823-0" 2) 1) "repo" 2) "2" 3) 1) "1599274927910-0" 2) 1) "repo" 2) "1"
消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。当消息队列中没有消息时设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。
下面的命令最后的“$”符号表示读取最新的消息, 设置了 block 10000 的配置项,10000 的单位是毫秒,表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。 下面命令中的 XREAD 执行后,消息队列 mqstream 中一直没有消息,所以XREAD 在 10 秒后返回空值(nil)。
XREAD block 10000 streams mqstream $ (nil) (10.00s)
这些操作是 List 也支持的, 下面是Streams 特有的功能:
Streams 使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息,执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream。
XGROUP create mqstream group1 0 OK
再执行一段命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。 因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以 consumer1 就得到 mqstream 消息队列中的所有消息了(一共 4 条)。
XREADGROUP group group1 consumer1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599203861727-0" 2) 1) "repo" 2) "5" 2) 1) "1599274912765-0" 2) 1) "repo" 2) "3" 3) 1) "1599274925823-0" 2) 1) "repo" 2) "2" 4) 1) "1599274927910-0" 2) 1) "repo" 2) "1"
消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了:
XREADGROUP group group1 consumer2 streams mqstream 0 1) 1) "mqstream" 2) (empty list or set)
使用消费组的目的是让组内的多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599203861727-0" 2) 1) "repo" 2) "5" XREADGROUP group group2 consumer2 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599274912765-0" 2) 1) "repo" 2) "3" XREADGROUP group group2 consumer3 count 1 streams mqstream > 1) 1) "mqstream" 2) 1) 1) "1599274925823-0" 2) 1) "repo" 2) "2"
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。如果消费者没有成功处理消息,不会给 Streams 发送 XACK 命令,消息仍然会留存。消费者重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
group2 中各个消费者已读取、但尚未确认的消息个数。其中, XPENDING 返回结果的第二、三行分别表示 group2 中所有消费者读取的消息最小 ID 和最大 ID。
XPENDING mqstream group2 1) (integer) 3 2) "1599203861727-0" 3) "1599274925823-0" 4) 1) 1) "consumer1" 2) "1" 2) 1) "consumer2" 2) "1" 3) 1) "consumer3" 2) "1"
进一步查看某个消费者具体读取了哪些数据,执行下面的命令:
XPENDING mqstream group2 - + 10 consumer2 1) 1) "1599274912765-0" 2) "consumer2" 3) (integer) 513336 4) (integer) 1
consumer2 已读取的消息的 ID 是 1599274912765-0。
消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,这条消息就会被删除。当再使用 XPENDING 命令查看时,可以看到consumer2 已经没有已读取、但尚未确认处理的消息了。
XACK mqstream group2 1599274912765-0 (integer) 1 XPENDING mqstream group2 - + 10 consumer2 (empty list or set)
Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型,如果 Redis 是 5.0 及 5.0 以后的版本,就可以考虑把 Streams 用作消息队列了。
分布式系统组件使用消息队列时的三大需求:消息保序、重复消息处理和消息可靠性保证,
这三大需求可以进一步转换为对消息队列的三大要求:消息数据有序存取,消息数据具有全局唯一编号,以及消息数据在消费完成后被删除。
List 和 Streams 实现消息队列的特点和区别:
Redis 是否适合做消息队列,业界一直是有争论的。很多人认为,要使用消息队列就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,而 Redis 更加适合做缓存。
Redis 是一个非常轻量级的键值数据 库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。
所以关于是否用 Redis 做消息队列的问题,不能一概而论,需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,Redis 只需要使用有限的内存空间就能满足消息存储的需求,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。