Go教程

RabbitMQ使用 prefetch_count优化队列的消费,使用死信队列和延迟队列实现消息的定时重试,golang版本

本文主要是介绍RabbitMQ使用 prefetch_count优化队列的消费,使用死信队列和延迟队列实现消息的定时重试,golang版本,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
  • RabbitMQ 的优化
    • channel
    • prefetch Count
    • 死信队列
      • 什么是死信队列
      • 使用场景
      • 代码实现
    • 延迟队列
      • 什么是延迟队列
      • 使用场景
      • 实现延迟队列的方式
        • Queue TTL
        • Message TTL
      • 使用 Queue TTL 设置过期时间
      • 使用 Message TTL 设置过期时间
      • 使用插件还是Queue TTL处理延迟队列呢?
    • 参考

RabbitMQ 的优化

channel

生产者,消费者和 RabbitMQ 都会建立连接。为了避免建立过多的 TCP 连接,减少资源额消耗。

AMQP 协议引入了信道(channel),多个 channel 使用同一个 TCP 连接,起到对 TCP 连接的复用。

不过 channel 的连接数是有上限的,过多的连接会导致复用的 TCP 拥堵。

const (
	maxChannelMax = (2 << 15) - 1
	defaultChannelMax = (2 << 10) - 1
)

通过http://github.com/streadway/amqp这个client来连接 RabbitMQ,这里面定义了最大值65535和默认最大值2047。

prefetch Count

什么是prefetch Count,先举个栗子:

假定 RabbitMQ 队列有 N 个消费队列,RabbitMQ 队列中的消息将以轮询的方式发送给消费者。

消息的数量是 M,那么每个消费者得到的数据就是 M%N。如果某一台的机器中的消费者,因为自身的原因,或者消息本身处理所需要的时间很久,消费的很慢,但是其他消费者分配的消息很快就消费完了,然后处于闲置状态,这就造成资源的浪费,消息队列的吞吐量也降低了。

这时候prefetch Count就登场了,通过引入prefetch Count来避免消费能力有限的消息队列分配过多的消息,而消息处理能力较好的消费者没有消息处理的情况。

RabbitM 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的"滑动窗口"。

所以消息不会被处理速度很慢的消费者过多霸占,能够很好的分配到其它处理速度较好的消费者中。通俗的说就是消费者最多从 RabbitMQ 中获取的未消费消息的数量。

prefetch Count数量设置为多少合适呢?大概就是30吧,具体可以参见Finding bottlenecks with RabbitMQ 3.3

谈到了prefetch Count,我们还要看了 global 这个参数,RabbitMQ 为了提升相关的性能,在 AMQPO-9-1 协议之上重新定义了 global 这个参数

global 参数 AMQPO-9-1 RabbitMQ
false 信道上所有的消费者都需要遵从 prefetchC unt 的限 信道上新的消费者需要遵从 prefetchCount 的限定值定值
true 当前通信链路(Connection) 上所有的消费者都要遵从 prefetchCount 的限定值,就是同一Connection上的消费者共享 信道上所有的消费者都需要遵从 prefetchCunt 的上限,就是同一信道上的消费者共享

prefetchSize:预读取的单条消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限,单位为 B。

如果prefetch Count为 0 呢,表示预读取的消息数量没有上限。

举个错误使用的栗子:

之前一个队列的消费者消费速度过慢,prefetch Count为0,然后新写了一个消费者,prefetch Count设置为30,并且起了10个pod,来处理消息。老的消费者还没有下线也在处理消息。

但是发现消费速度还是很慢,有大量的消息处于 unacked 。如果明白prefetch Count的含义其实就已经可以猜到问题的原因了。

老的消费者prefetch Count为0,所以很多 unacked 消息都被它持有了,虽然新加了几个新的消费者,但是都处于空闲状态,最后停掉了prefetch Count为0的消费者,很快消费速度就正常了。

死信队列

什么是死信队列

一般消息满足下面几种情况就会消息变成死信

  • 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时 requeue 属性被设置为false;

  • 消息过期,消息在队列的存活时间超过设置的 TT L时间;

  • 队列达到最大长度,消息队列的消息数量已经超过最大队列长度。

当一个消息满足上面的几种条件变成死信(dead message)之后,会被重新推送到死信交换器(DLX ,全称为 Dead-Letter-Exchange)。绑定 DLX 的队列就是死信队列。

所以死信队列也并不是什么特殊的队列,只是绑定到了死信交换机中了,死信交换机也没有什么特殊,我们只是用这个来处理死信队列了,和别的交换机没有本质上的区别。

对于需要处理死信队列的业务,跟我们正常的业务处理一样,也是定义一个独有的路由key,并对应的配置一个死信队列进行监听,然后 key 绑定的死信交换机中。

使用场景

当消息的消费出现问题时,出问题的消息不被丢失,进行消息的暂存,方便后续的排查处理。

代码实现

死信队列的使用,可参看下文,配合延迟队列实现消息重试的机制。

延迟队列

什么是延迟队列

延迟队列就是用来存储进行延迟消费的消息。

什么是延迟消息?

就是不希望消费者马上消费的消息,等待指定的时间才进行消费的消息。

使用场景

1、关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭;

