RocketMQ版本4.6.0,记录自己看源码的过程
在消费者启动过程中,会启动MQClientInstance,而MQClientInstance中会启动多个定时任务,其中就包括定时上报消费进度:
private void startScheduledTask() { // 省略其它定时任务。。。 // 定时持久化消费进度,默认5s this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 省略其它定时任务。。。 } /** * 持久化全部消费进度 */ private void persistAllConsumerOffset() { // 获取该JVM上全部消费者实例 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); // 调用各个消费者的持久化接口 impl.persistConsumerOffset(); } }
调用各个消费者的上报接口
DefaultMQPushConsumerImpl
/** * 持久化当前消费者消费进度 */ @Override public void persistConsumerOffset() { try { this.makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); // 获取该消费者分配的消息队列 Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); // 使用消费进度组件持久化全部队列消费进度 this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); } }
使用消费进度组件上报该消费者的所有消费队列消费进度
/** * 持久化消费进度 */ @Override public void persistAll(Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) return; final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); // 遍历每个队列当前的消费进度 for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); // 消费进度 AtomicLong offset = entry.getValue(); if (offset != null) { if (mqs.contains(mq)) { try { // 更新消费进度到broker中 this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } } else { unusedMQ.add(mq); } } } if (!unusedMQ.isEmpty()) { for (MessageQueue mq : unusedMQ) { this.offsetTable.remove(mq); log.info("remove unused mq, {}, {}", mq, this.groupName); } } } /** * 以单向方式更新消费进度到broker */ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true); } /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */ @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { // 获得一个broker地址,正常情况下都是master broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } if (findBrokerResult != null) { // 构建更新队列消费进度得请求数据 UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); if (isOneway) { // 单向发送更新进度请求 this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5); } } else { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } }
以单向的方式发送请求,请求码为UPDATE_CONSUMER_OFFSET
public void updateConsumerOffsetOneway( final String addr, final UpdateConsumerOffsetRequestHeader requestHeader, final long timeoutMillis ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); }
broker处理更新消费进度的处理器是ConsumerManageProcessor
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.GET_CONSUMER_LIST_BY_GROUP: return this.getConsumerListByGroup(ctx, request); // 处理consumer发过来的更新消费进度的请求 case RequestCode.UPDATE_CONSUMER_OFFSET: return this.updateConsumerOffset(ctx, request); case RequestCode.QUERY_CONSUMER_OFFSET: return this.queryConsumerOffset(ctx, request); default: break; } return null; } /** * 更新消息队列消费进度 */ private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); final UpdateConsumerOffsetRequestHeader requestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); // 向消费进度管理组件提交消费进度 this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
向消费进度管理组件提交消费进度,消费进度统一由消费进度管理器管理
/** * 用来管理消费者的消费进度 */ public class ConsumerOffsetManager extends ConfigManager { /** * 消费进度表 */ private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); /** * 上报消费进度 * @param clientHost 消费端地址 * @param group 消费组 * @param topic 主题 * @param queueId 队列id * @param offset 要更新的进度 */ public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; this.commitOffset(clientHost, key, queueId, offset); } private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); // 第一次上报消费进度 if (null == map) { map = new ConcurrentHashMap<Integer, Long>(32); map.put(queueId, offset); this.offsetTable.put(key, map); } else { // 更新队列的消费进度 Long storeOffset = map.put(queueId, offset); if (storeOffset != null && offset < storeOffset) { log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); } } } }
这部分逻辑还是很清晰的。
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》