RabbitMq死信队列有三种发生情况:
生产者:
import com.cn.mq.utils.ConnectionMq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; /** * @Description 死信 * @ClassName Producer * @Author yangff * @date 2021.09.28 22:52 */ public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = ConnectionMq.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间 10s=10000ms //消息被拒绝情况下不使用,手动进行消息确认 // AMQP.BasicProperties properties = new // AMQP.BasicProperties().builder().expiration("10000").build(); //该信息是用作演示队列个数限制 for (int i = 1; i <11 ; i++) { String message="info"+i; //消息被拒绝情况下不使用,手动进行消息确认 // channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, // message.getBytes()); channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println("生产者发送消息:"+message); } } } }
1.消息 TTL 过期
消费者1:
import com.cn.mq.utils.ConnectionMq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; /** * @Description * @ClassName Consumer01 * @Author yangff * @date 2021.09.28 22:53 * 启动之后关闭该消费者 模拟其接收不到消息 */ public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = ConnectionMq.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Map<String, Object> params = new HashMap<>(); //过期时间 10s=100000ms;可在生产者出设置 // params.put("x-message-ttl", 10000); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; //声明普通队列 channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息"+message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
消费者2:
import com.cn.mq.utils.ConnectionMq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; /** * @Description * @ClassName Consumer02 * @Author yangff * @date 2021.09.28 22:54 */ public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = ConnectionMq.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
2.队列达到最大长度
设置最大长度在生产者和消费者都可定义
生产者:
消费者:
3.消息被拒绝
配置消息拒绝时,需要进行手动进行消息确认,将生产者中设置消息的 TTL 时间取消,在消费者中将autoAck设置false;