课程信息:
- 课程名称:RabbitMQ消息中间件极速入门与实战
- 章节名称:RabbitMQ整合SpringBoot2.x 消费端处理消息
- 讲师姓名:阿神
课程内容:
编写消费端的相关操作
1.编写application.properties文件
# SpringBoot整合RabbitMQ的基本配置: spring.rabbitmq.addresses=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # SpringBoot整合RabbitMQ的消费端配置: spring.rabbitmq.listener.simple.concurrency=5 #签收模式:手工签收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 最大并发 spring.rabbitmq.listener.simple.max-concurrency=10 # 限流 同一时间只能有一条数据 spring.rabbitmq.listener.simple.prefetch=1 # ???? server.servlet.context-path=/ server.port=8002
2. 编写Order类
3. 编写OrderReceive类,这是实现消费者功能的主要类
@Component public class OrderReceive { // 此注解当控制台中没有设置的exchange与Queue时,可自动创建 @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue", durable = "true"), exchange = @Exchange(value = "order-exchange", durable = "true", type = "topic"), key = "order.*" )) // 手工签收依赖于Channel // @RabbitHandler: 标识如果有消息过来,消费者要消费的时候调用此方法,没有监听绑定队列 @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Map<String, Objects> headers, Channel channel) throws Exception{ // 消费者操作 System.out.println("---------收到消息,开始消费-----------"); System.out.println("订单ID:" + order.getId()); // Objects deliveryTag = headers.get(AmqpHeaders.DELIVERY_TAG); long l = Long.parseLong(Objects.toString(headers.get(AmqpHeaders.DELIVERY_TAG))); // 第一个参数: // 第二个参数:表示是否支持批量签收 // 发送一个签收的响应 channel.basicAck(l, false); } }
4. 在producer发送一条消息,可直接在consumer接受一条消息,如下图
5. 这里出现了一个小问题,这里的Long与Object转换的时候,我使用了toString()转化为String类型,在转化为Long类型,如下:
long l = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());
但这样写会出现一个错误,如下图:
这里改为这样,使用Object.toString()即可解决:
long l = Long.parseLong(Objects.toString(headers.get(AmqpHeaders.DELIVERY_TAG)));
学习心得:
本节课我学习了RabbitMQ消费者端的一些操作,对RabbitMQ有了一个更深的了解。
课程截图: