本文主要是介绍rocketmq核心源码分析第二十三篇一顺序消息,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
顺序消息
使用demo
- 通过MessageQueueSelector对mqs进行选择
- 一般按业务维度保障分区顺序
defaultMQProducer.send(msg,new MessageQueueSelector(){
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
通常用法: msg的业务唯一键相同的消息发送到同一队列
保障分区顺序性
int i = (int)arg % mqs.size();
return mqs.get(i);
}
},16/* 为select方法的arg参数*/ );
源码分析
- 顺序消息通过sendSelectImpl实现发送
- 获取topic对应的TopicPublishInfo
- 获取topic消息队列集合
- 根据selector选择指定的消息队列
- 消息发送
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
获取topic发布信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
获取topic消息队列集合
List<MessageQueue> messageQueueList =
mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
Message userMessage = MessageAccessor.cloneMessage(msg);
String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
userMessage.setTopic(userTopic);
根据selector选择指定的消息队列
mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
消息发送
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}
validateNameServerSetting();
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
总结
- 顺序消息是指将同一业务键的消息发往同一消息队列MessageQueue
- 只保障分区顺序性
- 消费端不管是并发消费还是顺序消费都是按照MessageQueue的维度进行拉取,但并发消费由于多线程干扰[所以顺序消费最好是采用ConsumeMessageOrderlyService]
这篇关于rocketmq核心源码分析第二十三篇一顺序消息的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!