这部分主要来说明消费者对协议的处理。
这块的代码要追溯到joinGroup请求结束,通过前面的源码分析我们知道joinGroup主要是判断是否发起rebalance以及等待其他组成员加入组,而在所有成员加入或者RebalanceTimeout
之后会调用onCompleteJoin方法,代码如下。
def onCompleteJoin(group: GroupMetadata): Unit = { //1.1 针对动态成员的处理 group.inLock { group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember => removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) group.removeStaticMember(failedMember.groupInstanceId) } if (group.is(Dead)) { info(s"Group ${group.groupId} is dead, skipping rebalance stage") //leader没有rejoin且没有member能选,则group.maybeElectNewJoinedLeader返回false,我们需要再次延时。通过maybeElectNewJoinedLeader选出leader } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) { // If all members are not rejoining, we will postpone the completion // of rebalance preparing stage, and send out another delayed operation // until session timeout removes all the non-responsive members. error(s"Group ${group.groupId} could not complete rebalance because no members rejoined") joinPurgatory.tryCompleteElseWatch( new DelayedJoin(this, group, group.rebalanceTimeoutMs), Seq(GroupKey(group.groupId))) } else { //1.2 状态转为CompletingRebalance,投票选择协议,选择票数最多的那个 group.initNextGeneration() if (group.is(Empty)) { info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") groupManager.storeGroup(group, Map.empty, error => { if (error != Errors.NONE) { // we failed to write the empty group metadata. If the broker fails before another rebalance, // the previous generation written to the log will become active again (and most likely timeout). // This should be safe since there are no active members in an empty generation, so we just warn. warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") } }) } else { //1.3 选举完成后就返回 // trigger the awaiting join group response callback for all the members after rebalancing for (member <- group.allMemberMetadata) { val joinResult = JoinGroupResult( members = if (group.isLeader(member.memberId)) { group.currentMemberMetadata } else { List.empty }, memberId = member.memberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, error = Errors.NONE) group.maybeInvokeJoinCallback(member, joinResult) completeAndScheduleNextHeartbeatExpiration(group, member) member.isNew = false } } } } }
这是一个与前面关联的点,在前面一篇的案例中我们知道如果是静态成员会拥有更长的session时间,而动态成员则是在断连之后第一次Rebalance时就剔除掉下线的member,处理就是在1.1的
代码中。在这里会过滤掉静态成员,将没有加入组的成员移除掉。
在initNextGeneration方法中可以看到会对generationId自增加一,generationId相当于group的纪元,每次发生Rebalance都会自增。接着是设置protocolName,针对选举协议的部分就是在Some(selectProtocol)中。
def initNextGeneration() = { if (members.nonEmpty) { generationId += 1 protocolName = Some(selectProtocol) subscribedTopics = computeSubscribedTopics() transitionTo(CompletingRebalance) } else { generationId += 1 protocolName = None subscribedTopics = computeSubscribedTopics() transitionTo(Empty) } receivedConsumerOffsetCommits = false receivedTransactionalOffsetCommits = false }
对于selectProtocol这段代码还挺好理解的,就是对所有的member设置的协议投票,取票数最多的协议,这个代码逻辑也与很多资料说的相符,但针对协议的处理只是简单这样吗?
大家可以想象一个case:如果有四个消费者,其中三个都设置的StickyAssignor,而剩的这个设置的CooperativeStickyAssignor,正好剩的这个被选为leader会发生什么呢?
(前面分析过,服务端选择消费者leader也很随意,就是取member的第一个)。
def selectProtocol: String = { if (members.isEmpty) throw new IllegalStateException("Cannot select protocol for empty group") // select the protocol for this group which is supported by all members val candidates = candidateProtocols // let each member vote for one of the protocols and choose the one with the most votes val votes: List[(String, Int)] = allMemberMetadata .map(_.vote(candidates)) .groupBy(identity) .mapValues(_.size) .toList votes.maxBy(_._2)._1 }
Exception in thread “main” org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member’s supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
def supportsProtocols(memberProtocolType: String, memberProtocols: Set[String]) = { if (is(Empty)) !memberProtocolType.isEmpty && memberProtocols.nonEmpty else protocolType.contains(memberProtocolType) && memberProtocols.exists(supportedProtocols(_) == members.size) }
对于成为leader的消费者,服务端会返回成员信息,其他的则返回空,返回参数样例如下
JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=3, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, memberId=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, members=[JoinGroupResponseMember(memberId=‘mykafka-group_4_2-4c364a51-2d52-446f-b4d4-2b61ae3738c0’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0]), JoinGroupResponseMember(memberId=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, groupInstanceId=null, metadata=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, -1, -1, -1, -1, 0, 0, 0, 0])])
JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=3, protocolType=‘consumer’, protocolName=‘sticky’, leader=‘mykafka-group_4_1-d563db3b-0fcd-4ce8-8e65-12a663dba0f7’, memberId=‘mykafka-group_4_2-4c364a51-2d52-446f-b4d4-2b61ae3738c0’, members=[])
在发送joinGroup请求之后有给response设置处理类JoinGroupResponseHandler,最终返回的是JoinGroupResponseHandler
处理之后的处理结果,我们来看JoinGroupResponseHandler中是如何处理的。
if (error == Errors.NONE) { if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) { log.debug("JoinGroup failed due to inconsistent Protocol Type, received {} but expected {}", joinResponse.data().protocolType(), protocolType()); future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL); } else { log.info("Received successful JoinGroup response: {}", joinResponse); sensors.joinSensor.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { if (state != MemberState.REBALANCING) { // if the consumer was woken up before a rebalance completes, we may have already left // the group. In this case, we do not want to continue with the sync group. future.raise(new UnjoinedGroupException()); } else { //根据返回参数的回调来做处理 AbstractCoordinator.this.generation = new Generation( joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName()); //针对是否是leader分开处理 if (joinResponse.isLeader()) { onJoinLeader(joinResponse).chain(future); } else { onJoinFollower().chain(future); } } } } }
根据投票决定的分配规则分配分区,分配结束后发送SyncGroupRequest请求,在方法performAssignment中还会更新subscriptionState中的groupSubscription 及subscription,以及org.apache.kafka.clients.consumer.internals.ConsumerCoordinator中的 assignmentSnapshot以及 metadataSnapshot
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group //根据分区分配策略来分配组成员处理的分区 Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members()); List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>(); for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) { groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment() .setMemberId(assignment.getKey()) .setAssignment(Utils.toArray(assignment.getValue())) ); } SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( new SyncGroupRequestData() .setGroupId(rebalanceConfig.groupId) .setMemberId(generation.memberId) .setProtocolType(protocolType()) .setProtocolName(generation.protocolName) .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(generation.generationId) .setAssignments(groupAssignmentList) ); log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder); return sendSyncGroupRequest(requestBuilder); } catch (RuntimeException e) { return RequestFuture.failure(e); } }
针对follower就直接发送SyncGroupRequest
private RequestFuture<ByteBuffer> onJoinFollower() { // send follower's sync group with an empty assignment SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( new SyncGroupRequestData() .setGroupId(rebalanceConfig.groupId) .setMemberId(generation.memberId) .setProtocolType(protocolType()) .setProtocolName(generation.protocolName) .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(generation.generationId) .setAssignments(Collections.emptyList()) ); log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder); return sendSyncGroupRequest(requestBuilder); }
这部分到发送同步数据请求这里就结束了,下一篇会来继续分析发送同步请求之后做的事,以及针对四个分配规则来深入分析