Java教程

【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot

本文主要是介绍【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

RocketMQ 消息中间件

  • 集成 SpringBoot
    • 入门案例
    • 生产消息类型 - 同步、异步、一次性
    • 消费模式 - 集群、广播
    • 延时消息
    • 设置消息标签
    • 设置消息的 Key
    • 自定义属性设置
    • 消息过滤
    • 发送消息的方式

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

集成 SpringBoot

入门案例

依赖:

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.0</version>
</dependency>

生产者

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
rocketmq.producer.group=my_group
server.port=9999
  • 实现代码:
@RestController
public class HelloController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @RequestMapping("01_hello")
    public String sendMsg(String message) throws Exception{
        SendResult sendResult = rocketMQTemplate.syncSend("01_boot_hello", message);
        return JSON.toJSONString(sendResult);
    }
}

消费者

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
server.port=7777
  • 实现代码:
@Component
@RocketMQMessageListener(
        topic = "01_boot_hello",
        consumerGroup = "maoge_consumer"
)
public class HelloConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println("接收到的消息:" + msg);
    }
}

生产消息类型 - 同步、异步、一次性

同步消息syncSend

rocketMQTemplate.syncSend("01_boot_hello", message);

异步消息asyncSend

rocketMQTemplate.asyncSend("02_boot_async", message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("消息发送成功:" + JSON.toJSONString(sendResult));
    }
    @Override
    public void onException(Throwable throwable) {
        System.out.println("消息处理失败:" + throwable.getMessage());
    }
});

一次性消息sendOneWay

rocketMQTemplate.sendOneWay("03_boot_oneway",message);

消费模式 - 集群、广播

集群模式

@Component
@RocketMQMessageListener(
        topic = "02_boot_model",
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer01 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(JSON.toJSONString(message));
    }
}

广播模式

// 广播模式是实时消费消息的,在广播模式消费者启动之前的消息,无法接收
// 广播模式下发送失败的消息不会重试
@Component
@RocketMQMessageListener(
        topic = "02_boot_model",
        messageModel = MessageModel.BROADCASTING,
        consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer02 implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(JSON.toJSONString(message));
    }
}

延时消息

使用原生的 Produce 对象

DefaultMAProducer producer =  rocketMQTemplate.getProducer();
Message message = new Message(topic, "TagA", "9527", msg.getBytes());
message.setDelayTimeLevel(3);
// 在实际工作中,确保消息可靠性,捕获对应的异常
producer.send(message);

使用 Spring 接口

Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.syncSend("04_boot_delay", msg, 3000, 3);

设置消息标签

// 在发送的消息 Topic:Tag 中间使用冒号隔开
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);

设置消息的 Key

Message<?> msg = MessageBuilder.withPayload(message)
	.setHeader(MessageConst.PROPERTY_KEYS, "1100").build();
rocketMQTemplate.send("01-boot-hello", msg);

自定义属性设置

// 过滤设置: 需要开启 broker 的支持用户属性配置
// enablePropertyFilter=true

Map<String,Object> map=new HashMap<>();
//用户自定义属性
map.put("name", "hesj");
map.put("age", "18");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);

消息过滤

// 在RocketMQMessageListener添加注解
@RocketMQMessageListener(
        consumerGroup = "02_boot_cluster",
        topic = "02_boot_model",
        messageModel = MessageModel.BROADCASTING,
		// 	消息过滤
        selectorType = SelectorType.TAG,
		selectorExpression = "age > 16"
)

发送消息的方式

  • 直接使用 rocketMQTemplate
  • 使用 DefaultMQProducer 对象
  • 使用 Spring 的 Message 接口
这篇关于【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!