diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 3fa4ba2813a..679d85dfd15 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -138,11 +138,15 @@ public abstract class AbstractCoordinator implements Closeable { private RequestFuture joinFuture = null; private RequestFuture findCoordinatorFuture = null; private volatile RuntimeException fatalFindCoordinatorException = null; + + // 消费者代,每次rebalance结束会更新 TODO? + // 初始代为-1 private Generation generation = Generation.NO_GENERATION; private long lastRebalanceStartMs = -1L; private long lastRebalanceEndMs = -1L; private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while + // Coordinator State protected MemberState state = MemberState.UNJOINED; @@ -405,6 +409,7 @@ public abstract class AbstractCoordinator implements Closeable { * @return true iff the operation succeeded */ boolean joinGroupIfNeeded(final Timer timer) { + // 循环条件:rejoinNeeded || joinFuture != null; while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(timer)) { return false; @@ -415,13 +420,14 @@ public abstract class AbstractCoordinator implements Closeable { // on each iteration of the loop because an event requiring a rebalance (such as a metadata // refresh which changes the matched subscription set) can occur while another rebalance is // still in progress. + // 准备join group if (needsJoinPrepare) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; onJoinPrepare(generation.generationId, generation.memberId); } - + // 发送JOIN_GROUP请求 final RequestFuture future = initiateJoinGroup(); client.poll(future, timer); if (!future.isDone()) { @@ -430,6 +436,7 @@ public abstract class AbstractCoordinator implements Closeable { } if (future.succeeded()) { + // 防止heartbeat线程并发修改,使用synchronized获取最新版本 Generation generationSnapshot; MemberState stateSnapshot; @@ -437,15 +444,20 @@ public abstract class AbstractCoordinator implements Closeable { // Can't use synchronized for {@code onJoinComplete}, because it can be long enough // and shouldn't block heartbeat thread. // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment} + // onJoinComplete执行太久,防止block heartbeat线程 synchronized (AbstractCoordinator.this) { generationSnapshot = this.generation; stateSnapshot = this.state; } + // 实际在SYNC_GROUP成功后执行,此时state=STABLE if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) { // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried. + + // 这里是SYNC_GROUP的返回值 assignment[] ByteBuffer memberAssignment = future.value().duplicate(); + // 处理加入组成功 onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment); // Generally speaking we should always resetJoinGroupFuture once the future is done, but here @@ -564,15 +576,19 @@ public abstract class AbstractCoordinator implements Closeable { super(generation); } + // 处理JOIN_GROUP请求的返回值 @Override public void handle(JoinGroupResponse joinResponse, RequestFuture future) { Errors error = joinResponse.error(); + // 没有错误场景 if (error == Errors.NONE) { if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) { log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}", joinResponse.data().protocolType(), protocolType()); future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL); - } else { + } + // + else { log.debug("Received successful JoinGroup response: {}", joinResponse); sensors.joinSensor.record(response.requestLatencyMs()); @@ -581,7 +597,10 @@ public abstract class AbstractCoordinator implements Closeable { // if the consumer was woken up before a rebalance completes, we may have already left // the group. In this case, we do not want to continue with the sync group. future.raise(new UnjoinedGroupException()); - } else { + } + + else { + // 状态转化到COMPLETING_REBALANCE state = MemberState.COMPLETING_REBALANCE; // we only need to enable heartbeat thread whenever we transit to @@ -589,25 +608,35 @@ public abstract class AbstractCoordinator implements Closeable { if (heartbeatThread != null) heartbeatThread.enable(); + // 根据响应中的generationId、memberId和分区分配策略更新generation对象 AbstractCoordinator.this.generation = new Generation( joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName()); log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation); + // 处理leader后续事件 if (joinResponse.isLeader()) { + // chain(future)表示:使用SYNC_GROUP的返回值代替JOIN_GROUP onJoinLeader(joinResponse).chain(future); - } else { + } + // 处理follower后续事件 + else { + // chain(future)表示:使用SYNC_GROUP的返回值代替JOIN_GROUP onJoinFollower().chain(future); } } } } - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { + } + + else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator()); // backoff and retry future.raise(error); - } else if (error == Errors.UNKNOWN_MEMBER_ID) { + } + + else if (error == Errors.UNKNOWN_MEMBER_ID) { log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}", error.message(), sentGeneration); // only need to reset the member id if generation has not been changed, @@ -616,20 +645,26 @@ public abstract class AbstractCoordinator implements Closeable { resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error); future.raise(error); - } else if (error == Errors.COORDINATOR_NOT_AVAILABLE + } + + else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff markCoordinatorUnknown(error); log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}", error.message(), sentGeneration); future.raise(error); - } else if (error == Errors.FENCED_INSTANCE_ID) { + } + + else if (error == Errors.FENCED_INSTANCE_ID) { // for join-group request, even if the generation has changed we would not expect the instance id // gets fenced, and hence we always treat this as a fatal error log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " + "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration); future.raise(error); - } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL + } + + else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID || error == Errors.GROUP_AUTHORIZATION_FAILED @@ -644,11 +679,15 @@ public abstract class AbstractCoordinator implements Closeable { } else { future.raise(error); } - } else if (error == Errors.UNSUPPORTED_VERSION) { + } + + else if (error == Errors.UNSUPPORTED_VERSION) { log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " + "and retry to see if the problem resolves"); future.raise(error); - } else if (error == Errors.MEMBER_ID_REQUIRED) { + } + + else if (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. Update member id // and send another join group request in next cycle. String memberId = joinResponse.data().memberId(); @@ -658,11 +697,15 @@ public abstract class AbstractCoordinator implements Closeable { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); } future.raise(error); - } else if (error == Errors.REBALANCE_IN_PROGRESS) { + } + + else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " + "which could indicate a replication timeout on the broker. Will retry."); future.raise(error); - } else { + } + + else { // unexpected error, throw the exception log.error("JoinGroup failed due to unexpected error: {}", error.message()); future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); @@ -670,7 +713,9 @@ public abstract class AbstractCoordinator implements Closeable { } } + // JOIN_GROUP请求,follower节点执行此方法 private RequestFuture onJoinFollower() { + // follower很简单,发送empty assignments给coordinator即可 // send follower's sync group with an empty assignment SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder( @@ -687,12 +732,15 @@ public abstract class AbstractCoordinator implements Closeable { return sendSyncGroupRequest(requestBuilder); } + // JOIN_GROUP请求,leader节点执行此方法 private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) { try { + // 重点,leader执行分区分配方案 assignment // perform the leader synchronization and send back the assignment for the group Map groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members()); + // 准备SYNC_GROUP请求发送给Coordinator,包含分区分配方案 List groupAssignmentList = new ArrayList<>(); for (Map.Entry assignment : groupAssignment.entrySet()) { groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment() @@ -1096,7 +1144,9 @@ public abstract class AbstractCoordinator implements Closeable { .setMemberId(this.generation.memberId) .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null)) .setGenerationId(this.generation.generationId)); + return client.send(coordinator, requestBuilder) + // heartbeat的返回值处理 .compose(new HeartbeatResponseHandler(generation)); } @@ -1105,6 +1155,7 @@ public abstract class AbstractCoordinator implements Closeable { super(generation); } + // 处理heartbeat请求的响应 @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) { sensors.heartbeatSensor.record(response.requestLatencyMs()); @@ -1113,18 +1164,23 @@ public abstract class AbstractCoordinator implements Closeable { if (error == Errors.NONE) { log.debug("Received successful Heartbeat response"); future.complete(null); - } else if (error == Errors.COORDINATOR_NOT_AVAILABLE + } + // GroupCoordinator发生变动 + else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid", coordinator()); markCoordinatorUnknown(error); future.raise(error); - } else if (error == Errors.REBALANCE_IN_PROGRESS) { + } + // 当前处于REBALANCE_IN_PROGRESS,触发Rejoin JOIN_GROUP + else if (error == Errors.REBALANCE_IN_PROGRESS) { // since we may be sending the request during rebalance, we should check // this case and ignore the REBALANCE_IN_PROGRESS error synchronized (AbstractCoordinator.this) { if (state == MemberState.STABLE) { log.info("Attempt to heartbeat failed since group is rebalancing"); + // 仅更改状态 rejoinNeeded = true requestRejoin(); future.raise(error); } else { @@ -1132,7 +1188,9 @@ public abstract class AbstractCoordinator implements Closeable { future.complete(null); } } - } else if (error == Errors.ILLEGAL_GENERATION || + } + // + else if (error == Errors.ILLEGAL_GENERATION || error == Errors.UNKNOWN_MEMBER_ID || error == Errors.FENCED_INSTANCE_ID) { if (generationUnchanged()) { @@ -1307,11 +1365,13 @@ public abstract class AbstractCoordinator implements Closeable { } } + // Kafka消费者的心跳线程 private class HeartbeatThread extends KafkaThread implements AutoCloseable { private boolean enabled = false; private boolean closed = false; private final AtomicReference failed = new AtomicReference<>(null); + // 名字: `kafka-coordinator-heartbeat-thread | ${groupId}` private HeartbeatThread() { super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true); } @@ -1353,6 +1413,7 @@ public abstract class AbstractCoordinator implements Closeable { log.debug("Heartbeat thread started"); while (true) { synchronized (AbstractCoordinator.this) { + // 双控制变量: closed, enabled if (closed) return; @@ -1369,9 +1430,11 @@ public abstract class AbstractCoordinator implements Closeable { continue; } + // heartbeat作用2:帮助处理网络请求 client.pollNoWakeup(); long now = time.milliseconds(); + // GroupCoordinator 未知 if (coordinatorUnknown()) { if (findCoordinatorFuture != null) { // clear the future so that after the backoff, if the hb still sees coordinator unknown in @@ -1383,12 +1446,16 @@ public abstract class AbstractCoordinator implements Closeable { } else { lookupCoordinator(); } - } else if (heartbeat.sessionTimeoutExpired(now)) { + } + // sessionTimer超时 + else if (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. markCoordinatorUnknown("session timed out without receiving a " + "heartbeat response"); - } else if (heartbeat.pollTimeoutExpired(now)) { + } + // pollTimer超时 + else if (heartbeat.pollTimeoutExpired(now)) { // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(). String leaveReason = "consumer poll timeout has expired. This means the time between subsequent calls to poll() " + @@ -1397,17 +1464,24 @@ public abstract class AbstractCoordinator implements Closeable { "You can address this either by increasing max.poll.interval.ms or by reducing " + "the maximum size of batches returned in poll() with max.poll.records."; maybeLeaveGroup(leaveReason); - } else if (!heartbeat.shouldHeartbeat(now)) { + } + // 距离上一次心跳时间是否超过`heartbeat.interval.ms` + else if (!heartbeat.shouldHeartbeat(now)) { // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs); - } else { + } + // 前置检查通过,需要发送心跳 + else { + // 更新状态:发送中 heartbeat.sentHeartbeat(now); + // 发送heartbeat请求 final RequestFuture heartbeatFuture = sendHeartbeatRequest(); heartbeatFuture.addListener(new RequestFutureListener() { @Override public void onSuccess(Void value) { synchronized (AbstractCoordinator.this) { + // 更新heartbeat状态:发送成功 heartbeat.receiveHeartbeat(); } } @@ -1415,13 +1489,16 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onFailure(RuntimeException e) { synchronized (AbstractCoordinator.this) { + // RebalanceInProgressException异常,客户端认为正常收到结果 if (e instanceof RebalanceInProgressException) { // it is valid to continue heartbeating while the group is rebalancing. This // ensures that the coordinator keeps the member in the group for as long // as the duration of the rebalance timeout. If we stop sending heartbeats, // however, then the session timeout may expire before we can rejoin. heartbeat.receiveHeartbeat(); - } else if (e instanceof FencedInstanceIdException) { + } + // 其他异常,标记为失败 + else if (e instanceof FencedInstanceIdException) { log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId); heartbeatThread.failed.set(e); } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 51837e57495..a37dcc90547 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -408,6 +408,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // Revoke partitions that were previously owned but no longer assigned; // note that we should only change the assignment (or update the assignor's state) // AFTER we've triggered the revoke callback + + // 触发subscriptions.rebalanceListener.onPartitionsRevoked() firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // If revoked any partitions, need to re-join the group afterwards @@ -421,6 +423,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { maybeUpdateJoinedSubscription(assignedPartitions); // Catch any exception here to make sure we could complete the user callback. + // 触发assignor.onAssignment() firstException.compareAndSet(null, invokeOnAssignment(assignor, assignment)); // Reschedule the auto commit starting from now @@ -430,6 +433,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { subscriptions.assignFromSubscribed(assignedPartitions); // Add partitions that were not previously owned but are now assigned + // 触发subscriptions.rebalanceListener.onPartitionsAssigned() firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions)); if (firstException.get() != null) { diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala index 22dfa9d157b..fe2e250c7f7 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala @@ -39,13 +39,21 @@ private[group] class DelayedJoin( rebalanceTimeout, group.lock ) { - override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _) + override def tryComplete(): Boolean = { + // 调用 GroupCoordinator.tryComplete + // forceComplete: 取消超时时间的计时器,并执行 onComplete 方法 + coordinator.tryCompleteJoin(group, forceComplete _) + } override def onExpiration(): Unit = { // try to complete delayed actions introduced by coordinator.onCompleteJoin tryToCompleteDelayedAction() } - override def onComplete(): Unit = coordinator.onCompleteJoin(group) + override def onComplete(): Unit = { + // 执行成功回调 + // + coordinator.onCompleteJoin(group) + } // TODO: remove this ugly chain after we move the action queue to handler thread private def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteActions() diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 54573e02db0..fafcb8ce5fa 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -152,26 +152,28 @@ class GroupCoordinator(val brokerId: Int, } } - def handleJoinGroup(groupId: String, - memberId: String, - groupInstanceId: Option[String], - requireKnownMemberId: Boolean, - clientId: String, - clientHost: String, + def handleJoinGroup(groupId: String, // 消费者groupId + memberId: String,// 消费者groupId + groupInstanceId: Option[String],// 消费者组实例id + requireKnownMemberId: Boolean,// 是否需要成员ID不为空 + clientId: String,// 消费者clientId + clientHost: String,// 消费者主机名 rebalanceTimeoutMs: Int, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback): Unit = { + // 校验入参groupId及状态 validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error => responseCallback(JoinGroupResult(memberId, error)) return } - + // 校验入参sessionTimeoutMs if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs || sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) { responseCallback(JoinGroupResult(memberId, Errors.INVALID_SESSION_TIMEOUT)) } else { + // 消费者组成员ID是否为空 val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID // group is created if it does not exist and the member id is UNKNOWN. if member // is specified but group does not exist, request is rejected with UNKNOWN_MEMBER_ID @@ -180,12 +182,17 @@ class GroupCoordinator(val brokerId: Int, responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) case Some(group) => group.inLock { + // 判断消费者组是否有足够空间接受当前成员 if (!acceptJoiningMember(group, memberId)) { group.remove(memberId) responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)) - } else if (isUnknownMember) { + } + // 安排空id成员入组 + else if (isUnknownMember) { doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) - } else { + } + // 安排非空id成员入组 + else { doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) } @@ -209,28 +216,45 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback): Unit = { group.inLock { + // 如果消费者组状态为Dead if (group.is(Dead)) { // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; it is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE)) - } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) { + } + // 检查消费者版本和策略 + // 如果成员配置的协议类型/分区消费分配策略与消费者组的不匹配,封装INCONSISTENT_GROUP_PROTOCOL异常并调用回调函数返回 + // 这里需要注意一点:新加入成员的设置的分区分配策略,必须至少有一个策略是组内所有成员都支持的,因为消费者组选举分区分配策略时 + // 第一步就是要获取所有成员都支持的分区分配策略,否则无法选举 + else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) { responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL)) - } else { - val newMemberId = group.generateMemberId(clientId, groupInstanceId) + } + else { + // 服务端生成memberId,用`client.id-${UUID}`或者`groupInstanceId.id-${UUID}`拼接而成 + val newMemberId = group.generateMemberId(clientId, groupInstanceId) + // 添加静态成员,并触发rebalance if (group.hasStaticMember(groupInstanceId)) { updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback) - } else if (requireKnownMemberId) { - // If member id required (dynamic membership), register the member in the pending member list - // and send back a response to call for another join group request with allocated member id. + } + // 如果要求成员ID不为空,默认为true + // 当满足条件 joinGroupRequest.version >= 4 && groupInstanceId.isEmpty,requireKnownMemberId为true + else if (requireKnownMemberId) { + // 如果申请加入组的成员 memberId 为空,服务端会先生成一个memberId然后将该请求 "打回去",携带生成的 memberId 和 MEMBER_ID_REQUIRED 异常信息。 + // 当客户端收到包含该异常信息的响应,会根据返回的 memberId更新自身的信息,并重新发送 JoinGroupRequest,之后就会调用 doJoinGroup 方法了 + + // If member id required (dynamic membership), register the member in the pending member list + // and send back a response to call for another join group request with allocated member id. debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " + s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.") group.addPendingMember(newMemberId) addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs) responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED)) - } else { + } + // 增加member并触发rebalance + else { info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " + s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.") addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId, @@ -240,6 +264,7 @@ class GroupCoordinator(val brokerId: Int, } } + // 安排设置了memberId的消费者加入组 private def doJoinGroup(group: GroupMetadata, memberId: String, groupInstanceId: Option[String], @@ -251,6 +276,7 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback): Unit = { group.inLock { + // 前置检查,各类异常情况 if (group.is(Dead)) { // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; this is likely that the group has migrated to some other @@ -259,7 +285,9 @@ class GroupCoordinator(val brokerId: Int, responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE)) } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) { responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL)) - } else if (group.isPendingMember(memberId)) { + } + // 第一阶段,处理`待决成员`入组申请 + else if (group.isPendingMember(memberId)) { // A rejoining pending member will be accepted. Note that pending member will never be a static member. if (groupInstanceId.isDefined) { throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " + @@ -267,31 +295,50 @@ class GroupCoordinator(val brokerId: Int, } else { debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " + s"${group.currentState} state. Adding to the group now.") + // `待决成员`加入组 addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId, clientId, clientHost, protocolType, protocols, group, responseCallback) } - } else { + } + // 第二阶段,处理`非待决成员`的入组申请 + else { + // 消费者组实例id没找到 val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId) + // 检查static member.id是否最新 if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) { // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately. responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID)) - } else if (!group.has(memberId) || groupInstanceIdNotFound) { + } + // memberId找不到,失败 + else if (!group.has(memberId) || groupInstanceIdNotFound) { // If the dynamic member trying to register with an unrecognized id, or // the static member joins with unknown group instance id, send the response to let // it reset its member id and retry. responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) - } else { + } + // + else { + // 获取成员的元数据 val member = group.get(memberId) group.currentState match { + // 如果是PreparingRebalance状态,就说明消费者组正要开启 Rebalance 流程, + // 那么,调用 updateMemberAndRebalance 方法更新成员信息,并开始准备 Rebalance 即可。 + // 更新成员信息并开始准备Rebalance + // GroupCoordinator状态机:PreparingRebalance => PreparingRebalance case PreparingRebalance => updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback) + // GroupCoordinator状态机:CompletingRebalance => PreparingRebalance case CompletingRebalance => + // 如果成员以前申请过加入组 if (member.matches(protocols)) { // member is joining with the same metadata (which could be because it failed to // receive the initial JoinGroup response), so just return current group information // for the current generation. + // 就判断一下,该成员的分区消费分配策略与订阅分区列表是否和已保存记录中的一致, + // 如果相同,就说明该成员已经应该发起过加入组的操作,并且 Coordinator 已经批准了,只是该成员没有收到, + // 因此,针对这种情况,代码构造一个 JoinGroupResult 对象,直接返回当前的组信息给成员。 responseCallback(JoinGroupResult( members = if (group.isLeader(memberId)) { group.currentMemberMetadata @@ -306,19 +353,27 @@ class GroupCoordinator(val brokerId: Int, error = Errors.NONE)) } else { // member has changed metadata, so force a rebalance + // 否则,就说明成员变更了订阅信息或分配策略,更新成员信息并开始准备Rebalance updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback) } + // Stable状态 + // GroupCoordinator状态机:Stable => PreparingRebalance case Stable => val member = group.get(memberId) + // 如果成员是Leader成员,强制Rebalance if (group.isLeader(memberId)) { // force a rebalance if the leader sends JoinGroup; // This allows the leader to trigger rebalances for changes affecting assignment // which do not affect the member metadata (such as topic metadata changes for the consumer) updateMemberAndRebalance(group, member, protocols, s"leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback) - } else if (!member.matches(protocols)) { + } + // 如果成员元数据发生变更,强制Rebalance + else if (!member.matches(protocols)) { updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback) - } else { + } + // 如果不属于上述2种情况,仅返回当前组信息 + else { // for followers with no actual change to their metadata, just return group information // for the current generation which will allow them to issue SyncGroup responseCallback(JoinGroupResult( @@ -331,6 +386,7 @@ class GroupCoordinator(val brokerId: Int, error = Errors.NONE)) } + // 异常状态,返回 case Empty | Dead => // Group reaches unexpected state. Let the joining member reset their generation and rejoin. warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " + @@ -342,13 +398,14 @@ class GroupCoordinator(val brokerId: Int, } } + // 处理SYNC_GROUP请求 def handleSyncGroup(groupId: String, generation: Int, memberId: String, protocolType: Option[String], protocolName: Option[String], groupInstanceId: Option[String], - groupAssignment: Map[String, Array[Byte]], + groupAssignment: Map[String, Array[Byte]], // 消费者提交的分区分配方案 responseCallback: SyncCallback): Unit = { validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match { case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => @@ -361,8 +418,10 @@ class GroupCoordinator(val brokerId: Int, case Some(error) => responseCallback(SyncGroupResult(error)) case None => + // 获取组元素对象 groupManager.getGroup(groupId) match { case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID)) + // 执行doSyncGroup case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName, groupInstanceId, groupAssignment, responseCallback) } @@ -402,6 +461,9 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS)) + // 正常场景,收到leader的sync_group请求后 + // 1. 调用GroupMetadataManager.storeGroup保存组信息 + // 2. group状态更新到Stable case CompletingRebalance => group.get(memberId).awaitingSyncCallback = responseCallback removePendingSyncMember(group, memberId) @@ -411,6 +473,7 @@ class GroupCoordinator(val brokerId: Int, info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}. " + s"The group has ${group.size} members, ${group.allStaticMembers.size} of which are static.") + // 补全缺失的member和assignments // fill any missing members with an empty assignment val missing = group.allMembers.diff(groupAssignment.keySet) val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap @@ -419,17 +482,25 @@ class GroupCoordinator(val brokerId: Int, warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}") } + // 调用GroupMetadataManager.storeGroup保存组信息 groupManager.storeGroup(group, assignment, (error: Errors) => { group.inLock { // another member may have joined the group while we were awaiting this callback, // so we must ensure we are still in the CompletingRebalance state and the same generation // when it gets invoked. if we have transitioned to another state, then do nothing if (group.is(CompletingRebalance) && generationId == group.generationId) { + // 有错误 if (error != Errors.NONE) { + // 清空分配方案并发送给所有成员 resetAndPropagateAssignmentError(group, error) + // 准备开启新一轮的Rebalance maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)") - } else { + } + // 没有错误, + else { + // 在消费者组元数据中为每个消费者成员保存分配方案并发送给所有成员 setAndPropagateAssignment(group, assignment) + // group状态更新到Stable group.transitionTo(Stable) } } @@ -606,11 +677,13 @@ class GroupCoordinator(val brokerId: Int, groupError -> partitionErrors } + // GroupCoordinator处理Heartbeat def handleHeartbeat(groupId: String, memberId: String, groupInstanceId: Option[String], generationId: Int, responseCallback: Errors => Unit): Unit = { + // 当前Coordinator处于loading状态 validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error => if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) // the group is still loading, so respond just blindly @@ -620,11 +693,13 @@ class GroupCoordinator(val brokerId: Int, return } + // groupManager.getGroup(groupId) match { case None => responseCallback(Errors.UNKNOWN_MEMBER_ID) case Some(group) => group.inLock { + // 异常情况 if (group.is(Dead)) { // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; this is likely that the group has migrated to some other @@ -637,7 +712,10 @@ class GroupCoordinator(val brokerId: Int, responseCallback(Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { responseCallback(Errors.ILLEGAL_GENERATION) - } else { + } + + // 正常情况下都会标记heartbeat成功 + else { group.currentState match { case Empty => responseCallback(Errors.UNKNOWN_MEMBER_ID) @@ -649,6 +727,7 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextHeartbeatExpiration(group, member) responseCallback(Errors.NONE) + // PreparingRebalance返回REBALANCE_IN_PROGRESS,客户端也会按正常返回处理 case PreparingRebalance => val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) @@ -928,31 +1007,40 @@ class GroupCoordinator(val brokerId: Int, private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = { assert(group.is(CompletingRebalance)) + // 更新member.assignment group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId)) + propagateAssignment(group, Errors.NONE) } private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors): Unit = { assert(group.is(CompletingRebalance)) + // group所有member分区分配方案更新为empty group.allMemberMetadata.foreach(_.assignment = Array.empty) + // 发送给member propagateAssignment(group, error) } + // 分区分配方案返回给group内所有member private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = { val (protocolType, protocolName) = if (error == Errors.NONE) (group.protocolType, group.protocolName) else (None, None) + + // for (member <- group.allMemberMetadata) { if (member.assignment.isEmpty && error == Errors.NONE) { warn(s"Sending empty assignment to member ${member.memberId} of ${group.groupId} for generation ${group.generationId} with no errors") } + // 调用回调函数member.awaitingSyncCallback,每个消费者只会收到自己的分区分配方案 if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) { // reset the session timeout for members after propagating the member's assignment. // This is because if any member's session expired while we were still awaiting either // the leader sync group or the storage callback, its expiration will be ignored and no // future heartbeat expectations will not be scheduled. + // 如果返回true,则设置下次心跳的时间 completeAndScheduleNextHeartbeatExpiration(group, member) } } @@ -965,13 +1053,16 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs) } + // 完成当前心跳,并设置下次心跳的超时时间 private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = { val memberKey = MemberKey(group.groupId, member.memberId) + // 完成本次心跳 // complete current heartbeat expectation member.heartbeatSatisfied = true heartbeatPurgatory.checkAndComplete(memberKey) + // 设置下次心跳,超时时间timeoutMs // reschedule the next heartbeat expiration deadline member.heartbeatSatisfied = false val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs) @@ -1003,15 +1094,23 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], group: GroupMetadata, callback: JoinCallback): Unit = { + val member = new MemberMetadata(memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols) + // 标识该成员是新成员,isNew 字段与心跳设置相关联 member.isNew = true // update the newMemberAdded flag to indicate that the join group can be further delayed + // 组状态是PreparingRebalance且generationId == 0,说明是第一次进行Rebalance,那么设置newMemberAdded = true + // 这个变量的作用,是 Kafka 为消费者组 Rebalance 流程做的一个性能优化。 + // 大致的思想:消费者组首次进行 Rebalance 时,让 Coordinator 多等待一段时间,从而让更多的消费者组成员加入到组中, + // 以免后来者申请入组而反复进行 Rebalance。这段多等待的时间,由服务端参数 group.initial.rebalance.delay.ms 设置。 if (group.is(PreparingRebalance) && group.generationId == 0) group.newMemberAdded = true + // 向消费者组添加成员 + // 如果还没有选出Leader成员,则设置当前成员为Leader(重要操作) group.add(member, callback) // The session timeout does not affect new members since they do not have their memberId and @@ -1020,14 +1119,18 @@ class GroupCoordinator(val brokerId: Int, // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests // for new members. If the new member is still there, we expect it to retry. + // 设置下次心跳超期时间 completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs) if (member.isStaticMember) { info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.") + // 静态成员加入组 group.addStaticMember(groupInstanceId, memberId) } else { + // 当前已经加入组,则从`待决成员`中删除 group.removePendingMember(memberId) } + // 准备rebalance maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId") } @@ -1113,12 +1216,15 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], reason: String, callback: JoinCallback): Unit = { + // 更新组成员信息;调用 GroupMetadata 的 updateMember 方法来更新消费者组成员; group.updateMember(member, protocols, callback) + // 这一步的核心思想,是将消费者组状态变更到 PreparingRebalance,然后创建 DelayedJoin 对象,并交由 Purgatory,等待延时处理加入组操作 maybePrepareRebalance(group, reason) } private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = { group.inLock { + // 状态属于三者之一 Stable, CompletingRebalance, Empty if (group.canRebalance) prepareRebalance(group, reason) } @@ -1126,6 +1232,7 @@ class GroupCoordinator(val brokerId: Int, // package private for testing private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = { + // 如果当前CompletingRebalance, 清空分配方案并返回REBALANCE_IN_PROGRESS // if any members are awaiting sync, cancel their request and have them rejoin if (group.is(CompletingRebalance)) resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS) @@ -1133,6 +1240,8 @@ class GroupCoordinator(val brokerId: Int, // if a sync expiration is pending, cancel it. removeSyncExpiration(group) + // 如果是Empty状态,则初始化InitialDelayedJoin对象 + // 如果是Stable状态,则初始化DelayedJoin对象 val delayedRebalance = if (group.is(Empty)) new InitialDelayedJoin(this, rebalancePurgatory, @@ -1143,12 +1252,14 @@ class GroupCoordinator(val brokerId: Int, else new DelayedJoin(this, group, group.rebalanceTimeoutMs) + // 状态更新到PreparingRebalance group.transitionTo(PreparingRebalance) info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " + s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)") val groupKey = GroupJoinKey(group.groupId) + // 尝试完成加入组操作,如果没有完成,则设置监听,延时进行加入 rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey)) } @@ -1184,20 +1295,25 @@ class GroupCoordinator(val brokerId: Int, def onCompleteJoin(group: GroupMetadata): Unit = { group.inLock { + // 尚未加入group的消费者 val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember) if (notYetRejoinedDynamicMembers.nonEmpty) { info(s"Group ${group.groupId} removed dynamic members " + s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}") + // 清理未加入组的消费者,取消heartbeat notYetRejoinedDynamicMembers.values.foreach { failedMember => removeHeartbeatForLeavingMember(group, failedMember) group.remove(failedMember.memberId) } } + // 组的状态 == dead if (group.is(Dead)) { info(s"Group ${group.groupId} is dead, skipping rebalance stage") - } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) { + } + // 如果组成员不为空,且还未选出Leader成员 + else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) { // If all members are not rejoining, we will postpone the completion // of rebalance preparing stage, and send out another delayed operation // until session timeout removes all the non-responsive members. @@ -1205,8 +1321,11 @@ class GroupCoordinator(val brokerId: Int, rebalancePurgatory.tryCompleteElseWatch( new DelayedJoin(this, group, group.rebalanceTimeoutMs), Seq(GroupJoinKey(group.groupId))) - } else { + } + + else { group.initNextGeneration() + // 组为空 if (group.is(Empty)) { info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") @@ -1219,13 +1338,17 @@ class GroupCoordinator(val brokerId: Int, warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}") } }) - } else { + } + // 组不为空 + else { info(s"Stabilized group ${group.groupId} generation ${group.generationId} " + s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members") + // 遍历所有组员 // trigger the awaiting join group response callback for all the members after rebalancing for (member <- group.allMemberMetadata) { val joinResult = JoinGroupResult( + // 重要1,members信息仅发送给leader members = if (group.isLeader(member.memberId)) { group.currentMemberMetadata } else { @@ -1233,18 +1356,23 @@ class GroupCoordinator(val brokerId: Int, }, memberId = member.memberId, generationId = group.generationId, + // 重要2,确定分区分配策略 + // 服务端只是帮忙确定了整个组的分区分配策略,而分配消费分区的任务则交给了 Leader 消费者。 protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, error = Errors.NONE) - + // 调用回调函数返回 group.maybeInvokeJoinCallback(member, joinResult) + // 完成当前心跳任务并设置下一个 completeAndScheduleNextHeartbeatExpiration(group, member) + // 标记该成员为非新成员 member.isNew = false - + // pendingSyncMembers新增member group.addPendingSyncMember(member.memberId) } + // 调度PendingSync任务,这个是啥? schedulePendingSync(group) } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 773d5ed11a8..ebedaf251d9 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -382,7 +382,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def notYetRejoinedMembers = members.filter(!_._2.isAwaitingJoin).toMap - def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty + def hasAllMembersJoined = { + // 判断组中是否创建了所有成员的元数据对象,条件有两个 + // 1.组中成员元数据对象数 = 申请加入组的成员数 + // 2.待决成员列表为空 + members.size == numMembersAwaitingJoin && pendingMembers.isEmpty + } def allMembers = members.keySet @@ -409,7 +414,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def generateMemberId(clientId: String, groupInstanceId: Option[String]): String = { groupInstanceId match { - case None => + case None =>rebalanceTimeoutMs clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString case Some(instanceId) => instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString @@ -733,30 +738,46 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState }.toMap } + // 处理 def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = { - + // 用于获取订阅分区过期的位移值 def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long, + // subscribedTopics是豁免名单,名单中的topic不会被清理offset subscribedTopics: Set[String] = Set.empty): Map[TopicPartition, OffsetAndMetadata] = { + + // 遍历offsets中的所有分区,过滤出同时满足以下2个条件的数据 + // 条件1:该主题分区已经完成位移提交 + // 条件2:该主题分区在位移主题中对应消息的存在时间超过了阈值 offsets.filter { case (topicPartition, commitRecordMetadataAndOffset) => + // filter提交1:入参subscribedTopics是一个黑名单过滤器 !subscribedTopics.contains(topicPartition.topic()) && - !pendingOffsetCommits.contains(topicPartition) && { + // filter条件2:正在提交的pendingOffsetCommits中,不包含topicPartition。反之则不过期 + !pendingOffsetCommits.contains(topicPartition) && + // filter条件3:过期判定 + { + // 获取位移消息中的expireTimestamp时间戳,表示过期时间 commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match { case None => + // expireTimestamp值为空,判断是否超时 // current version with no per partition retention currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs case Some(expireTimestamp) => + // expireTimestamp不为空,则判断当前时间和expireTimestamp // older versions with explicit expire_timestamp field => old expiration semantics is used currentTimestamp >= expireTimestamp } } }.map { case (topicPartition, commitRecordOffsetAndMetadata) => + // 返回offset对象 (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata) }.toMap } + // offsets中需要清理的三类场景 val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match { + // 场景1、废弃的Consumer case Some(_) if is(Empty) => // no consumer exists in the group => // - if current state timestamp exists and retention period has passed since group became Empty, @@ -768,6 +789,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp) ) + // 场景2、存活的consumer case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined && is(Stable) => // consumers exist in the group and group is stable => // - if the group is aware of the subscribed topics and retention period had passed since the @@ -775,9 +797,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState // expired getExpiredOffsets( _.offsetAndMetadata.commitTimestamp, - subscribedTopics.get + subscribedTopics.get // 传入黑名单 ) + // 场景3、standalone消费者 case None => // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only // expire offsets with no pending offset commit that retention period has passed since their last commit @@ -790,7 +813,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (expiredOffsets.nonEmpty) debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}") + // 清理offsets集合 offsets --= expiredOffsets.keySet + // 返回expiredOffsets expiredOffsets } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index cb03c4cbb6f..1e73190d914 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -174,6 +174,7 @@ class GroupMetadataManager(brokerId: Int, def startup(retrieveGroupMetadataTopicPartitionCount: () => Int, enableMetadataExpiration: Boolean): Unit = { groupMetadataTopicPartitionCount = retrieveGroupMetadataTopicPartitionCount() scheduler.startup() + // 启动后台定时任务, if (enableMetadataExpiration) { scheduler.schedule(name = "delete-expired-group-metadata", fun = () => cleanupGroupMetadata(), @@ -241,17 +242,22 @@ class GroupMetadataManager(brokerId: Int, } } + /** + * 保存消费者组注册信息,写入磁盘`__consumer_offsets` + */ def storeGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Errors => Unit): Unit = { getMagic(partitionFor(group.groupId)) match { case Some(magicValue) => + // 构建注册消息的Key, Value // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() val key = GroupMetadataManager.groupMetadataKey(group.groupId) val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion) + // 使用 Key 和 Value 构建待写入消息集合。这里的消息集合类是 MemoryRecords val records = { val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, Seq(new SimpleRecord(timestamp, key, value)).asJava)) @@ -260,6 +266,7 @@ class GroupMetadataManager(brokerId: Int, builder.build() } + // 计算要写入的目标分区, topic = __consumer_offsets val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) val groupMetadataRecords = Map(groupMetadataPartition -> records) val generationId = group.generationId @@ -313,6 +320,8 @@ class GroupMetadataManager(brokerId: Int, responseCallback(responseError) } + // 向__consumer_offsets主题写入消息 + // 该方法就是调用 ReplicaManager 的 appendRecords 方法,将消息写入到位移主题中 appendForGroup(group, groupMetadataRecords, putCacheCallback) case None => @@ -338,9 +347,9 @@ class GroupMetadataManager(brokerId: Int, /** * Store offsets by appending it to the replicated log and then inserting to cache */ - def storeOffsets(group: GroupMetadata, + def storeOffsets(group: GroupMetadata, // 消费者组信息 consumerId: String, - offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], + offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], // offset responseCallback: immutable.Map[TopicPartition, Errors] => Unit, producerId: Long = RecordBatch.NO_PRODUCER_ID, producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = { @@ -369,11 +378,14 @@ class GroupMetadataManager(brokerId: Int, val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() + // 构造开始 + // 构造写入数据,类型SimpleRecord, key, value val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) => val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition) val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion) new SimpleRecord(timestamp, key, value) } + // 构造写入消息,topic = __consumer_offsets val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) @@ -385,14 +397,18 @@ class GroupMetadataManager(brokerId: Int, records.foreach(builder.append) val entries = Map(offsetTopicPartition -> builder.build()) + // 构造结束 + // 回调函数 // set the callback function to insert offsets into cache after log append completed def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { + // 确保消息写入到指定位移主题分区,否则抛出异常 // the append response should only contain the topics partition if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition)) throw new IllegalStateException("Append status %s should only have one partition %s" .format(responseStatus, offsetTopicPartition)) + // 更新指标 // record the number of offsets committed to the log offsetCommitsSensor.record(records.size) @@ -401,17 +417,21 @@ class GroupMetadataManager(brokerId: Int, val status = responseStatus(offsetTopicPartition) val responseError = group.inLock { + // 写入成功 if (status.error == Errors.NONE) { if (!group.is(Dead)) { filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) => if (isTxnOffsetCommit) group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) else + // 调用GroupMetadata的onOffsetCommitAppend方法填充元数据 group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata)) } } Errors.NONE - } else { + } + // 写入存在错误 + else { if (!group.is(Dead)) { if (!group.hasPendingOffsetCommitsFromProducer(producerId)) removeProducerGroup(producerId, group.groupId) @@ -469,10 +489,13 @@ class GroupMetadataManager(brokerId: Int, group.prepareOffsetCommit(offsetMetadata) } } - + // 写入消息到位移主题,同时调用putCacheCallback方法更新消费者元数据 + // 该方法就是调用 ReplicaManager 的 appendRecords 方法,将消息写入到位移主题中。 appendForGroup(group, entries, putCacheCallback) + // 如果不是Coordinator case None => + // 返回NOT_COORDINATOR异常, val commitStatus = offsetMetadata.map { case (topicPartition, _) => (topicPartition, Errors.NOT_COORDINATOR) } @@ -487,7 +510,10 @@ class GroupMetadataManager(brokerId: Int, */ def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = { trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId)) + // 从cache获取offset val group = groupMetadataCache.get(groupId) + + // 没有组数据,返回空数据 if (group == null) { topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, @@ -496,6 +522,7 @@ class GroupMetadataManager(brokerId: Int, }.toMap } else { group.inLock { + // 如果组处于Dead状态,返回空数据 if (group.is(Dead)) { topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET, @@ -506,6 +533,7 @@ class GroupMetadataManager(brokerId: Int, val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet) topicPartitions.map { topicPartition => + // 如果针对该分区,当前正处于pending状态,则返回空数据 if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) { topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT) @@ -514,6 +542,7 @@ class GroupMetadataManager(brokerId: Int, case None => new PartitionData(OffsetFetchResponse.INVALID_OFFSET, Optional.empty(), "", Errors.NONE) + // 返回确定的offset case Some(offsetAndMetadata) => new PartitionData(offsetAndMetadata.offset, offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE) @@ -533,6 +562,7 @@ class GroupMetadataManager(brokerId: Int, val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) info(s"Scheduling loading of offsets and group metadata from $topicPartition for epoch $coordinatorEpoch") val startTimeMs = time.milliseconds() + // 异步调用loadGroupsAndOffsets方法 scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded, startTimeMs)) } @@ -570,18 +600,27 @@ class GroupMetadataManager(brokerId: Int, } private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = { + // 获取__consumer_offsets的LEO def logEndOffset: Long = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L) replicaManager.getLog(topicPartition) match { case None => warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log") + // 获取本地副本日志,从哪个地方开始获取? case Some(log) => + // 四个重要集合 + + // loadedOffsets:已完成位移值加载的分区列表 val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]() + // pendingOffsets:位移值正在加载中的分区列表,只用于 Kafka 事务 val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]() + // loadedGroups:已完成组信息加载的消费者组列表 val loadedGroups = mutable.Map[String, GroupMetadata]() + // removedGroups:待移除的消费者组列表。 val removedGroups = mutable.Set[String]() + // 第一部分:读取位移主题 // buffer may not be needed if records are read from memory var buffer = ByteBuffer.allocate(0) @@ -592,6 +631,7 @@ class GroupMetadataManager(brokerId: Int, var readAtLeastOneRecord = true while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) { + // 从currOffset开始读取log,一次最多loadBufferSize条 val fetchDataInfo = log.read(currOffset, maxLength = config.loadBufferSize, isolation = FetchLogEnd, @@ -600,7 +640,9 @@ class GroupMetadataManager(brokerId: Int, readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0 val memRecords = (fetchDataInfo.records: @unchecked) match { + // MemoryRecords 直接返回 case records: MemoryRecords => records + // FileRecords 读取到buffer并转化为MemoryRecords case fileRecords: FileRecords => val sizeInBytes = fileRecords.sizeInBytes val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes) @@ -620,7 +662,9 @@ class GroupMetadataManager(brokerId: Int, MemoryRecords.readableRecords(buffer) } + // 第二部分:读取分区数据,填充四个集合 memRecords.batches.forEach { batch => + // 处理事务消息 TxnOffsetCommit val isTxnOffsetCommit = batch.isTransactional if (batch.isControlBatch) { val recordIterator = batch.iterator @@ -637,7 +681,9 @@ class GroupMetadataManager(brokerId: Int, } pendingOffsets.remove(batch.producerId) } - } else { + } + // 处理一般消息 + else { var batchBaseOffset: Option[Long] = None for (record <- batch.asScala) { require(record.hasKey, "Group metadata/offset entry key should not be null") @@ -651,17 +697,22 @@ class GroupMetadataManager(brokerId: Int, // load offset val groupTopicPartition = offsetKey.key + // 如果消息没有value,也就是墓碑消息 if (!record.hasValue) { if (isTxnOffsetCommit) pendingOffsets(batch.producerId).remove(groupTopicPartition) else + // 由于是墓碑消息,所以这个主题分区对应的消息已经过期 + // 之前如果加载了,应该从已完成位移值加载的分区列表中移除 loadedOffsets.remove(groupTopicPartition) } else { val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value) if (isTxnOffsetCommit) pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata)) - else + else { + // 更新消息 loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata)) + } } case groupMetadataKey: GroupMetadataKey => @@ -685,6 +736,10 @@ class GroupMetadataManager(brokerId: Int, } } + // 第三步,处理4个集合 + + // 对loadedOffsets 进行分组,将完成信息加载的组对应的消费者组位移值保存到groupOffsets + // 将有消费者组消费位移,却没有消费者组信息的保存到emptyGroupOffsets val (groupOffsets, emptyGroupOffsets) = loadedOffsets .groupBy(_._1.group) .map { case (k, v) => @@ -757,14 +812,17 @@ class GroupMetadataManager(brokerId: Int, * * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. */ + // 当Broker卸任某些消费者组的 Coordinator 角色时,它需要将这些消费者组的信息从 groupMetadataCache 中全部移除掉 def removeGroupsForPartition(offsetsPartition: Int, coordinatorEpoch: Option[Int], onGroupUnloaded: GroupMetadata => Unit): Unit = { val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) info(s"Scheduling unloading of offsets and group metadata from $topicPartition") + // 定时任务执行 scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupUnloaded)) } + // 清理缓存:ownedPartitions, loadingPartitions, groupMetadataCache, openGroupsForProducer private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition, coordinatorEpoch: Option[Int], onGroupUnloaded: GroupMetadata => Unit): Unit = { @@ -780,11 +838,12 @@ class GroupMetadataManager(brokerId: Int, // to prevent coordinator's check-and-get-group race condition ownedPartitions.remove(offsetsPartition) loadingPartitions.remove(offsetsPartition) - + // 清理 groupMetadataCache for (group <- groupMetadataCache.values) { if (partitionFor(group.groupId) == offsetsPartition) { onGroupUnloaded(group) groupMetadataCache.remove(group.groupId, group) + // 清理openGroupsForProducer removeGroupFromAllProducers(group.groupId) numGroupsRemoved += 1 numOffsetsRemoved += group.numOffsets @@ -823,7 +882,9 @@ class GroupMetadataManager(brokerId: Int, // visible for testing private[group] def cleanupGroupMetadata(): Unit = { val currentTimestamp = time.milliseconds() + // 清理过期的offset数据,删除内存数据,磁盘写入Tombstone墓碑消息 val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => { + // config.offsetsRetentionMs,配置项`offsets.retention.minutes` group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs) }) offsetExpiredSensor.record(numOffsetsRemoved) @@ -844,6 +905,7 @@ class GroupMetadataManager(brokerId: Int, groups.foreach { group => val groupId = group.groupId val (removedOffsets, groupIsDead, generation) = group.inLock { + // 调用函数,清理缓存并返回需要写入tombstone数据 val removedOffsets = selector(group) if (group.is(Empty) && !group.hasOffsets) { info(s"Group $groupId transitioned to Dead in generation ${group.generationId}") @@ -860,11 +922,13 @@ class GroupMetadataManager(brokerId: Int, val timestampType = TimestampType.CREATE_TIME val timestamp = time.milliseconds() + // 构造tombstone数据 replicaManager.onlinePartition(appendPartition).foreach { partition => val tombstones = ArrayBuffer.empty[SimpleRecord] removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) => trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) + // value=null,表示tombstone tombstones += new SimpleRecord(timestamp, commitKey, null) } trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.") @@ -880,6 +944,7 @@ class GroupMetadataManager(brokerId: Int, trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.") } + // 写入磁盘 if (tombstones.nonEmpty) { try { // do not need to require acks since even if the tombstone is lost, diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 09fd337aa9f..99f8e447b7f 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -67,8 +67,10 @@ abstract class DelayedOperation(override val delayMs: Long, */ def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) { + // 取消timeout timer // cancel the timeout timer cancel() + // 执行onComplete方法 onComplete() true } else { @@ -106,6 +108,7 @@ abstract class DelayedOperation(override val delayMs: Long, * @param f else function to be executed after first tryComplete returns false * @return result of tryComplete */ + // 线程安全版本的tryComplete,最多尝试执行2次tryComplete,第二次之前执行入参函数f private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) { if (tryComplete()) true else { @@ -118,6 +121,7 @@ abstract class DelayedOperation(override val delayMs: Long, /** * Thread-safe variant of tryComplete() */ + // 线程安全版本的tryComplete private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete()) /* @@ -231,6 +235,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously, // holding a exclusive lock to make the call is often unnecessary. + + // 先执行operation.safeTryCompleteOrElse方法 if (operation.safeTryCompleteOrElse { watchKeys.foreach(key => watchForOperation(key, operation)) if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 088444d3cb6..0e1db82f8ba 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1318,17 +1318,27 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, createResponse) } + // 处理FIND_COORDINATOR请求,分为两类 + // 消费者查找GroupCoordinator + // 事务查找TransactionCoordinator def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = { val findCoordinatorRequest = request.body[FindCoordinatorRequest] + // 消费者鉴权失败1 if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id && - !authHelper.authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) + !authHelper.authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) { requestHelper.sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) + } + // 事务鉴权失败2 else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id && - !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) + !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) { + requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) + } + // else { + // 获取分区 val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + // case CoordinatorType.GROUP => (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) @@ -1336,6 +1346,7 @@ class KafkaApis(val requestChannel: RequestChannel, (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) } + // 获取topic对应的元信息 val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) def createFindCoordinatorResponse(error: Errors, node: Node, @@ -1350,16 +1361,21 @@ class KafkaApis(val requestChannel: RequestChannel, .setThrottleTimeMs(requestThrottleMs)) } + // coordinator不可用 if (topicMetadata.headOption.isEmpty) { val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request) autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) - } else { + } + // + else { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) { createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs) } else { + // 寻找coordinator节点,获取消费者组所属分区的leader,所在的broker node + // 查找路径:group.id => partition => broker node val coordinatorEndpoint = topicMetadata.head.partitions.asScala .find(_.partitionIndex == partition) .filter(_.leaderId != MetadataResponse.NO_LEADER_ID) @@ -1543,6 +1559,7 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + // 前置校验 if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard @@ -1553,7 +1570,10 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL)) } else if (!authHelper.authorize(request.context, READ, GROUP, syncGroupRequest.data.groupId)) { sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED)) - } else { + } + + // 校验通过 + else { val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]] syncGroupRequest.data.assignments.forEach { assignment => assignmentMap += (assignment.memberId -> assignment.assignment) @@ -1598,6 +1618,7 @@ class KafkaApis(val requestChannel: RequestChannel, }) } + // Broker处理Heartbeat请求 def handleHeartbeatRequest(request: RequestChannel.Request): Unit = { val heartbeatRequest = request.body[HeartbeatRequest] @@ -1615,18 +1636,23 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, createResponse) } + // static member支持从2.3开始 if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) { // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. sendResponseCallback(Errors.UNSUPPORTED_VERSION) - } else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) { + } + // 认证失败 + else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new HeartbeatResponse( new HeartbeatResponseData() .setThrottleTimeMs(requestThrottleMs) .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))) - } else { + } + // 交给GroupCoordinator处理Heartbeat + else { // let the coordinator to handle heartbeat groupCoordinator.handleHeartbeat( heartbeatRequest.data.groupId, diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 6973dcb2a79..3c8b3d8f29c 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -76,7 +76,9 @@ class SystemTimer(executorName: String, private[this] val readLock = readWriteLock.readLock() private[this] val writeLock = readWriteLock.writeLock() + // 增加任务 def add(timerTask: TimerTask): Unit = { + // 增加任务使用共享锁readLock readLock.lock() try { addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs)) @@ -86,6 +88,8 @@ class SystemTimer(executorName: String, } private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { + // 增加任务 + // 失败,如果因为任务到期,直接提交线程池执行 if (!timingWheel.add(timerTaskEntry)) { // Already expired or cancelled if (!timerTaskEntry.cancelled) @@ -98,13 +102,21 @@ class SystemTimer(executorName: String, * waits up to timeoutMs before giving up. */ def advanceClock(timeoutMs: Long): Boolean = { + // DelayQueue中获取头节点,最多等待timeoutMs var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { + // 时间轮前进使用writeLock writeLock.lock() try { while (bucket != null) { + // 移动时间轮 timingWheel.advanceClock(bucket.getExpiration) + // 既然被poll,bucket中的Task可能过期 + // 如果过期,则线程池执行 + // 如果未过期,则重新放入时间轮,完成降级 bucket.flush(addTimerTaskEntry) + + // 非阻塞地取出任务,将当前时点所有过期的 bucket 全部取出 bucket = delayQueue.poll() } } finally { diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index efd04806180..e28eec9ae2b 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -59,6 +59,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } } + // 插入链表的结尾 // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { var done = false diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 4535f3f3e8e..f6a28593d53 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -98,19 +98,29 @@ import java.util.concurrent.atomic.AtomicInteger */ @nonthreadsafe private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { - + // tickMs,时间轮每个格子的时间跨度 + // wheelSize,时间轮的格子数量 + // interval,时间轮的整体跨度 private[this] val interval = tickMs * wheelSize + + // buckets, size = wheelSize, 每个节点都是TimerTaskList private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } + // currentTime:表盘指针,表示当前时间轮所处的时间。这里对时间做了调整,表示小于当前时间的最大时间跨度的整数倍。 + // 假设当前时间戳为 123 毫秒,时间轮每格跨度为 20 ms,那么 currentTimer 就是小于 123 且 20 的整数倍,即 120 ms。 + // currentTime 将整个时间轮划分为到期部分和未到期部分,currentTime 当前执行的时间格也属于到期部分,表示刚好到期。 + // 需要处理当前时间格对应的 TimerTaskList 中的所有任务。 private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs // overflowWheel can potentially be updated and read by two concurrent threads through add(). // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM + // 上层时间轮对象,不一定存在,按需创建 @volatile private[this] var overflowWheel: TimingWheel = null private[this] def addOverflowWheel(): Unit = { synchronized { if (overflowWheel == null) { + // parent.tickMs = this.interval overflowWheel = new TimingWheel( tickMs = interval, wheelSize = wheelSize, @@ -122,7 +132,9 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta } } + // 时间轮增加任务 def add(timerTaskEntry: TimerTaskEntry): Boolean = { + // Task的过期时间戳 val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { @@ -131,12 +143,16 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta } else if (expiration < currentTime + tickMs) { // Already expired false - } else if (expiration < currentTime + interval) { + } + // 当前时间轮可容纳 + else if (expiration < currentTime + interval) { + // 定位到 bucket,并追加到 TimerTaskList // Put in its own bucket val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) + // 更新bucket.expiration,并入列queue // Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket @@ -147,18 +163,23 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta queue.offer(bucket) } true - } else { + } + // 当前时间轮不可容纳,放入parent时间轮 + else { // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } } + // 时间轮前进 // Try to advance the clock def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { + // 更新currentTime指针 currentTime = timeMs - (timeMs % tickMs) + // 级联调用parent // Try to advance the clock of the overflow wheel if present if (overflowWheel != null) overflowWheel.advanceClock(currentTime) }