为什么需要消费端消费?
例如平时在淘宝天猫京东秒杀抢购中,看到一件商品,全国14亿人民,假如其中300w人来说跟你一起抢购这个东西,秒杀商品仅仅就那么几秒时间,300个w的请求一起在几秒内提交到服务器,服务器怎么处理,总不能全部一起后台逻辑判断吧?
活动一开始会有大量并发写请求到达服务端,那我们如何做才能保证我们的服务不崩溃,还能控制并发,此时就需要mq出马,利用mq的消费端限流操作,对消息进行削峰处理,如何削峰?
当300w的消息投递全部投到我们mq中,我们机器内存,磁盘是否支持?
当300w的消息投递全部投到我们mq中,再或者说大于300w呢?
当300w的消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”?
我们的消息中间件本身是具备能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩 溃,如果是分布式系统的故障往往会发生上下游传递,产生的连锁反应那就会很悲剧,最终因为mq崩溃导致我们的系统整个链路崩溃,后果会很严重。
那我们改如何取解决这些问题呢?
1 针对大量消息,我们可以限流?
如何限流,根据消息存储容量?磁盘容量,内存容量?
2 消息很多,消费消息速度远远小于生产速度
怎么设置消费一个消息,再发给我们一个消息呢?
3 提高我们程序的处理能力,消费消息更快一些呢?
4 等等等等
RabbitMQ 为我们提供了三种机制
当大量的消息丢到mq中,首先mq中消息是极少的,刚开始消息索引和消息体都是存储在内存中,但随着消息越来越多,消息索引存储在内存中,消息体存储在磁盘上,随着消息越来越多,内存空间越来越小,开始讲一部分或者再丢到的mq中的消息刷到磁盘或者直接存储到磁盘,每次伴随着磁盘操作, 消费端还需要消费消息,mq的性能会急剧下降。
针对这中情况:
RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。
当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。
具体配置:
在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:
磁盘设置
内存设置
面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可
能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。
connection
channle
消息队列
RabbitMQ中有一种QoS
保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。
比较值得注意的是QoS
机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack
模式。
执行channel.basicConsume
方法之前通过 channel.basicQoS
方法可以设置该数量
。消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos
的prefetchCount
,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。
消费者确认一个,broker就发送一个,确认两个
就发送两个
。换句话说,消费者确认多少
,broker就发送多少
,消费者等待处理的个数永远限制在prefetchCount个。
生产者
java package gaojitexing.limit; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 生产者生产消息 */ public class Produce { public static void main(String[] args) throws IOException, TimeoutException { //连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //amqp 协议端口 connectionFactory.setPort(5672); //简历连接 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare("queue.qos", true, false, false, null); //声明交换器 交换器名称, 交换器类型,是否持久化,是否自动删除的, 属性map几乎 channel.exchangeDeclare("ex.qos", BuiltinExchangeType.FANOUT, false, false, false, null); //发送消息 交换器 路由key,属性,消息 -> amqp协议会将消息发送出去 channel.queueBind("queue.qos", "ex.qos", "qos.key"); String aeration = "hello-"; for (int i = 0; i < 20; i++) { channel.basicPublish("ex.qos", "qos.key", null, (aeration + " --- > " + i).getBytes()); } channel.close(); connection.close(); } }
消费者
package gaojitexing.limit; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; /** * 推消息 * qos 保证机制 仅对推消息可用,对拉消无效 */ public class LisenterComsumer { public static void main(String[] args) throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException, IOException, TimeoutException { //连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUri("amqp://guest:guest@localhost:5672/%2f"); // connectionFactory.setVirtualHost("/"); // connectionFactory.setUsername("guest"); // connectionFactory.setPassword("guest"); // //amqp 协议端口 // connectionFactory.setPort(5672); //建立连接 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //定义消息队列 channel.queueDeclare("queue.qos", true, false, false, null); //todo 仅对推消息可用 //表示Qos 是10个消息,最多有10个消息等待确认 channel.basicQos(10); //如果设置为ture 表示使用当前channle的客户端的Consumer 该设置都生效,false表示仅限当前Consumer // channel.basicQos(10,false); //第一个参数表示未确认消息的大小,mq没有实现,不用管,一般第三个用不到。 // channel.basicQos(100,10,true); //false 表示不自动确认 channel.basicConsume("queue.qos", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("获取到的消息" + new String(body)); try { //模拟消费消息时间 Thread.sleep(3_000); } catch (InterruptedException e) { e.printStackTrace(); } //envelope.getDeliveryTag() deliveryTag表示消息的唯一标志, //手动确认消息 第二个参数表示表示是否是批量确认 // channel.basicAck(envelope.getDeliveryTag(), false); //批量确认 减少每个消息都发送去人带来的网络影响 //如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了 multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都 确认了。 channel.basicAck(envelope.getDeliveryTag(), true); //手动确认 者处理失败 //第二个参数表示不确认多个还是一个消息,最后一个参数表示不确认的消息是否重新放回队列 //todo qos保证机制必须确认机制 , 不支持NONE Ack模式。 // channel.basicNack(envelope.getDeliveryTag(),false,true); //手动拒绝消息(只能拒收一条消息)。第二个参数表示是否重新入列,是否重新入列,然后重发会一直重发重试 // channel.basicReject(envelope.getDeliveryTag(), true); } }); // channel.close(); // connection.close(); } }
我们发送消息,发送多次,通过控制台可以看到 未被确认始终是10
个,确认消息和总数一直在变化,两者之间差就是中间为未被确认消息的总数。
消费端消费一个,mq然后再推送给消费者一个,始终维持在10个,直到客户端消费完。
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。
我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。提高消费者消费消息就可以提升下游应用的吞吐量和缩短消费过程的耗时。
//设置channle 最大并发请求书 connectionFactory.setRequestedChannelMax(10); //默认的线程池 connectionFactory.setThreadFactory(Executors.defaultThreadFactory());