消息发送的步骤简要罗列如下:
首先IDEA选择maven框架quick start,创建完毕之后,在POM.xml文件里面导入 rocketmq-client 依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
同步消息的特点是可靠性高、用途广泛,应用场景有:短信通知和重要消息通知类功能。
同步发送消息代码如下:
private static final String SYNC_MSG_TOPIC = "SYNC_MSG_TOPIC"; private static final String SYNC_MSG_TAG = "SYNC_MSG_TAG"; private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876"; private static final String PRODUCER_GROUP = "GROUP_NAME"; public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { //创建Producer、设置Namesrv地址、启动Producer DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); for (int i = 0; i < 10; i++) { //设置topic、tag和内容 Message msg = new Message(SYNC_MSG_TOPIC, SYNC_MSG_TAG, (i + "号消息里面包含的数据内容").getBytes(StandardCharsets.UTF_8)); //发送 System.out.printf("%d号消息,发送状态:%s\n", i, producer.send(msg)); //线程睡1s之后再发送下一条 TimeUnit.SECONDS.sleep(1); } //关闭生产者,否则程序不会退出 producer.shutdown(); }
异步消息通常用在对响应时间要求短的业务场景,即Producer不能容忍Broker太长时间的响应。
异步消息的代码特点就是需要制定回调函数,当消息回传回来的时候,需要调用回调函数执行响应消息的处理逻辑。
异步消息发送代码如下:
private static final String ASYNC_MSG_TOPIC = "ASYNC_MSG_TOPIC"; private static final String ASYNC_MSG_TAG = "ASYNC_MSG_TAG"; private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876"; private static final String PRODUCER_GROUP = "GROUP_NAME"; @SneakyThrows public static void main(String[] args) { DefaultMQProducer producer = initProducer(); for (int i = 0; i < 10; i++) { Message msg = new Message(ASYNC_MSG_TOPIC, ASYNC_MSG_TAG, (i + "号消息里面包含的数据内容").getBytes(StandardCharsets.UTF_8)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { System.out.println("发送结果:" + result); } @Override public void onException(Throwable e) { System.out.println("发送异常:" + e.getMessage()); } }); TimeUnit.SECONDS.sleep(1); } producer.shutdown(); } private static DefaultMQProducer initProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); return producer; }
根据运行程序的性能情况,异步发送的消息也有可能会发送失败。比如将下面的线程睡眠的时延设定为
TimeUnit.MILLISECONDS.sleep(25);
生产者不关心发送的消息的接收情况,比如发送日志。
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { DefaultMQProducer producer = initProducer(); for (int i = 0; i < 10; i++) { //设置topic、tag和内容 Message msg = new Message(ONE_WAY_MSG_TOPIC, ONE_WAY_MSG_TAG, (i + "号消息里面包含的数据内容").getBytes(StandardCharsets.UTF_8)); //发送,无返回值 producer.sendOneway(msg); //线程睡1s之后再发送下一条 TimeUnit.MILLISECONDS.sleep(100); } //关闭生产者,否则程序不会退出 producer.shutdown(); }
消息消费的步骤简要罗列如下:
使用消费者,消费之前产生并存储在Broker节点里面的异步消息:
private static final String CONSUMER_GROUP = "CONSUMER_GROUP"; private static final String NAMESRV_ADDR = "localhost:19876;localhost:29876"; @SneakyThrows public static void main(String[] args) { //指定异步生产者同主体和标签 DefaultMQPushConsumer consumer = initConsumer(AsyncProducer.ASYNC_MSG_TOPIC, AsyncProducer.ASYNC_MSG_TAG); //接收消息内容 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { msgs.forEach(msg-> System.out.println(new String(msg.getBody()))); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //启动消费者 consumer.start(); } private static DefaultMQPushConsumer initConsumer(String topic, String tag) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); consumer.subscribe(topic, tag); return consumer; }
可以同时操作之前的任意生产者程序(注意要配置topic和tag)来实时的、直观的查看消息的生产和消费,一旦生产者生产结束之后,消费者马上就能获取到订阅的消息并输出。
解析和要点:
consumer.subscribe(topic, tag);
这种调用,也可以是 consumer.subscribe(topic, "tag1||tag2||tag3");
这种写法来支持多个tag的消息接收。消费者可以启动多个,能够更加快速的处理发布的消息。既然有多个消费者,那么就要考虑谁消费多少消息的问题。
每个消费者消费相同的的消息。比如发布消息a、b、c,广播模式就像发送广播一样每个人都能听见相同内容,对于消费者们来说,每个人都要处理一遍a、b、c。
配合CONSUMER_GROUP = "CONSUMER_GROUP"
,能够使得同个消费者组的每个消费者消费同样多的消息。
消费者们共同处理消息a、b、c,每个人的数量不一样,但是消费的消息总数就是3个,比如消费者A消费a、b,消费者B消费c。
配合CONSUMER_GROUP = "CONSUMER_GROUP"
,能够使得同个消费者组的每个消费者共同消费掉一批消息,各自分工来实现负载均衡的效果。
开启两个之前的Consumer程序。
consumer.setMessageModel(model);
在consumer设置里面设置model为 MessageModel.CLUSTERING
负载均衡模式。
@SneakyThrows public static void main(String[] args) { //指定异步生产者同主体和标签 DefaultMQPushConsumer consumer = initConsumer(ONE_WAY_MSG_TOPIC, "*", MessageModel.CLUSTERING); //... } private static DefaultMQPushConsumer initConsumer(String topic, String tag, MessageModel model) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); //... consumer.setMessageModel(model); //... }
默认的消费模式是负载均衡,可以修改为广播模式
MessageModel.BROADCASTING
,可以看见每个消费者都消费相同数量的消息
顺序消息要求,消费者消费消息的顺序,必须和生产者生产消息的顺序一致。
既然是消息队列,那么底层数据结构给人的第一想法就是队列结构,天生就是因为这种数据结构从而满足FIFO的特质。可是rmq并不是这样。
rmq的broker持有多个队列,pdc如果发送多个Topic消息之后,brk的多个队列会分别收入到对应Topic的队列里面。消费者采用监听的方式,多线程的去接收消息。但是问题出在,消费者的多只“耳朵”,在没有控制顺序的机制的保证下,是无法做到顺序的收听到消息的。
保证消息顺序的做法一般如下:
保证局部消息的顺序的做法就是将甲乙各自享有两个消息队列,各自的消息按照顺序在队列里存储并发送即可,消费的时候每个队列里的消息都有对应的消费者(消费者和队列是一对多关系)独自单线程的消费消息。
下面给出一个下订单的案例,其流程是:
订单号相同的会被先后发送到同个消息队列中,消费时,消费者会根据订单号找到同个队列,按照流程1、2、3、4的获取消息。
参见附录
public static final String ORDER_TOPIC = "ORDER_TOPIC"; public static final String ORDER_TAG = "ORDER_TAG"; @SneakyThrows public static void main(String[] args) { DefaultMQProducer producer = MqUtils.initProducer(); //待发送订单集合 List<Order> orderList = Order.orders(); //发送消息 for (int i = 0, orderListSize = orderList.size(); i < orderListSize; i++) { Order o = orderList.get(i); Message msg = new Message(ORDER_TOPIC, ORDER_TAG, "index:" + i, o.toString() .getBytes(StandardCharsets.UTF_8)); /* * MessageQueueSelector消息队列选择器,生产者到底将生产的消息放在哪个队列由此对象给出的规则决定 * o.getOrderId() 作为业务标识: 订单id */ SendResult sendResult = producer.send(msg, new MessageQueueSelector() { /** * * @param mqs 队列集合 * @param msg 消息对象 * @param arg 业务表示的参数 来自o.getOrderId() * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long orderId = (Long) arg; //自定义订单队列选择规则:取模 int mod = (int) (orderId % mqs.size()); return mqs.get(mod); } }, o.getOrderId()); System.out.println("发送结果" + sendResult); } //关闭 producer.shutdown(); }
@SneakyThrows public static void main(String[] args) { DefaultMQPushConsumer consumer = MqUtils.initConsumer(SequenceProducer.ORDER_TOPIC, "*", null); //注册回调处理逻辑 consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { msgs.forEach(msg -> System.out.println("线程名称" + Thread.currentThread() .getName() + "消费了消息:" + new String(msg.getBody()))); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); System.out.println("消费者启动!"); }
消费者启动!
线程名称ConsumeMessageThread_2消费了消息:Order(orderId=1001, description=创建订单)
线程名称ConsumeMessageThread_1消费了消息:Order(orderId=2002, description=创建订单)
线程名称ConsumeMessageThread_3消费了消息:Order(orderId=1099, description=创建订单)
线程名称ConsumeMessageThread_3消费了消息:Order(orderId=1099, description=付款)
线程名称ConsumeMessageThread_1消费了消息:Order(orderId=2002, description=付款)
线程名称ConsumeMessageThread_2消费了消息:Order(orderId=1001, description=付款)
线程名称ConsumeMessageThread_1消费了消息:Order(orderId=2002, description=推送)
线程名称ConsumeMessageThread_2消费了消息:Order(orderId=1001, description=推送)
线程名称ConsumeMessageThread_2消费了消息:Order(orderId=1001, description=完成)
可以看到,同一个订单的消费顺序,完全按照预定的顺序消费。无论是完整的订单(编号1001)还是其余两个不完整流程的订单。
消息不会立即给消费者消费,而是给Message设定的时延等待之后去消费。
private static final Integer DELAY_TIME_LEVEL_5_SEC = 2; public static final String DELAY_TOPIC = "DELAY_TOPIC"; @SneakyThrows public static void main(String[] args) { DefaultMQProducer producer = initProducer(); for (int i = 0; i < 10; i++) { Message msg = new Message(DELAY_TOPIC, "*", ("消息的内容:Hello World from" + i).getBytes(StandardCharsets.UTF_8)); msg.setDelayTimeLevel(DELAY_TIME_LEVEL_5_SEC); SendResult send = producer.send(msg); System.out.println("发送结果:" + send); TimeUnit.SECONDS.sleep(1); } producer.shutdown(); }
@SneakyThrows public static void main(String[] args) { DefaultMQPushConsumer consumer = initConsumer(DelayProducer.DELAY_TOPIC, "*", null); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { prettyPrintMessage(msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消费者启动完成!"); }
线程名称ConsumeMessageThread_2消费了消息:消息的内容:Hello World from2延迟时间:88219
线程名称ConsumeMessageThread_4消费了消息:消息的内容:Hello World from0延迟时间:90260
线程名称ConsumeMessageThread_6消费了消息:消息的内容:Hello World from8延迟时间:82175
线程名称ConsumeMessageThread_5消费了消息:消息的内容:Hello World from9延迟时间:81164
线程名称ConsumeMessageThread_3消费了消息:消息的内容:Hello World from1延迟时间:89226
线程名称ConsumeMessageThread_1消费了消息:消息的内容:Hello World from3延迟时间:87213
线程名称ConsumeMessageThread_7消费了消息:消息的内容:Hello World from4延迟时间:87194
线程名称ConsumeMessageThread_8消费了消息:消息的内容:Hello World from5延迟时间:86189
线程名称ConsumeMessageThread_9消费了消息:消息的内容:Hello World from7延迟时间:84172
线程名称ConsumeMessageThread_10消费了消息:消息的内容:Hello World from6延迟时间:85183
将生产者的时延设置取消掉,也就是将msg.setDelayTimeLevel(DELAY_TIME_LEVEL_5_SEC)这段代码注释掉再次启动生产者,对比观察一下效果。
线程名称ConsumeMessageThread_11消费了消息:消息的内容:Hello World from0延迟时间:3
线程名称ConsumeMessageThread_12消费了消息:消息的内容:Hello World from1延迟时间:2
线程名称ConsumeMessageThread_13消费了消息:消息的内容:Hello World from2延迟时间:3
线程名称ConsumeMessageThread_14消费了消息:消息的内容:Hello World from3延迟时间:2
线程名称ConsumeMessageThread_15消费了消息:消息的内容:Hello World from4延迟时间:2
线程名称ConsumeMessageThread_16消费了消息:消息的内容:Hello World from5延迟时间:3
线程名称ConsumeMessageThread_17消费了消息:消息的内容:Hello World from6延迟时间:2
线程名称ConsumeMessageThread_18消费了消息:消息的内容:Hello World from7延迟时间:3
线程名称ConsumeMessageThread_19消费了消息:消息的内容:Hello World from8延迟时间:3
线程名称ConsumeMessageThread_20消费了消息:消息的内容:Hello World from9延迟时间:3
之前的消息发送都是在for循环里面去做,循环调用生产者的send方法并获得结果。当然生产者存在一个方法的重写,可以实现对一个集合的消息进行接受,批量的发送消息。
整个代码其余部分和上面的大致不差,除了传入Message对象集合交给生产者发送的地方有区别以外:
Message msg1 = new Message(BATCH_TOPIC, "*", ("消息的内容:Hello World from 1").getBytes(UTF_8)); Message msg2 = new Message(BATCH_TOPIC, "*", ("消息的内容:Hello World from 1").getBytes(UTF_8)); Message msg3 = new Message(BATCH_TOPIC, "*", ("消息的内容:Hello World from 1").getBytes(UTF_8)); List<Message> messages = new ArrayList<>(Arrays.asList(msg1, msg2, msg3)); //发送并输出 SendResult sendResult = producer.send(messages);
需要关注的是,消息的大小不能超过4M,当消息大小过大的时候,就要考虑对其进行分批发送。下面采用一个工具类去完成对消息集合的分割
ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { List<Message> batch = splitter.next(); SendResult sendResult = producer.send(batch); prettyPrintSendResult(sendResult); }
工具类的代码放在附录中。
@Data @Builder public class Order { /** * 订单号 */ private Long orderId; /** * 订单步骤描述 */ private String description; public static List<Order> orders() { List<Order> orders = new ArrayList<>(); orders.add(Order.builder() .orderId(1001L) .description("创建订单") .build()); orders.add(Order.builder() .orderId(1001L) .description("付款") .build()); orders.add(Order.builder() .orderId(1001L) .description("推送") .build()); orders.add(Order.builder() .orderId(1001L) .description("完成") .build()); orders.add(Order.builder() .orderId(1099L) .description("创建订单") .build()); orders.add(Order.builder() .orderId(1099L) .description("付款") .build()); orders.add(Order.builder() .orderId(2002L) .description("创建订单") .build()); orders.add(Order.builder() .orderId(2002L) .description("付款") .build()); orders.add(Order.builder() .orderId(2002L) .description("推送") .build()); return orders; } }
public class MqUtils { /** * nameServer集群的地址 */ public static final String NAMESRV_ADDR = "localhost:19876;localhost:29876"; /** * 生产者组名 */ public static final String PRODUCER_GROUP = "GROUP_NAME"; /** *消费者组名 */ public static final String CONSUMER_GROUP = "CONSUMER_GROUP"; /** * 初始化生产者 * PRODUCER_GROUP = "GROUP_NAME"; */ @NotNull public static DefaultMQProducer initProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); producer.setNamesrvAddr(NAMESRV_ADDR); producer.start(); return producer; } /** * 初始化消费者 * PRODUCER_GROUP = "GROUP_NAME"; */ @NotNull public static DefaultMQPushConsumer initConsumer(String topic, String tag, MessageModel model) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(NAMESRV_ADDR); if (Objects.nonNull(model)){ consumer.setMessageModel(model); } consumer.subscribe(topic, tag); return consumer; } /** * 格式化输出消息相关信息 */ public static void prettyPrintMessage(List<MessageExt> msgs){ msgs.forEach(msg -> System.out.println("线程名称" + Thread.currentThread() .getName() + "消费了消息:" + new String(msg.getBody()))); } }
public class ListSplitter implements Iterator<List<Message>> { private static final int SIZE_LIMIT = 1024 * 1024 * 4; private static final int LOG_BYTE_SIZE = 20; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) {this.messages = messages;} @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message msg = messages.get(nextIndex); //主体和内容 int tmpSize = msg.getTopic() .length() + msg.getBody().length; //额外属性 for (Map.Entry<String, String> entry : msg.getProperties() .entrySet()) { tmpSize += entry.getKey() .length() + entry.getValue() .length(); } //日志的大小 tmpSize += LOG_BYTE_SIZE; if (tmpSize > SIZE_LIMIT) { if (nextIndex - currIndex == 0) { nextIndex++; } break; } //总大小相加比4M大 if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } }