消息确认机制
为了保证消息丢失,需要付出两方面的代价:一方面是性能的损耗,一方面可能造成消息重复消费。为了保证消息只被消费一次,我们需要保证消费多条消息时所得到的结果就是相同的,即幂等的。消息在生产和消费的过程中都可能会产生重复,所以你要做的是在生产过程和消费过程中增加消息幂等性的保证。
生产者保证消息唯一性
给每一个生产者和消息赋予唯一的ID,消息存储时以<生产者ID,最后一条消息ID>存储,当某一个生产者产生新的消息时,消息队列服务端会比对存储的最后一条,如果一致就认为是重复的消息,服务端会自动丢弃。
消费者保证消费的唯一性
设置全局唯一的消息ID。在消息被生产的时候给它生成一个全局唯一的消息ID,消息被处理之后把这个ID存储在数据库/缓存中,在处理下一条消息之前先从数据库里面检查这个全局ID是否被消费过,如果被消费过就放弃消费。
问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑。
解决:这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败。
适用:全局唯一的消息ID+事务的范式使得消息处理的成本就更高了,所以如果对于消息重复没有特别严格的要求,可以直接使用全局唯一的消息ID,而不考虑引入事务。
适用乐观锁。给数据中增加一个版本号的字段,在生产消息时先查询版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额SQL的时候带上版本号
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
SQL可以执行成功,version+1;在执行第二条相同的消息时,由于version值不再是1,所以这条SQL不能执行成功,也就保证了消息的幂等性。
消息队列已经堆积了大量的消息,消费者由于性能问题不能及时消费消息,就容易造成消息堆积。
如何增加增加消费者的消息处理能力?
消费者: