Message Queue, 消息队列, 先入先出,通信机制。
四大MQ对比
四大组件:
名词解释:
生产者将消息推给交换机,交换机再将消息发送到队列。如果不指定交换机,默认使用默认交换机。
简单模式如果不指定交换机,则使用默认交换机。
一般要求指定交换机
简单模式的交换机类型是direct
public class Producer { public static void main(String[] args) { // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.80.130"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { // 创建连接 connection = connectionFactory.newConnection("生产者"); // 通过连接获取channel channel = connection.createChannel(); // 通过创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息 String queueName = "queue1"; /* * 队列名字 * 是否要持久化 * 排他性,是否为独占 * 是否自动删除,最后一个消息被消费后队列是否自动删除 * 携带一些附加参数 * */ channel.queueDeclare(queueName, false, false, false, null); // 准备消息内容 String message = "hello rabbit!"; // 发送消息给队列 channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 关闭channel if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.80.130"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); System.out.println("等待接收消息..."); // 推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; // 取消消费的一个回调接口, 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) ->{ System.out.println("消息消费被中断了"); }; /* * 消费者消费消息 * 1. 消费哪个队列 * 2. 消费成功后是否自动挡应答,true自动,false手动 * 3. 消费者成功消费的回调 * 4. 消费者未成功消费的回调 */ channel.basicConsume("simple", true, deliverCallback, cancelCallback); } }
多个消费者轮流从消息队列中取出消息进行消费
public class RabbitMQUtils { public static Channel getChannel () throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.80.130"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
public class Consumer { public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("接收到消息:" + message); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费"); }; System.out.println("消费者启动等待消费..."); // 工作模式,轮询消费 channel.basicConsume("work", true, deliverCallback, cancelCallback); } }
public class Producer { public static void main(String[] args) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.queueDeclare("work", false, false, false, null); // 从控制台当中接收信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", "work", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息完成:" + message); } } } }
int prefetchCount = 1; channel.basicQos(prefetchCount);
交换机的类型为Fanout,即不处理路由键,只需要简单的将队列绑定到交换机上。
交换机类型是direct,而且需要添加路由将队列绑定到交换机上
支持模糊匹配的路由。交换机类型是topic,需要添加模糊路由进行队列和交换机的绑定
主题模式的路由不能随意写,必须是一个单词列表,以点分隔开
模糊匹配规则
#
表示没有、一个或多个*
表示一个见后面
为了保证消息都被消费者消费了,没有丢失,引入了消息应答机制
消费应答机制:消费者在接收到消息并且处理该消息后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了
Channel.basicAck
肯定确认,已成功处理该消息,可以将其丢弃Channel.basicNack
否定确认Channel.basicReject
否定确认,不处理该消息了,可以将其丢弃如果消费者没有发送ACK确认,RabbitMQ将知道该消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,该消息可以重新分发给另一个消费者。这样可以确保消息不会丢失。
public void produce() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner sc = new Scanner(System.in); System.out.println("请输入信息:"); while (sc.hasNext()){ String message = sc.nextLine(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }
public void consume() throws Exeception{ Channel channel = RabbitUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); System.out.println("c1等待接收消息..."); DeleverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); SleepUtils.sleep(1); System.out.println("接收到消息" + message); // 消息标记tag // false 代表直营到接收到的哪个传递的消息,true应答所有消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) }; // 手动应答 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); // 最后一个参数未失败了怎么处理 }
public class SleepUtils { public static void sleep(int second){ try { Thread.sleep(second * 1000); } catch (InterruptedException e){ Thread.currentThread().interrupt(); } }}
为确保消息不会丢失,需要将队列和消息都持久化
当消息被投递到匹配的队列后,broker就会发送一个确认给生产者,这就使得生产者直到消息已经正确到达目的地了。
这是一种同步确认发布地方式,发布一个消息后只有它被确认发布,后续消息才能继续发布
public void publishMessageIndividually() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String queueName = "confim_individually"; channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); // 开启确认发布 long begin = System.currentTimeMills(); for (int i = 0; i < MESSAGE_COUNT; i++){ String message = "第" + i + "条消息"; channel.basicPublish("", queueName, null, message.getBytes()); // 服务端返回false或超时时间内未返回,生产者可以重发消息 boolean flag = channel.waitForConfirm(); if (flag){ System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms") }}
先发布一批消息,然后一起确认可以极大提高吞吐量,缺点是,如果发生故障,不知道哪个消息出现了问题,也是同步的
public void publishMessageBatch() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String queueName = "batch_confim"; channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); // 开启确认发布 int batchSize = 100; // 批量确认消息大小 int messageCount = 0; // 未确认消息个数 long begin = System.currentTimeMills(); for (int i = 0; i < MESSAGE_COUNT; i++){ String message = "第" + i + "条消息"; channel.basicPublish("", queueName, null, message.getBytes()); // 服务端返回false或超时时间内未返回,生产者可以重发消息 messageCount++; if (messageCount == batchSize){ channel.waitForConfirms(); messageCount = 0; } } // 为了确保还有剩余没有确认消息,再次确认 if (messageCount > 0){ chennel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms") }}
利用回调函数来达到消息可靠性传递
public void publishMessageAsync() throws Exception { try (Channel channel = RabbitMQUtils.getChannel()){ String name = "async_confirm"; channel.queueDeclare(queueName, false, false, false, null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的哈希表,适用于高并发的情况 ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); // 确认收到消息的一个回调 // sequence 消息序列号 // true/false 小于或等于当前序列号的消息/等于当前序列号的消息 ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { // 返回的是小于等于当前序列号的未确认消息,是一个map ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true); // 清除该部分未确认消息 confirmed.clear(); } else { // 只清除当前序列号的消息 outstandingConfirms.remove(sequenceNumber); } }; ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = oustandingConfrims.get(sequenceNumber); System.out.println("发布的消息" + message + "未被确认,序列号:" + sequenceNumber); }; // 添加一个异步确认的监听器 channel.addConfirmListener(ackCallback, nackCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++){ String message = "消息" + i; // 关联序列号和消息体 // 全都是还未确认的消息体 outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", queueName, null, message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin)); }}
一般来说,producer将消息投递到broker或者queue里,consumer从queue中取出消息进行消费,但是某些是偶由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就成为死信。
为了保证消息不丢失,需要用到死信队列来处理死信。
演示消息TTL过期,处理死信队列
public void produce() throws Exception { try (Channel channe = RabbitUtils.getChannel()){ channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); // 设置TTL时间 AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 0; i < 11 ; i++){ String message = "info" +i; channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, message.getBytes()); System.out.println("生产者发送消息" + message); } }}
public void consumer() throws Exception{ Channel channel = RabbitUtils.getChannel(); // 声明死信和普通交换机类型为direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); // 死信队列绑定死信交换机 channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead"); // 正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); // 正常队列设置死信交换机 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 正常队列设置死信routing-key params.put("x-dead-letter-routing-key", "dead"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal"); System.out.println("等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01接收到消息" + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {}); }
延迟队列就是存放需要在指定时间被处理的元素的队列
TTL是消息或队列的属性,表面该消息或队列中所有消息的最大存活时间。
如果消息在TTL设定的时间内没有被消费,则消息会变为死信。
创建两个队列QA和QB,分别设置TTL为10s和40s,然后创建一个交换机X和死信交换机Y,再创建一个死信队列QD
RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列;如果第一个消息过期时间很长,而第二个消息过期时间很短,第二个消息并不会优先得到执行。
将rabbtimq_delayed_message_exchange
插件加成放到RabbtiMQ目录下
这样可以定义一个自定义的delay交换机,可以让第二个消息先得到执行
对于无法投递的消息,既不想丢失消息,又不想设置mandatory参数增加生产者的复杂性,该怎么做呢?
可以利用备份交换机,当交换机接收到一条不可路由的消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,备份交换机类型常常为Fanout
用户对于同一操作发起的一次请求或多次请求的结果是一致的,不会因为多次点击而产生副作用。
消费者再消费消息后,给MQ返回ack时网络中断,故MQ没有收到确认消息,该条消息会重写发送给消费者,而实际上该消息已经被消费,造成了重复消费问题。
解决办法:利用全局唯一ID,每次消费消息时,用该id判断,该消息是否已经被消费过了
RabbitMQ支持优先级队列,可以在往优先级队列中添加消息时设置优先级,让优先级高的消息先执行
惰性队列会尽可能将消息存入磁盘中,而在消费者消费到相应消息时,才会被加载到内存中,它的一个中要设计目标时能够支持更长的队列,即支持更多的消息存储。
将队列镜像到集群中的其他Broker节点上,如果集群中的一个节点失效了,队列能自动切换到镜像中的另一个节点上以保证服务的可用性。