2、清理过期数据业务上。比如缓存中的对象,超过了空闲时间,需要从缓存中移出;

3、任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求;

4、下单之后如果三十分钟之内没有付款就自动取消订单;

5、订餐通知:下单成功后60s之后给用户发送短信通知;

6、当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存;

7、定期检查处于退款状态的订单是否已经退款成功;

8、新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信;

9、定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。

总结下来就是一些延迟处理的业务场景

实现延迟队列的方式

RabbitMQ 中本身并没有直接提供延迟队列的功能,可以通过死信队列和 TTL 。来实现延迟队的功能。

先来了解下过期时间 TTL,消息一旦超过设置的 TTL 值,就会变成死信。这里需要注意的是 TTL 的单位是毫秒。设置过期时间一般与两种方式

  • 1、通过队列属性设置,队列中的消息有相同的过期时间;

  • 2、通过消息本身单独设置,每条消息有自己的的过期时间。

如果两种一起设置,消息的 TTL 以两者之间较小的那个数值为准。

上面两种 TTL 过期时间,消息队列的处理是不同的。第一种,消息一旦过期就会从消息队列中删除,第二种,消息过期了不会马上进行删除操作,删除的操作,是在投递到消费者之前进行判断的。

第一种方式中相同过期时间的消息是在同一个队列中,所以过期的消息总是在头部,只要在头部进行扫描就好了。第二种方式,过期的时间不同,但是消息是在同一个消息队列中的,如果要清理掉所有过期的时间就需要遍历所有的消息,当然这也是不合理的,所以会在消息被消费的时候,进行过期的判断。这个处理思想和 redis 过期 key 的清理有点神似。

Queue TTL

通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-expires":                 3000,
		},
	); err != nil {
		return err
	}
Message TTL

对于 Message TTL 设置有两种方式

  • Per-Queue Message TTL

通过在 queue.declare 中设置 x-message-ttl 参数,可以控制在当前队列中,消息的过期时间。不过同一个消息被投到多个队列中,设置x-message-ttl的队列,里面消息的过期,不会对其他队列中相同的消息有影响。不同队列处理消息的过期是隔离的。

	if _, err := channel.QueueDeclare("delay.3s.test",
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
			"x-message-ttl":             3000,
		},
	); err != nil {
		return err
	}
  • Per-Message TTL

通过 expiration 就可以设置每条消息的过期时间,需要注意的是 expiration 是字符串类型。

	delayQ := "delay.3s.test"
	if _, err := channel.QueueDeclare(delayQ,
		true, false, false, false, amqp.Table{
			"x-dead-letter-exchange":    b.exchange,
			"x-dead-letter-routing-key": ps.key,
		},
	); err != nil {
		return err
	}

	if err := channel.Publish("", delayQ, false, false, amqp.Publishing{
		Headers:      amqp.Table{"x-retry-count": retryCount + 1},
		Body:         d.Body,
		DeliveryMode: amqp.Persistent,
		Expiration:   "3000",
	}); err != nil {
		return err
	}

通过延迟队列来处理延迟消费的场景,可以借助于死信队列来处理

延迟队列通常的使用:消费者订阅死信队列 deadQueue,然后需要延迟处理的消息都发送到 delayNormal 中。然后 delayNormal 中的消息 TTL 过期时间到了,消息会被存储到死信队列 deadQueue。我们只需要正常消费,死信队列 deadQueue 中的数据就行了,这样就实现对数据延迟消费的逻辑了。

使用 Queue TTL 设置过期时间

举个线上处理消息重传的的栗子:

消费者处理队列中的消息,一个消息在处理的过程中,会出现错误,针对某些特性的错误,希望这些消息能够退回到队列中,过一段时间在进行消费。当然,如果不进行 Ack,或者 Ack 之后重推到队列中,消费者就能再次进行重试消费。但是这样会有一个问题,消费队列中消息消费很快,刚重推的消息马上就到了队列头部,消费者可能马上又拿到这个消息,然后一直处于重试的死循环,影响其他消息的消费。这时候延迟队列就登场了,我们可以借助于延迟队列,设置特定的延迟时间,让这些消息的重试,发生到之后某个时间点。并且重试一定次数之后,就可以选择丢弃这个消息了。

来看下流程图:

mq

具体的处理步骤:

1、生产者推送消息到 work-exchange 中,然后发送到 work-queue 队列;

2、消费者订阅 work-queue 队列,这是正常的业务消费;

3、对于需要进行延迟重试的消息,发送到延迟队列中;

4、延迟队列会绑定一个死信系列,死信队列的 exchange 和 routing-key,就是上面正常处理业务 work-queue 消息队里的 exchange 和 routing-key,这样过期的消息就能够重推到业务的队列中,每次重推到延迟队列的时候会记录消息重推的次数,如果达到我们设定的上限,就可以丢弃数据,落库或其他的操作了;

5、所以消费者只需要监听处理 work-queue 队列就可以了;

6、无用的延迟队列,到了删除的时间节点,会进行自动的删除。

上代码,文中 Demo 的地址

这篇关于RabbitMQ使用 prefetch_count优化队列的消费,使用死信队列和延迟队列实现消息的定时重试,golang版本的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!