Java 从 0 到架构师目录:【Java从0到架构师】学习记录
依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
生产者:
rocketmq.name-server=192.168.52.128:9876 rocketmq.producer.group=my_group server.port=9999
@RestController public class HelloController { @Autowired private RocketMQTemplate rocketMQTemplate; @RequestMapping("01_hello") public String sendMsg(String message) throws Exception{ SendResult sendResult = rocketMQTemplate.syncSend("01_boot_hello", message); return JSON.toJSONString(sendResult); } }
消费者:
rocketmq.name-server=192.168.52.128:9876 server.port=7777
@Component @RocketMQMessageListener( topic = "01_boot_hello", consumerGroup = "maoge_consumer" ) public class HelloConsumer implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("接收到的消息:" + msg); } }
同步消息:syncSend
rocketMQTemplate.syncSend("01_boot_hello", message);
异步消息:asyncSend
rocketMQTemplate.asyncSend("02_boot_async", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功:" + JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { System.out.println("消息处理失败:" + throwable.getMessage()); } });
一次性消息:sendOneWay
rocketMQTemplate.sendOneWay("03_boot_oneway",message);
集群模式:
@Component @RocketMQMessageListener( topic = "02_boot_model", messageModel = MessageModel.CLUSTERING, consumerGroup = "02_boot_cluster" ) public class ClusterConsumer01 implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { System.out.println(JSON.toJSONString(message)); } }
广播模式:
// 广播模式是实时消费消息的,在广播模式消费者启动之前的消息,无法接收 // 广播模式下发送失败的消息不会重试 @Component @RocketMQMessageListener( topic = "02_boot_model", messageModel = MessageModel.BROADCASTING, consumerGroup = "02_boot_cluster" ) public class ClusterConsumer02 implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { System.out.println(JSON.toJSONString(message)); } }
使用原生的 Produce 对象:
DefaultMAProducer producer = rocketMQTemplate.getProducer(); Message message = new Message(topic, "TagA", "9527", msg.getBytes()); message.setDelayTimeLevel(3); // 在实际工作中,确保消息可靠性,捕获对应的异常 producer.send(message);
使用 Spring 接口:
Message<String> msg = MessageBuilder.withPayload(message).build(); rocketMQTemplate.syncSend("04_boot_delay", msg, 3000, 3);
// 在发送的消息 Topic:Tag 中间使用冒号隔开 rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
Message<?> msg = MessageBuilder.withPayload(message) .setHeader(MessageConst.PROPERTY_KEYS, "1100").build(); rocketMQTemplate.send("01-boot-hello", msg);
// 过滤设置: 需要开启 broker 的支持用户属性配置 // enablePropertyFilter=true Map<String,Object> map=new HashMap<>(); //用户自定义属性 map.put("name", "hesj"); map.put("age", "18"); //也可以设置系统属性 map.put(MessageConst.PROPERTY_KEYS,age); rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
// 在RocketMQMessageListener添加注解 @RocketMQMessageListener( consumerGroup = "02_boot_cluster", topic = "02_boot_model", messageModel = MessageModel.BROADCASTING, // 消息过滤 selectorType = SelectorType.TAG, selectorExpression = "age > 16" )