RocketMQ版本4.6.0,记录自己看源码的过程
发送消息的示例:
/**
 * Minimal synchronous-send example: starts a producer, sends a single message
 * to {@code TestTopic} and then shuts the producer down.
 */
public class Producer {

    /**
     * Runs the example.
     *
     * @param args unused
     * @throws MQClientException    if the producer cannot be started
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg = new Message("TestTopic" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Back off briefly before the next attempt.
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Always release the producer, even when the sleep above is interrupted
            // and the InterruptedException propagates out of main.
            producer.shutdown();
        }
    }
}
首先根据生产组创建一个生产者实例
/**
 * Creates a producer bound to the given namespace and producer group.
 *
 * @param namespace     namespace stored on this producer
 * @param producerGroup name of the producer group
 * @param rpcHook       hook handed to the internal implementation; may be {@code null}
 */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    // The internal implementation does the real work; this class delegates to it
    // (see start() and send()).
    this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
DefaultMQProducer属性:
/** Internal implementation that start() and send() delegate to. */
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
private final InternalLogger log = ClientLogger.getLog();
/** Name of the producer group this producer belongs to. */
private String producerGroup;
/** Topic key sent to the broker as the request header's "default topic". */
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
/** Queue count sent to the broker as the request header's "default topic queue nums". */
private volatile int defaultTopicQueueNums = 4;
/**
 * Default send timeout in milliseconds (3 seconds).
 */
private int sendMsgTimeout = 3000;
/**
 * Body-size threshold, 4 KiB by default; per the original note, larger bodies are compressed.
 */
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
 * Maximum number of retries after a failed synchronous send.
 */
private int retryTimesWhenSendFailed = 2;
/**
 * Number of retries after a failed asynchronous send.
 */
private int retryTimesWhenSendAsyncFailed = 2;
/** When true, a synchronous send whose status is not SEND_OK is retried instead of returned. */
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
 * Maximum allowed message size.
 */
private int maxMessageSize = 1024 * 1024 * 4; // 4 MiB
/** Optional trace dispatcher; started in start() only when non-null. */
private TraceDispatcher traceDispatcher = null;
设置好namesrv地址后启动DefaultMQProducer
@Override public void start() throws MQClientException { this.setProducerGroup(withNamespace(this.producerGroup)); // 启动内部实现类 this.defaultMQProducerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
可以看到是通过defaultMQProducerImpl启动
/**
 * Starts the producer.
 *
 * <p>Only the CREATE_JUST state proceeds. The state is set to START_FAILED first,
 * so a failure part-way through leaves the producer marked as failed.
 *
 * @param startFactory whether the MQClientInstance should be started as well
 * @throws MQClientException if the producer group is already registered, or the
 *                           producer is in RUNNING, START_FAILED or SHUTDOWN_ALREADY state
 */
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            // Assume failure until every step below has succeeded.
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // Replace the default instance name with the process id.
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Get the cached MQClientInstance or create a new one.
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with the MQClientInstance under its group name.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Roll the state back so the caller may retry with another group name.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // TODO (original author): the purpose of this entry is unclear.
            // It stores an empty TopicPublishInfo under the createTopicKey.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // Start the MQClientInstance, which starts scheduled tasks and services.
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Send a heartbeat to all brokers.
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Scan RequestFutureTable for expired requests: first run after 3 s, then every 1 s.
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
该方法会被调用两次:第一次是当前生产者自身的启动;第二次发生在MQClientInstance#start内部,用于启动MQClientInstance内置的、负责发送重试消息的生产者(此时startFactory=false,不会再次启动MQClientInstance)。第二次调用主要是针对消费端起作用,因为MQClientInstance是生产者和消费者共用的。
先检查ProducerGroup是否符合要求以及重新设置InstanceName为进程id
/**
 * Replaces the instance name with the current process id when it still has the
 * value {@code "DEFAULT"}; any other value is left untouched.
 *
 * <p>The comparison is written constant-first so that a {@code null} instance
 * name is treated as "not the default" instead of causing a NullPointerException.
 */
public void changeInstanceNameToPID() {
    if ("DEFAULT".equals(this.instanceName)) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
如果instanceName是"DEFAULT",则将instanceName重新设置为进程id,防止在同一台机器上起多个程序但instanceName相同。
然后获取或创建MQClientInstance实例
public class MQClientManager { private final static InternalLogger log = ClientLogger.getLog(); // 单例模式,饿汉式 private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); // MQClientInstance实例缓存 private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>(); private MQClientManager() { } public static MQClientManager getInstance() { return instance; } public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig) { return getOrCreateMQClientInstance(clientConfig, null); } public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { String clientId = clientConfig.buildMQClientId(); MQClientInstance instance = this.factoryTable.get(clientId); // 根据clientId从缓存中获取,没获取到则创建一个放到缓存中 if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId); } else { log.info("Created new MQClientInstance for clientId:[{}]", clientId); } } return instance; } public void removeClientFactory(final String clientId) { this.factoryTable.remove(clientId); } }
整个JVM实例中只存在一个MQClientManager实例(单例),它维护一个MQClientInstance缓存factoryTable,一个clientId只会创建一个MQClientInstance。clientId = 客户端IP + “@” + instanceName,同一个JVM内,只要clientId相同,生产者和消费者就共用同一个MQClientInstance实例。
接着向MQClientInstance以生产组名为key注册该生产者
public class MQClientInstance {
    /**
     * Producers registered with this client instance, keyed by producer group.
     * Entries are added with putIfAbsent, so a group maps to at most one producer.
     */
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable =
        new ConcurrentHashMap<String, MQProducerInner>();

    /**
     * Registers a producer under its group name.
     *
     * @param group    producer group name
     * @param producer producer to register
     * @return {@code true} if the producer was registered; {@code false} if either
     *         argument is {@code null} or the group already has a producer
     */
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (group == null || producer == null) {
            return false;
        }

        final MQProducerInner existing = this.producerTable.putIfAbsent(group, producer);
        final boolean registered = existing == null;
        if (!registered) {
            log.warn("the producer group[{}] exist already.", group);
        }
        return registered;
    }
}
启动mQClientFactory,这里会启动多个定时任务以及一些功能服务,如果该实例已经启动过一次,则本次不会真正执行
/**
 * Starts this client instance. The start-up sequence runs only from the
 * CREATE_JUST state, so an instance is started at most once.
 *
 * @throws MQClientException if a previous start attempt left the instance in START_FAILED
 */
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Assume failure until every step below has succeeded.
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, look up the address from the name server.
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the outbound API layer (per the original note, this starts the Netty client).
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks, e.g. route refresh and broker heartbeats.
                this.startScheduledTask();
                // Start the pull-message service.
                this.pullMessageService.start();
                // Start the rebalance service.
                this.rebalanceService.start();
                // NOTE(review): original author's guess - presumably the internal producer
                // that consumers use to send retry messages back to the broker; confirm.
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                // Any other state is a no-op.
                break;
        }
    }
}
可以看到,除了启动netty客户端和启动一些定时任务之外,其它几个服务都跟消费者有关,这几个服务等分析消费者时再详细分析,这里只看一下启动了哪些定时任务
private void startScheduledTask() { if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } // 定时从NameServer拉取路由信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); // 定时清理下线的broker和向所有的broker发送心跳 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); // 定时持久化消费进度,默认5s this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
在启动的最后,立即执行一次向所有broker发送心跳的操作
// Last step of producer start-up: send one heartbeat to all brokers right away.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
可以看到,单单在启动生产者的过程就挺复杂的。
接下来,才要开始发送消息
// Build the message (topic, tag, body) and send it synchronously.
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
创建消息类Message,设置好发送的主题,标签和消息内容,调用DefaultMQProducer实例方法send同步发送
@Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 验证消息,验证内容包括主题,消息体不能为空,消息长度不能为0且默认不能超过允许发送消息的最大长度4M Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg); }
委托给DefaultMQProducerImpl来发送消息
/**
 * Sends a message synchronously using the producer's configured send timeout.
 */
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

/**
 * Sends a message synchronously with an explicit timeout; no callback is used.
 *
 * @param timeout overall time budget in milliseconds
 */
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

/**
 * Common send path for all communication modes; behaviour is selected by
 * {@code communicationMode}.
 *
 * <p>SYNC sends are attempted up to {@code 1 + retryTimesWhenSendFailed} times.
 * Other modes get a single attempt in this loop. The timeout covers all
 * attempts together, not each attempt separately.
 *
 * @param msg               message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback passed through to the kernel send (null for SYNC)
 * @param timeout           overall time budget in milliseconds
 * @return the send result for SYNC; {@code null} for ASYNC and ONEWAY
 * @throws MQClientException if no route is found or every attempt failed
 * @throws RemotingTooMuchRequestException if the time budget ran out before an attempt
 */
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    // Validate the message.
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Look up the route: local cache first, otherwise fetch from the name server and cache it.
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total attempts: SYNC uses the configured retry count, other modes get exactly one.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        // Broker name used for each attempt, reported in the failure message.
        String[] brokersSent = new String[timesTotal];
        // Retry loop.
        for (; times < timesTotal; times++) {
            // Broker of the previous attempt, passed to queue selection so it can be avoided.
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Normally selects a MessageQueue round-robin.
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        // Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    // A timed-out send is not retried. beginTimestampFirst is taken before the
                    // loop, so once the budget is spent the next iteration breaks out here.
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Send the message with whatever time budget is left.
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        // ASYNC and ONEWAY return no result here; SYNC returns the broker's result.
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // Retry only when the producer is configured to do so.
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // Record the failure for this broker and move on to the next attempt.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    // Same handling as RemotingException.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    // Only the response codes listed below are retried; any other code ends the send.
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    // Interruption is never retried; log it and rethrow.
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    // NOTE(review): the next two log calls repeat the two above.
                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                // No queue could be selected; stop trying.
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));
        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        // A timeout is reported as its own exception type instead of the aggregate failure.
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // Map the last recorded exception to a client response code.
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    // No usable route: check the name server setting, then report the missing route.
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
发送之前,需要知道消息要发往哪个broker,所以就需要获取路由信息
// 尝试去获取路由信息,如果本地有,则从本地缓存获取,否则发送请求去NameServer获取并缓存到本地 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 从本地缓存获取topic对应的路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // 本地该topic路由缓存信息为空则去NameServer拉取 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 从NameServer拉取topic对应的路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); // 拉取完后再从缓存中尝试获取 topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { // 还没获取到则去拉取默认的路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
先看一下DefaultMQProducerImpl缓存的路由信息长啥样
// 缓存路由信息,key是topic private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
TopicPublishInfo是topic对应的路由信息
/**
 * Publish-side route information for one topic (fields only; methods are not shown in this excerpt).
 */
public class TopicPublishInfo {
    // Whether the topic is an ordered-message topic.
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // Message queues of this topic.
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    // Index incremented on each queue selection and used for round-robin.
    // Per the original note it wraps to 0 at the int maximum; ThreadLocalIndex is not shown here, so confirm.
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // Route data of the topic.
    private TopicRouteData topicRouteData;
}
/**
 * A message queue of a topic, identified by topic, broker name and queue id
 * (fields only; methods are not shown in this excerpt).
 */
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    /** Topic the queue belongs to. */
    private String topic;
    /** Name of the broker hosting the queue. */
    private String brokerName;
    /** Id of the queue. */
    private int queueId;
}
/**
 * Route information of one topic (fields only; methods are not shown in this excerpt).
 */
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // Queue metadata of the topic.
    private List<QueueData> queueDatas;
    // Metadata of the brokers hosting the topic; one BrokerData describes one master/slave group.
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
/**
 * Queue metadata (fields only; methods are not shown in this excerpt).
 */
public class QueueData implements Comparable<QueueData> {
    // Broker the queues belong to.
    private String brokerName;
    // Number of read queues.
    private int readQueueNums;
    // Number of write queues.
    private int writeQueueNums;
    // NOTE(review): presumably permission bits for the queues - confirm.
    private int perm;
    private int topicSynFlag;
}
/**
 * Broker metadata: the addresses of the brokers in one master/slave group
 * (fields only; methods are not shown in this excerpt).
 */
public class BrokerData implements Comparable<BrokerData> {
    /**
     * Cluster name.
     */
    private String cluster;
    /**
     * Broker name; all brokers of one master/slave group share the same name.
     */
    private String brokerName;
    /**
     * Address of each broker in the master/slave group, keyed by broker id:
     * <0, IP:PORT>
     * <1, IP:PORT>
     * <2, IP:PORT>
     */
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}
以上几个类就是构成了完整的路由信息。
获取路由的流程:先从本地缓存获取topic对应的路由信息,没找到则去NameServer拉取该topic的路由信息,如果还是没找到,则用默认的topic路由信息。
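获取到路由信息后,就需要从消息队列中选择一个队列,然后发往该队列。如果发送失败会进行重试:默认情况下,同步发送最多重试两次(retryTimesWhenSendFailed=2);异步发送和单向发送在sendDefaultImpl的循环中只尝试一次,其中异步发送的重试次数retryTimesWhenSendAsyncFailed(默认为2)会传给MQClientAPIImpl#sendMessage,由它负责处理。注意,超时不会重试。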
选择消息队列时有两种方式:
● sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。
● sendLatencyFaultEnable=true,启用Broker故障延迟机制。
源码如下:
MQFaultStrategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 判断是否启用broker故障延迟机制,默认没有开启 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 默认是没有启用broker故障延迟机制,走这里,直接通过轮询方式去选择一个Message Queue return tpInfo.selectOneMessageQueue(lastBrokerName); }
默认(不启用Broker故障延迟机制)的选择原理是:首先在一次消息发送过程中,由于重试可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue;如果消息发送失败,下次进行消息队列选择时会规避上次MessageQueue所在的Broker,否则还是很有可能再次失败。
默认的选择方式在一次消息发送过程中能够规避上一次失败的broker,但它不会记住故障:如果broker宕机,由于路由算法中的消息队列是按broker排序、顺序选择的,如果上一次根据路由算法选择的是宕机broker的第一个队列,那么随后的下一次选择的就是宕机broker的第二个队列,消息发送很有可能会失败,再次引发重试,带来不必要的性能损耗。Broker故障延迟机制(sendLatencyFaultEnable=true)正是为了解决这个问题:选择队列时会跳过当前被判定为不可用的broker。
消息发送的API入口:DefaultMQProducerImpl#sendKernelImpl
/**
 * Sends one message to the broker that hosts the given queue.
 *
 * <p>Steps shown here: resolve the broker address, assign a unique id, set
 * compression and transaction flags, build the request header, then dispatch
 * by communication mode. The message body and topic are restored in
 * {@code finally}.
 *
 * <p>NOTE: the article author omitted part of the original method (see the
 * marker comment below). In this excerpt {@code context} is never assigned
 * after being set to {@code null}.
 *
 * @param msg               message to send
 * @param mq                target message queue
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback used by the ASYNC path
 * @param topicPublishInfo  route information passed to the ASYNC path
 * @param timeout           remaining time budget in milliseconds
 * @return the result returned by the client API call
 * @throws MQClientException if no address can be found for the queue's broker
 * @throws RemotingTooMuchRequestException if the time budget is spent before sending
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Resolve the broker address; if it is missing, refresh the route and try again.
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        // Keep the original body so it can be restored after a possible compression.
        byte[] prevBody = msg.getBody();
        try {
            // For MessageBatch, the ID has been set in the generating process.
            if (!(msg instanceof MessageBatch)) {
                // Assign the unique message id.
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // Per the original note, bodies larger than 4 KiB are compressed.
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            // (code omitted by the article author)

            // Build the send-message request header.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            // For retry-group topics, move the reconsume counters from message properties to the header.
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            // Dispatch according to the communication mode.
            switch (communicationMode) {
                // Asynchronous send.
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        // If the msg body was compressed, it should be reset using prevBody.
                        // Clone a new message using the compressed body and recover the original message.
                        // Fix bug: https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    // Fail fast if preparing the request already used up the time budget.
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                // One-way and synchronous send.
                case ONEWAY:
                case SYNC:
                    // Fail fast if preparing the request already used up the time budget.
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // Send the message.
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            // Run the after-send hooks with the result.
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            // Run the after-send hooks with the exception, then rethrow.
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Restore the caller's message: original body and topic without namespace.
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
Step1:先根据MessageQueue得到broker的地址
public class MQClientInstance { // 保存broker集群中每个broker地址 private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>(); public String findBrokerAddressInPublish(final String brokerName) { HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); } return null; } }
如果从MQClientInstance中brokerAddrTable的缓存中没有找到broker地址信息,则重新去NameServer拉取,还没找到就抛异常。
Step2:设置全局唯一ID。
Step3:构建发送消息请求数据。
Step4:根据不同的通信模式使用MQClientAPIImpl去执行不同发送逻辑,分为同步,异步和单向。
最终发送请求的逻辑可以参考上面发送注册请求的逻辑。
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》