消息有序指可以按照消息的发送顺序来消费。RocketMQ可以保证消息是有序的。
在MQ模型中,序列需要通过3个阶段来保证:
1)发送消息时保持顺序。
2)消息的存储顺序与发送顺序相同。
3)当消息被消费时,它们的保存顺序与它们存储的顺序相同。
发送时保持顺序是指对于有顺序要求的消息,用户应该在同一个线程中同步发送。 存储和发送的顺序是一样的,它要求在同一个线程中发送的消息A和B必须在空间A之前存储。 消费和存储的一致性要求消息A和B到达Consumer后必须按照A和B的顺序进行处理。
1. 生产者
创建一个OrderProducer类:
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.List; public class OrderProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { //1. 创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group"); //2. 指定Nameserver地址 producer.setNamesrvAddr("192.168.195.128:9876"); //3. 启动producer producer.start(); //4. 创建消息 for (int i = 0; i < 5; i++) { Message msg = new Message("TopicOrderDemo", "Tag1", "Key" + i, ("HelloWorld" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //5. 发送消息 SendResult result = producer.send(msg, new MessageQueueSelector() { /** * * @param mqs:发送的消息信息 * @param msg:消息对象 * @param arg:指定对应的队列下标 * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //获取队列的下标 Integer index = (Integer) arg; //获取对应下标的队列 return mqs.get(index); } }, 1); System.out.println(result); } //6. 关闭producer producer.shutdown(); } }
输出:
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEAF70000, offsetMsgId=C0A8C38000002A9F000000000000086F, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB130001, offsetMsgId=C0A8C38000002A9F0000000000000937, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB1C0002, offsetMsgId=C0A8C38000002A9F00000000000009FF, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB1E0003, offsetMsgId=C0A8C38000002A9F0000000000000AC7, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8017A385818B4AAC250DAEB210004, offsetMsgId=C0A8C38000002A9F0000000000000B8F, messageQueue=MessageQueue [topic=TopicOrderDemo, brokerName=rocketmq-nameserver1, queueId=1], queueOffset=4]
2. 消费者
创建一个OrderConsumer类:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws MQClientException { //1. 创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_producer_group"); //2. 指定Nameserver地址 consumer.setNamesrvAddr("192.168.195.128:9876"); //3. 订阅主题Topic和Tag consumer.subscribe("TopicOrderDemo", //要消费的主题 "*"); //过滤规则 //4. 创建消息监听 consumer.setMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //读取消息 for (MessageExt msg : msgs) { //获取主题 String topic = msg.getTopic(); //获取标签 String tags = msg.getTags(); //获取信息 byte[] body = msg.getBody(); try { String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("OrderConsumer消费信息——Topic: " + topic + ", Tags: " + tags + ", Result: " + result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者consumer consumer.start(); } }
输出:
OrderConsumer消费信息——Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld0
OrderConsumer消费信息——Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld1
OrderConsumer消费信息——Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld2
OrderConsumer消费信息——Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld3
OrderConsumer消费信息——Topic: TopicOrderDemo, Tags: Tag1, Result: HelloWorld4