this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来通知ConsumeQueue的Dispatcher是CommitLogDispatcherBuildConsumeQueue。
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) { final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: DefaultMessageStore.this.putMessagePositionInfo(request); break; case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: break; } } }
当ReputMessageService调用了CommitLogDispatcherBuildConsumeQueue的dispatch()后,CommitLogDispatcherBuildConsumeQueue便会调用 DefaultMessageStore.this.putMessagePositionInfo(request):
public void putMessagePositionInfo(DispatchRequest dispatchRequest) { ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()); cq.putMessagePositionInfoWrapper(dispatchRequest); }
putMessagePositionInfo()逻辑有两步:
1:调用findConsumeQueue(),根据消息的topic以及消息所属的ConsumeQueueId,找到对应的ConsumeQueue。
findConsumeQueue()会先从consumeQueueTable中查询topic的ConsumeQueueMap,如果未找到,便会为Topic创建一个新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。
接着在从Topic的ConcurrentMap中,根据QueueId,查询ConsumeQueue,如果未找到,便也会创建一个新的ConsumeQueue,存放到Map中。ConsumeQueue便是此时被创建的。
2:当找到消息对应的ConsumeQueue后,便调用ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。
上面主要讲了ReputMessageService是如何通知ConsumeQueue的,现在我们就要看看ConsumeQueue收到通知后是如何更新的,更新逻辑就在putMessagePositionInfoWrapper()中。
putMessagePositionInfoWrapper()中调用了putMessagePositionInfo(),并引入了重试机制。
我们来看看putMessagePositionInfo()中的主要逻辑:
1:判断消息是否已经被处理过
if (offset <= this.maxPhysicOffset) { return true; }
maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量,如果本次消息偏移量小于maxPhysicOffset,则表明消息已经被更新过,直接返回。