
Kafka v2.8 source reading: the consumer side

hekai-study-v2.8
Kai 11 months ago
parent
commit
52952492a9
  1. clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java (119)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (4)
  3. core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala (12)
  4. core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (184)
  5. core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala (35)
  6. core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (79)
  7. core/src/main/scala/kafka/server/DelayedOperation.scala (6)
  8. core/src/main/scala/kafka/server/KafkaApis.scala (38)
  9. core/src/main/scala/kafka/utils/timer/Timer.scala (12)
  10. core/src/main/scala/kafka/utils/timer/TimerTaskList.scala (1)
  11. core/src/main/scala/kafka/utils/timer/TimingWheel.scala (27)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java (119)

@@ -138,11 +138,15 @@ public abstract class AbstractCoordinator implements Closeable {
private RequestFuture<ByteBuffer> joinFuture = null;
private RequestFuture<Void> findCoordinatorFuture = null;
private volatile RuntimeException fatalFindCoordinatorException = null;
// Consumer generation, updated each time a rebalance completes
// The initial generation is -1 (NO_GENERATION)
private Generation generation = Generation.NO_GENERATION;
private long lastRebalanceStartMs = -1L;
private long lastRebalanceEndMs = -1L;
private long lastTimeOfConnectionMs = -1L; // starting logging a warning only after unable to connect for a while
// Coordinator State
protected MemberState state = MemberState.UNJOINED;
@@ -405,6 +409,7 @@ public abstract class AbstractCoordinator implements Closeable {
* @return true iff the operation succeeded
*/
boolean joinGroupIfNeeded(final Timer timer) {
// loop condition: rejoinNeeded || joinFuture != null
while (rejoinNeededOrPending()) {
if (!ensureCoordinatorReady(timer)) {
return false;
@@ -415,13 +420,14 @@ public abstract class AbstractCoordinator implements Closeable {
// on each iteration of the loop because an event requiring a rebalance (such as a metadata
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
// prepare to join the group
if (needsJoinPrepare) {
// need to set the flag before calling onJoinPrepare since the user callback may throw
// exception, in which case upon retry we should not retry onJoinPrepare either.
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
}
// send the JOIN_GROUP request
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
@@ -430,6 +436,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
if (future.succeeded()) {
// guard against concurrent modification by the heartbeat thread: take a snapshot under synchronized
Generation generationSnapshot;
MemberState stateSnapshot;
@@ -437,15 +444,20 @@ public abstract class AbstractCoordinator implements Closeable {
// Can't use synchronized for {@code onJoinComplete}, because it can be long enough
// and shouldn't block heartbeat thread.
// See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
// onJoinComplete may run for a long time, so it must not hold the lock and block the heartbeat thread
synchronized (AbstractCoordinator.this) {
generationSnapshot = this.generation;
stateSnapshot = this.state;
}
// in practice this runs after SYNC_GROUP has succeeded, at which point state == STABLE
if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) {
// Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
// this is the SYNC_GROUP response value: the serialized assignment
ByteBuffer memberAssignment = future.value().duplicate();
// handle the successful join
onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
// Generally speaking we should always resetJoinGroupFuture once the future is done, but here
@@ -564,15 +576,19 @@ public abstract class AbstractCoordinator implements Closeable {
super(generation);
}
// handle the JOIN_GROUP response
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
// no-error case
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
joinResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
}
// protocol type is consistent
else {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinSensor.record(response.requestLatencyMs());
@@ -581,7 +597,10 @@ public abstract class AbstractCoordinator implements Closeable {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
}
else {
// transition the state to COMPLETING_REBALANCE
state = MemberState.COMPLETING_REBALANCE;
// we only need to enable heartbeat thread whenever we transit to
@@ -589,25 +608,35 @@ public abstract class AbstractCoordinator implements Closeable {
if (heartbeatThread != null)
heartbeatThread.enable();
// update the generation object from the generationId, memberId and assignment protocol in the response
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
// handle the leader's follow-up work
if (joinResponse.isLeader()) {
// chain(future): complete this future with the SYNC_GROUP result instead of the JOIN_GROUP one
onJoinLeader(joinResponse).chain(future);
} else {
}
// handle the follower's follow-up work
else {
// chain(future): complete this future with the SYNC_GROUP result instead of the JOIN_GROUP one
onJoinFollower().chain(future);
}
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
}
else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
}
else if (error == Errors.UNKNOWN_MEMBER_ID) {
log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}",
error.message(), sentGeneration);
// only need to reset the member id if generation has not been changed,
@@ -616,20 +645,26 @@ public abstract class AbstractCoordinator implements Closeable {
resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
}
else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown(error);
log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}",
error.message(), sentGeneration);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
}
else if (error == Errors.FENCED_INSTANCE_ID) {
// for join-group request, even if the generation has changed we would not expect the instance id
// gets fenced, and hence we always treat this as a fatal error
log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " +
"Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
}
else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
@@ -644,11 +679,15 @@ public abstract class AbstractCoordinator implements Closeable {
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
}
else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " +
"and retry to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
}
else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
String memberId = joinResponse.data().memberId();
@@ -658,11 +697,15 @@ public abstract class AbstractCoordinator implements Closeable {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
}
else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " +
"which could indicate a replication timeout on the broker. Will retry.");
future.raise(error);
} else {
}
else {
// unexpected error, throw the exception
log.error("JoinGroup failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
@@ -670,7 +713,9 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
// after the JOIN_GROUP response, a follower runs this method
private RequestFuture<ByteBuffer> onJoinFollower() {
// the follower case is simple: send an empty assignment to the coordinator
// send follower's sync group with an empty assignment
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
@@ -687,12 +732,15 @@ public abstract class AbstractCoordinator implements Closeable {
return sendSyncGroupRequest(requestBuilder);
}
// after the JOIN_GROUP response, the leader runs this method
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// key step: the leader computes the partition assignment
// perform the leader synchronization and send back the assignment for the group
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
joinResponse.data().members());
// build the SYNC_GROUP request for the coordinator, carrying the assignment
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
@@ -1096,7 +1144,9 @@ public abstract class AbstractCoordinator implements Closeable {
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
// the heartbeat response is processed by HeartbeatResponseHandler
.compose(new HeartbeatResponseHandler(generation));
}
@@ -1105,6 +1155,7 @@ public abstract class AbstractCoordinator implements Closeable {
super(generation);
}
// handle the HEARTBEAT response
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatSensor.record(response.requestLatencyMs());
@@ -1113,18 +1164,23 @@ public abstract class AbstractCoordinator implements Closeable {
if (error == Errors.NONE) {
log.debug("Received successful Heartbeat response");
future.complete(null);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
}
// the group coordinator has moved or is unavailable
else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
coordinator());
markCoordinatorUnknown(error);
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
}
// the group is rebalancing (REBALANCE_IN_PROGRESS): trigger a rejoin via JOIN_GROUP
else if (error == Errors.REBALANCE_IN_PROGRESS) {
// since we may be sending the request during rebalance, we should check
// this case and ignore the REBALANCE_IN_PROGRESS error
synchronized (AbstractCoordinator.this) {
if (state == MemberState.STABLE) {
log.info("Attempt to heartbeat failed since group is rebalancing");
// only flips the flag: rejoinNeeded = true
requestRejoin();
future.raise(error);
} else {
@@ -1132,7 +1188,9 @@ public abstract class AbstractCoordinator implements Closeable {
future.complete(null);
}
}
} else if (error == Errors.ILLEGAL_GENERATION ||
}
// generation or member-id related errors
else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
error == Errors.FENCED_INSTANCE_ID) {
if (generationUnchanged()) {
@@ -1307,11 +1365,13 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
// the Kafka consumer's heartbeat thread
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
private boolean enabled = false;
private boolean closed = false;
private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
// thread name: `kafka-coordinator-heartbeat-thread | ${groupId}`
private HeartbeatThread() {
super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
}
@@ -1353,6 +1413,7 @@ public abstract class AbstractCoordinator implements Closeable {
log.debug("Heartbeat thread started");
while (true) {
synchronized (AbstractCoordinator.this) {
// two control flags: closed and enabled
if (closed)
return;
@@ -1369,9 +1430,11 @@ public abstract class AbstractCoordinator implements Closeable {
continue;
}
// a second job of the heartbeat thread: help drive network I/O
client.pollNoWakeup();
long now = time.milliseconds();
// the group coordinator is unknown
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null) {
// clear the future so that after the backoff, if the hb still sees coordinator unknown in
@@ -1383,12 +1446,16 @@ public abstract class AbstractCoordinator implements Closeable {
} else {
lookupCoordinator();
}
} else if (heartbeat.sessionTimeoutExpired(now)) {
}
// the session timer has expired
else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown("session timed out without receiving a "
+ "heartbeat response");
} else if (heartbeat.pollTimeoutExpired(now)) {
}
// the poll timer has expired
else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll().
String leaveReason = "consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
@@ -1397,17 +1464,24 @@ public abstract class AbstractCoordinator implements Closeable {
"You can address this either by increasing max.poll.interval.ms or by reducing " +
"the maximum size of batches returned in poll() with max.poll.records.";
maybeLeaveGroup(leaveReason);
} else if (!heartbeat.shouldHeartbeat(now)) {
}
// has `heartbeat.interval.ms` elapsed since the last heartbeat was sent?
else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
}
// all pre-checks passed: send a heartbeat
else {
// update state: heartbeat in flight
heartbeat.sentHeartbeat(now);
// send the HEARTBEAT request
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
// update heartbeat state: response received
heartbeat.receiveHeartbeat();
}
}
@@ -1415,13 +1489,16 @@ public abstract class AbstractCoordinator implements Closeable {
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
// RebalanceInProgressException: the client treats this as a normal heartbeat result
if (e instanceof RebalanceInProgressException) {
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
}
// other exceptions: record the failure
else if (e instanceof FencedInstanceIdException) {
log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
heartbeatThread.failed.set(e);
} else {
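Read together, the heartbeat-thread comments above describe a fixed decision order before a heartbeat is actually sent. A minimal sketch of that order; HeartbeatAction and HeartbeatDecision are illustrative names, not part of AbstractCoordinator:

enum HeartbeatAction { FIND_COORDINATOR, MARK_COORDINATOR_UNKNOWN, LEAVE_GROUP, WAIT_BACKOFF, SEND_HEARTBEAT }

final class HeartbeatDecision {
    // mirrors the order of the if/else chain in HeartbeatThread.run()
    static HeartbeatAction next(boolean coordinatorUnknown,
                                boolean sessionTimedOut,
                                boolean pollTimedOut,
                                boolean heartbeatIntervalReached) {
        if (coordinatorUnknown)        return HeartbeatAction.FIND_COORDINATOR;         // rediscover the coordinator first
        if (sessionTimedOut)           return HeartbeatAction.MARK_COORDINATOR_UNKNOWN; // no response within session.timeout.ms
        if (pollTimedOut)              return HeartbeatAction.LEAVE_GROUP;              // poll() not called within max.poll.interval.ms
        if (!heartbeatIntervalReached) return HeartbeatAction.WAIT_BACKOFF;             // heartbeat.interval.ms not yet elapsed
        return HeartbeatAction.SEND_HEARTBEAT;
    }

    public static void main(String[] args) {
        // a healthy consumer whose heartbeat interval has elapsed -> SEND_HEARTBEAT
        System.out.println(next(false, false, false, true));
    }
}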

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (4)

@@ -408,6 +408,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// Revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
// invokes subscriptions.rebalanceListener.onPartitionsRevoked()
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group afterwards
@@ -421,6 +423,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
maybeUpdateJoinedSubscription(assignedPartitions);
// Catch any exception here to make sure we could complete the user callback.
// invokes assignor.onAssignment()
firstException.compareAndSet(null, invokeOnAssignment(assignor, assignment));
// Reschedule the auto commit starting from now
@@ -430,6 +433,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
subscriptions.assignFromSubscribed(assignedPartitions);
// Add partitions that were not previously owned but are now assigned
// invokes subscriptions.rebalanceListener.onPartitionsAssigned()
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
if (firstException.get() != null) {
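The revoke/assign hooks annotated above are exactly what a user-supplied ConsumerRebalanceListener observes from the outside. A small usage example; the broker address and topic name are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RebalanceListenerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "reading-notes");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // driven by invokePartitionsRevoked() before the new assignment is applied
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // driven by invokePartitionsAssigned() after assignFromSubscribed()
                System.out.println("assigned: " + partitions);
            }
        });
        consumer.poll(java.time.Duration.ofMillis(100));
        consumer.close();
    }
}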

core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala (12)

@@ -39,13 +39,21 @@ private[group] class DelayedJoin(
rebalanceTimeout,
group.lock
) {
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
override def tryComplete(): Boolean = {
// delegate to GroupCoordinator.tryCompleteJoin
// forceComplete: cancels the expiration timer and runs onComplete
coordinator.tryCompleteJoin(group, forceComplete _)
}
override def onExpiration(): Unit = {
// try to complete delayed actions introduced by coordinator.onCompleteJoin
tryToCompleteDelayedAction()
}
override def onComplete(): Unit = coordinator.onCompleteJoin(group)
override def onComplete(): Unit = {
// on completion, delegate to GroupCoordinator.onCompleteJoin
coordinator.onCompleteJoin(group)
}
// TODO: remove this ugly chain after we move the action queue to handler thread
private def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteActions()
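DelayedJoin builds on the DelayedOperation contract: tryComplete() is re-evaluated when a watched key is touched, forceComplete() cancels the timer and runs onComplete() at most once, and onExpiration() runs only when the timeout wins the race. A minimal sketch of that contract (an illustrative class, not Kafka's kafka.server.DelayedOperation):

import java.util.concurrent.atomic.AtomicBoolean;

abstract class DelayedOperationSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // re-evaluated whenever a watched key is touched (e.g. another member joins)
    abstract boolean tryComplete();

    // the completion logic; runs at most once
    abstract void onComplete();

    // extra work to do only when completion was caused by the timeout
    void onExpiration() {}

    // cancel-the-timer-and-complete: whoever flips the flag first runs onComplete()
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // called by the timer when the rebalance timeout elapses
    final void expire() {
        if (forceComplete())
            onExpiration();
    }
}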

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (184)

@@ -152,26 +152,28 @@ class GroupCoordinator(val brokerId: Int,
}
}
def handleJoinGroup(groupId: String,
memberId: String,
groupInstanceId: Option[String],
requireKnownMemberId: Boolean,
clientId: String,
clientHost: String,
def handleJoinGroup(groupId: String, // the consumer group id
memberId: String, // the consumer member id
groupInstanceId: Option[String], // the static member's group.instance.id
requireKnownMemberId: Boolean, // whether a known (non-empty) member id is required
clientId: String, // the consumer client.id
clientHost: String, // the consumer host
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
// validate the groupId argument and the group's status
validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
responseCallback(JoinGroupResult(memberId, error))
return
}
// validate the sessionTimeoutMs argument
if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
responseCallback(JoinGroupResult(memberId, Errors.INVALID_SESSION_TIMEOUT))
} else {
// is the member id empty (UNKNOWN_MEMBER_ID)?
val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
// group is created if it does not exist and the member id is UNKNOWN. if member
// is specified but group does not exist, request is rejected with UNKNOWN_MEMBER_ID
@@ -180,12 +182,17 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
case Some(group) =>
group.inLock {
// check whether the group still has room for this member
if (!acceptJoiningMember(group, memberId)) {
group.remove(memberId)
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
} else if (isUnknownMember) {
}
// handle a join from a member with an empty member id
else if (isUnknownMember) {
doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
} else {
}
// handle a join from a member with a known member id
else {
doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
@@ -209,20 +216,35 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
group.inLock {
// the group is in the Dead state
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; it is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// finding the correct coordinator and rejoin.
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
}
// check the protocol type and the assignment strategies
// if the member's protocol type / partition assignment strategies do not match the group's, return INCONSISTENT_GROUP_PROTOCOL via the callback
// note: at least one of the new member's assignment strategies must be supported by every member of the group,
// because the first step of electing the group's assignment strategy is to compute the set of strategies supported by all members; otherwise no strategy can be elected
else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
}
else {
// the broker generates the member id by concatenation: `client.id`-UUID or `group.instance.id`-UUID
val newMemberId = group.generateMemberId(clientId, groupInstanceId)
// a known static member rejoined: update it and trigger a rebalance
if (group.hasStaticMember(groupInstanceId)) {
updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
} else if (requireKnownMemberId) {
}
// requireKnownMemberId: whether a non-empty member id is required (normally true)
// it is true when joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
else if (requireKnownMemberId) {
// if the joining member's memberId is empty, the broker first generates a memberId and "bounces" the request back, returning the generated memberId together with a MEMBER_ID_REQUIRED error
// when the client receives that response it updates its own memberId and resends the JoinGroupRequest, which then goes through doJoinGroup (see the sketch after doUnknownJoinGroup below)
// If member id required (dynamic membership), register the member in the pending member list
// and send back a response to call for another join group request with allocated member id.
debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
@@ -230,7 +252,9 @@ class GroupCoordinator(val brokerId: Int,
group.addPendingMember(newMemberId)
addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
} else {
}
// add the member and trigger a rebalance
else {
info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
@@ -240,6 +264,7 @@ class GroupCoordinator(val brokerId: Int,
}
}
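The MEMBER_ID_REQUIRED round trip described above amounts to a two-step handshake on the client side. A compact sketch; joinOnce stands in for one JOIN_GROUP round trip and is not a real Kafka API:

// Sketch of the two-step join when the broker requires a known member id.
final class JoinHandshakeSketch {
    record JoinReply(String error, String memberId) {}

    static String joinGroup(java.util.function.Function<String, JoinReply> joinOnce) {
        String memberId = "";                       // JoinGroupRequest.UNKNOWN_MEMBER_ID
        JoinReply reply = joinOnce.apply(memberId); // first attempt with an empty member id
        if ("MEMBER_ID_REQUIRED".equals(reply.error())) {
            memberId = reply.memberId();            // adopt the broker-generated id
            reply = joinOnce.apply(memberId);       // retry; this time doJoinGroup handles it
        }
        return reply.memberId();
    }
}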
// handle the join of a consumer that already has a member id
private def doJoinGroup(group: GroupMetadata,
memberId: String,
groupInstanceId: Option[String],
@@ -251,6 +276,7 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit = {
group.inLock {
// pre-checks for the various error cases
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
@@ -259,7 +285,9 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (group.isPendingMember(memberId)) {
}
// phase 1: handle the join request of a pending member
else if (group.isPendingMember(memberId)) {
// A rejoining pending member will be accepted. Note that pending member will never be a static member.
if (groupInstanceId.isDefined) {
throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
@@ -267,31 +295,50 @@ class GroupCoordinator(val brokerId: Int,
} else {
debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
s"${group.currentState} state. Adding to the group now.")
// the pending member joins the group
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
clientId, clientHost, protocolType, protocols, group, responseCallback)
}
} else {
}
// phase 2: handle the join request of a non-pending member
else {
// the group.instance.id was not found in the group
val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
// check whether the static member's member id is up to date
if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
// given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId) || groupInstanceIdNotFound) {
}
// the member id is unknown: fail the request
else if (!group.has(memberId) || groupInstanceIdNotFound) {
// If the dynamic member trying to register with an unrecognized id, or
// the static member joins with unknown group instance id, send the response to let
// it reset its member id and retry.
responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
} else {
}
// known member: handle it according to the current group state
else {
// fetch the member's metadata
val member = group.get(memberId)
group.currentState match {
// PreparingRebalance means the group is about to start a rebalance,
// so just call updateMemberAndRebalance to update the member and prepare the rebalance
// state machine: PreparingRebalance => PreparingRebalance
case PreparingRebalance =>
updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback)
// state machine: CompletingRebalance => PreparingRebalance
case CompletingRebalance =>
// the member is rejoining with unchanged metadata
if (member.matches(protocols)) {
// member is joining with the same metadata (which could be because it failed to
// receive the initial JoinGroup response), so just return current group information
// for the current generation.
// check whether the member's assignment strategies and subscription match what is already recorded;
// if they do, the member must have already joined and been accepted by the coordinator, but missed the response,
// so just build a JoinGroupResult and return the current group information to the member
responseCallback(JoinGroupResult(
members = if (group.isLeader(memberId)) {
group.currentMemberMetadata
@@ -306,19 +353,27 @@ class GroupCoordinator(val brokerId: Int,
error = Errors.NONE))
} else {
// member has changed metadata, so force a rebalance
// otherwise the member changed its subscription or assignment strategies: update it and prepare a rebalance
updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
}
// Stable state
// state machine: Stable => PreparingRebalance
case Stable =>
val member = group.get(memberId)
// if the member is the leader, force a rebalance
if (group.isLeader(memberId)) {
// force a rebalance if the leader sends JoinGroup;
// This allows the leader to trigger rebalances for changes affecting assignment
// which do not affect the member metadata (such as topic metadata changes for the consumer)
updateMemberAndRebalance(group, member, protocols, s"leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback)
} else if (!member.matches(protocols)) {
}
// if the member's metadata changed, force a rebalance
else if (!member.matches(protocols)) {
updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
} else {
}
// otherwise just return the current group information
else {
// for followers with no actual change to their metadata, just return group information
// for the current generation which will allow them to issue SyncGroup
responseCallback(JoinGroupResult(
@@ -331,6 +386,7 @@ class GroupCoordinator(val brokerId: Int,
error = Errors.NONE))
}
// unexpected states: reply with an error
case Empty | Dead =>
// Group reaches unexpected state. Let the joining member reset their generation and rejoin.
warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
@@ -342,13 +398,14 @@ class GroupCoordinator(val brokerId: Int,
}
}
// handle the SYNC_GROUP request
def handleSyncGroup(groupId: String,
generation: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
groupAssignment: Map[String, Array[Byte]], // the partition assignment submitted by the leader consumer
responseCallback: SyncCallback): Unit = {
validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
@@ -361,8 +418,10 @@ class GroupCoordinator(val brokerId: Int,
case Some(error) => responseCallback(SyncGroupResult(error))
case None =>
// fetch the group metadata object
groupManager.getGroup(groupId) match {
case None => responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
// run doSyncGroup
case Some(group) => doSyncGroup(group, generation, memberId, protocolType, protocolName,
groupInstanceId, groupAssignment, responseCallback)
}
@@ -402,6 +461,9 @@ class GroupCoordinator(val brokerId: Int,
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
// normal path: after receiving the leader's SYNC_GROUP request,
// 1. call GroupMetadataManager.storeGroup to persist the group
// 2. transition the group to Stable
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback
removePendingSyncMember(group, memberId)
@@ -411,6 +473,7 @@ class GroupCoordinator(val brokerId: Int,
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}. " +
s"The group has ${group.size} members, ${group.allStaticMembers.size} of which are static.")
// fill in missing members with empty assignments
// fill any missing members with an empty assignment
val missing = group.allMembers.diff(groupAssignment.keySet)
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
@@ -419,17 +482,25 @@ class GroupCoordinator(val brokerId: Int,
warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
}
// call GroupMetadataManager.storeGroup to persist the group
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the CompletingRebalance state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
if (group.is(CompletingRebalance) && generationId == group.generationId) {
// store failed
if (error != Errors.NONE) {
// clear the assignment and propagate the error to all members
resetAndPropagateAssignmentError(group, error)
// prepare a new round of rebalance
maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
} else {
}
// store succeeded
else {
// record each member's assignment in the group metadata and propagate it to all members
setAndPropagateAssignment(group, assignment)
// transition the group to Stable
group.transitionTo(Stable)
}
}
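The "fill any missing members with an empty assignment" step in doSyncGroup above is easy to restate in isolation. A small sketch using plain Java collections rather than the Scala code:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class AssignmentFill {
    // members that the leader did not include get an empty assignment
    static Map<String, byte[]> withMissingMembers(Set<String> allMembers, Map<String, byte[]> leaderAssignment) {
        Map<String, byte[]> complete = new HashMap<>(leaderAssignment);
        for (String member : allMembers)
            complete.putIfAbsent(member, new byte[0]);
        return complete;
    }

    public static void main(String[] args) {
        Map<String, byte[]> fromLeader = new HashMap<>();
        fromLeader.put("consumer-1-uuid", new byte[]{1, 2, 3});
        Map<String, byte[]> all = withMissingMembers(Set.of("consumer-1-uuid", "consumer-2-uuid"), fromLeader);
        System.out.println(all.get("consumer-2-uuid").length); // 0
    }
}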
@@ -606,11 +677,13 @@ class GroupCoordinator(val brokerId: Int,
groupError -> partitionErrors
}
// GroupCoordinator handling of HEARTBEAT requests
def handleHeartbeat(groupId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
responseCallback: Errors => Unit): Unit = {
// special case: the coordinator is still loading this group
validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
// the group is still loading, so respond just blindly
@@ -620,11 +693,13 @@ class GroupCoordinator(val brokerId: Int,
return
}
// look up the group metadata
groupManager.getGroup(groupId) match {
case None =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) => group.inLock {
// error cases
if (group.is(Dead)) {
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
@@ -637,7 +712,10 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION)
} else {
}
// in the normal cases the heartbeat is marked successful
else {
group.currentState match {
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
@@ -649,6 +727,7 @@ class GroupCoordinator(val brokerId: Int,
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
// PreparingRebalance returns REBALANCE_IN_PROGRESS; the client still treats this as a normal response
case PreparingRebalance =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
@@ -928,31 +1007,40 @@ class GroupCoordinator(val brokerId: Int,
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = {
assert(group.is(CompletingRebalance))
// update member.assignment
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
propagateAssignment(group, Errors.NONE)
}
private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors): Unit = {
assert(group.is(CompletingRebalance))
// reset every member's assignment to empty
group.allMemberMetadata.foreach(_.assignment = Array.empty)
// propagate to the members
propagateAssignment(group, error)
}
// return the partition assignment to every member of the group
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
val (protocolType, protocolName) = if (error == Errors.NONE)
(group.protocolType, group.protocolName)
else
(None, None)
// deliver each member its own assignment
for (member <- group.allMemberMetadata) {
if (member.assignment.isEmpty && error == Errors.NONE) {
warn(s"Sending empty assignment to member ${member.memberId} of ${group.groupId} for generation ${group.generationId} with no errors")
}
// invoke member.awaitingSyncCallback; each consumer only receives its own assignment
if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
// reset the session timeout for members after propagating the member's assignment.
// This is because if any member's session expired while we were still awaiting either
// the leader sync group or the storage callback, its expiration will be ignored and no
// future heartbeat expectations will not be scheduled.
// if the callback was invoked, schedule the next heartbeat expiration
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
@@ -965,13 +1053,16 @@ class GroupCoordinator(val brokerId: Int,
completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
}
// complete the current heartbeat and schedule the next expiration
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
val memberKey = MemberKey(group.groupId, member.memberId)
// complete the current heartbeat
// complete current heartbeat expectation
member.heartbeatSatisfied = true
heartbeatPurgatory.checkAndComplete(memberKey)
// schedule the next heartbeat expiration after timeoutMs
// reschedule the next heartbeat expiration deadline
member.heartbeatSatisfied = false
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
@@ -1003,15 +1094,23 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): Unit = {
val member = new MemberMetadata(memberId, groupInstanceId, clientId, clientHost,
rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols)
// mark this member as new; the isNew flag ties into heartbeat scheduling
member.isNew = true
// update the newMemberAdded flag to indicate that the join group can be further delayed
// if the group is in PreparingRebalance and generationId == 0, this is the group's first rebalance, so set newMemberAdded = true
// this flag implements a performance optimization of the rebalance flow:
// on a group's first rebalance the coordinator waits a bit longer so that more members can join,
// avoiding repeated rebalances as latecomers arrive; the extra wait is set by the broker config group.initial.rebalance.delay.ms
if (group.is(PreparingRebalance) && group.generationId == 0)
group.newMemberAdded = true
// add the member to the group
// if no leader has been elected yet, this member becomes the leader (important)
group.add(member, callback)
// The session timeout does not affect new members since they do not have their memberId and
@@ -1020,14 +1119,18 @@ class GroupCoordinator(val brokerId: Int,
// timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
// members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
// for new members. If the new member is still there, we expect it to retry.
// schedule the heartbeat expiration for the new member
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
if (member.isStaticMember) {
info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
// register the static member
group.addStaticMember(groupInstanceId, memberId)
} else {
// the member has now joined, so remove it from the pending members
group.removePendingMember(memberId)
}
// prepare a rebalance
maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
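The newMemberAdded optimization explained in addMemberAndRebalance above boils down to a simple waiting rule for the group's first rebalance. A rough sketch under that reading (illustrative only; the real scheduling lives in InitialDelayedJoin):

// delayMs     ~ group.initial.rebalance.delay.ms
// remainingMs ~ what is left of the rebalance timeout budget
final class InitialDelaySketch {
    static long nextWait(boolean newMemberAddedDuringDelay, long delayMs, long remainingMs) {
        if (!newMemberAddedDuringDelay || remainingMs <= 0)
            return 0;                          // stop waiting: complete this join round now
        return Math.min(delayMs, remainingMs); // otherwise wait another (bounded) delay
    }

    public static void main(String[] args) {
        System.out.println(nextWait(true, 3000, 45000));  // 3000: wait a bit longer for more members
        System.out.println(nextWait(false, 3000, 42000)); // 0: nobody new arrived, rebalance now
    }
}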
@@ -1113,12 +1216,15 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
reason: String,
callback: JoinCallback): Unit = {
// update the member: call GroupMetadata.updateMember to refresh the member's metadata
group.updateMember(member, protocols, callback)
// core idea: move the group to PreparingRebalance, then create a DelayedJoin and hand it to the purgatory to await the delayed join handling
maybePrepareRebalance(group, reason)
}
private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
group.inLock {
// only when the state is one of Stable, CompletingRebalance, Empty
if (group.canRebalance)
prepareRebalance(group, reason)
}
@@ -1126,6 +1232,7 @@ class GroupCoordinator(val brokerId: Int,
// package private for testing
private[group] def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
// if the group is in CompletingRebalance, clear the assignments and return REBALANCE_IN_PROGRESS
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
@@ -1133,6 +1240,8 @@ class GroupCoordinator(val brokerId: Int,
// if a sync expiration is pending, cancel it.
removeSyncExpiration(group)
// if the group is Empty, create an InitialDelayedJoin
// otherwise create a DelayedJoin
val delayedRebalance = if (group.is(Empty))
new InitialDelayedJoin(this,
rebalancePurgatory,
@@ -1143,12 +1252,14 @@ class GroupCoordinator(val brokerId: Int,
else
new DelayedJoin(this, group, group.rebalanceTimeoutMs)
// transition the group to PreparingRebalance
group.transitionTo(PreparingRebalance)
info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
val groupKey = GroupJoinKey(group.groupId)
// try to complete the delayed join right away; otherwise watch the group key and complete it later
rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
@@ -1184,20 +1295,25 @@ class GroupCoordinator(val brokerId: Int,
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock {
// dynamic members that have not rejoined yet
val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)
if (notYetRejoinedDynamicMembers.nonEmpty) {
info(s"Group ${group.groupId} removed dynamic members " +
s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
// remove the members that did not rejoin and cancel their heartbeats
notYetRejoinedDynamicMembers.values.foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
}
}
// the group is Dead
if (group.is(Dead)) {
info(s"Group ${group.groupId} is dead, skipping rebalance stage")
} else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
}
// the group still has members but no leader could be elected among the rejoined ones
else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
// If all members are not rejoining, we will postpone the completion
// of rebalance preparing stage, and send out another delayed operation
// until session timeout removes all the non-responsive members.
@@ -1205,8 +1321,11 @@ class GroupCoordinator(val brokerId: Int,
rebalancePurgatory.tryCompleteElseWatch(
new DelayedJoin(this, group, group.rebalanceTimeoutMs),
Seq(GroupJoinKey(group.groupId)))
} else {
}
else {
group.initNextGeneration()
// the group is empty
if (group.is(Empty)) {
info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
@@ -1219,13 +1338,17 @@ class GroupCoordinator(val brokerId: Int,
warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
}
})
} else {
}
// the group is not empty
else {
info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) with ${group.size} members")
// iterate over all members
// trigger the awaiting join group response callback for all the members after rebalancing
for (member <- group.allMemberMetadata) {
val joinResult = JoinGroupResult(
// important (1): the full member list is sent only to the leader
members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata
} else {
@@ -1233,18 +1356,23 @@ class GroupCoordinator(val brokerId: Int,
},
memberId = member.memberId,
generationId = group.generationId,
// important (2): the elected partition assignment strategy
// the broker only decides the group-wide assignment strategy; computing the actual partition assignment is left to the leader consumer
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE)
// invoke the join callback to send the response
group.maybeInvokeJoinCallback(member, joinResult)
// complete the current heartbeat task and schedule the next one
completeAndScheduleNextHeartbeatExpiration(group, member)
// the member is no longer new
member.isNew = false
// add the member to pendingSyncMembers
group.addPendingSyncMember(member.memberId)
}
// schedule the pending-sync timeout task
schedulePendingSync(group)
}
}
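Collected from the state-machine comments in this file, the group state transitions look roughly as follows. The table below is a sketch reconstructed from those comments; GroupState in the Kafka sources is the authoritative definition:

import java.util.Map;
import java.util.Set;

final class GroupStateSketch {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // which states each state may be entered from, as described by the comments above
    static final Map<State, Set<State>> VALID_PREVIOUS = Map.of(
        State.PREPARING_REBALANCE,  Set.of(State.STABLE, State.COMPLETING_REBALANCE, State.EMPTY), // canRebalance
        State.COMPLETING_REBALANCE, Set.of(State.PREPARING_REBALANCE),                             // join round finished
        State.STABLE,               Set.of(State.COMPLETING_REBALANCE),                            // leader's SyncGroup stored
        State.EMPTY,                Set.of(State.PREPARING_REBALANCE),                             // all members left
        State.DEAD,                 Set.of(State.values())                                         // group removed or migrated
    );

    static boolean canTransition(State from, State to) {
        return VALID_PREVIOUS.getOrDefault(to, Set.of()).contains(from);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.STABLE, State.PREPARING_REBALANCE));  // true: a member rejoins
        System.out.println(canTransition(State.STABLE, State.COMPLETING_REBALANCE)); // false
    }
}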

core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala (35)

@@ -382,7 +382,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def notYetRejoinedMembers = members.filter(!_._2.isAwaitingJoin).toMap
def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty
def hasAllMembersJoined = {
// have all members joined? two conditions:
// 1. the number of member metadata objects equals the number of members awaiting the join
// 2. the pending-member set is empty
members.size == numMembersAwaitingJoin && pendingMembers.isEmpty
}
def allMembers = members.keySet
@@ -409,7 +414,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
def generateMemberId(clientId: String,
groupInstanceId: Option[String]): String = {
groupInstanceId match {
case None =>
clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
case Some(instanceId) =>
instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
@@ -733,30 +738,46 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}.toMap
}
// remove offsets that have expired
def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = {
// collects the expired offsets of the group's partitions
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long,
// subscribedTopics is an exemption list: offsets of topics in it are never expired
subscribedTopics: Set[String] = Set.empty): Map[TopicPartition, OffsetAndMetadata] = {
// iterate over all partitions in offsets and keep those that satisfy both conditions:
// 1. the partition's offset commit has completed (it is not in pendingOffsetCommits)
// 2. the offset record in the offsets topic is older than the retention threshold
offsets.filter {
case (topicPartition, commitRecordMetadataAndOffset) =>
// filter condition 1: the topic is not in subscribedTopics (subscribed topics are exempt from expiration)
!subscribedTopics.contains(topicPartition.topic()) &&
!pendingOffsetCommits.contains(topicPartition) && {
// filter condition 2: topicPartition has no in-flight commit in pendingOffsetCommits; otherwise it is not expired
!pendingOffsetCommits.contains(topicPartition) &&
// filter condition 3: the expiration check
{
// read the expireTimestamp carried in the offset record, i.e. its expiry time
commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
case None =>
// expireTimestamp is absent: compare the age against the retention period
// current version with no per partition retention
currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
case Some(expireTimestamp) =>
// expireTimestamp is present: compare the current time against it
// older versions with explicit expire_timestamp field => old expiration semantics is used
currentTimestamp >= expireTimestamp
}
}
}.map {
case (topicPartition, commitRecordOffsetAndMetadata) =>
// return the OffsetAndMetadata
(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
}.toMap
}
// three scenarios in which offsets need to be cleaned up
val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
// scenario 1: no live consumers (the group is Empty)
case Some(_) if is(Empty) =>
// no consumer exists in the group =>
// - if current state timestamp exists and retention period has passed since group became Empty,
@@ -768,6 +789,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp)
)
// scenario 2: live consumers (the group is Stable)
case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined && is(Stable) =>
// consumers exist in the group and group is stable =>
// - if the group is aware of the subscribed topics and retention period had passed since the
@@ -775,9 +797,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
// expired
getExpiredOffsets(
_.offsetAndMetadata.commitTimestamp,
subscribedTopics.get
subscribedTopics.get // pass in the exemption list
)
// scenario 3: standalone (simple) consumers
case None =>
// protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only
// expire offsets with no pending offset commit that retention period has passed since their last commit
@@ -790,7 +813,9 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
if (expiredOffsets.nonEmpty)
debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")
// remove the expired entries from the offsets map
offsets --= expiredOffsets.keySet
// return expiredOffsets
expiredOffsets
}
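The expiration predicate inside getExpiredOffsets above reduces to three checks. A standalone restatement (a sketch; OptionalLong stands in for the Option[Long] expireTimestamp):

import java.util.OptionalLong;

final class OffsetExpirySketch {
    static boolean isExpired(boolean topicStillSubscribed,
                             boolean hasPendingCommit,
                             OptionalLong expireTimestamp,   // legacy per-offset expiry, if any
                             long baseTimestamp,             // commit (or state-change) timestamp
                             long now,
                             long retentionMs) {
        if (topicStillSubscribed) return false;   // exempt: the group still consumes this topic
        if (hasPendingCommit)     return false;   // exempt: a commit for this partition is in flight
        return expireTimestamp.isPresent()
                ? now >= expireTimestamp.getAsLong()  // old semantics: explicit expiry time
                : now - baseTimestamp >= retentionMs; // current semantics: retention since the commit
    }

    public static void main(String[] args) {
        // committed 8 days ago, 7-day retention, no pending commit, topic no longer subscribed -> expired
        long day = 24L * 60 * 60 * 1000;
        System.out.println(isExpired(false, false, OptionalLong.empty(), 0, 8 * day, 7 * day));
    }
}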

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala (79)

@@ -174,6 +174,7 @@ class GroupMetadataManager(brokerId: Int,
def startup(retrieveGroupMetadataTopicPartitionCount: () => Int, enableMetadataExpiration: Boolean): Unit = {
groupMetadataTopicPartitionCount = retrieveGroupMetadataTopicPartitionCount()
scheduler.startup()
// start the background periodic tasks
if (enableMetadataExpiration) {
scheduler.schedule(name = "delete-expired-group-metadata",
fun = () => cleanupGroupMetadata(),
@@ -241,17 +242,22 @@ class GroupMetadataManager(brokerId: Int,
}
}
/**
* Persist the consumer group's registration record to the `__consumer_offsets` topic
*/
def storeGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
// build the key and value of the group metadata record
// We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
val key = GroupMetadataManager.groupMetadataKey(group.groupId)
val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
// build the record set to write from the key and value; the record set class is MemoryRecords
val records = {
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
Seq(new SimpleRecord(timestamp, key, value)).asJava))
@ -260,6 +266,7 @@ class GroupMetadataManager(brokerId: Int, @@ -260,6 +266,7 @@ class GroupMetadataManager(brokerId: Int,
builder.build()
}
// compute the target partition of topic __consumer_offsets to write to
val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val groupMetadataRecords = Map(groupMetadataPartition -> records)
val generationId = group.generationId
@@ -313,6 +320,8 @@ class GroupMetadataManager(brokerId: Int,
responseCallback(responseError)
}
// write the record to the __consumer_offsets topic
// appendForGroup simply calls ReplicaManager.appendRecords to append to the offsets topic
appendForGroup(group, groupMetadataRecords, putCacheCallback)
case None =>
@@ -338,9 +347,9 @@ class GroupMetadataManager(brokerId: Int,
/**
* Store offsets by appending it to the replicated log and then inserting to cache
*/
def storeOffsets(group: GroupMetadata,
def storeOffsets(group: GroupMetadata, // the consumer group metadata
consumerId: String,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], // the offsets to commit
responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = {
@@ -369,11 +378,14 @@ class GroupMetadataManager(brokerId: Int,
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
// start building the records to write
// each record is a SimpleRecord built from a key and a value
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}
// build the message set for topic __consumer_offsets
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
@@ -385,14 +397,18 @@ class GroupMetadataManager(brokerId: Int,
records.foreach(builder.append)
val entries = Map(offsetTopicPartition -> builder.build())
// building done
// the callback invoked after the log append completes
// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// make sure the response is for the expected offsets-topic partition, otherwise throw
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
// update metrics
// record the number of offsets committed to the log
offsetCommitsSensor.record(records.size)
@@ -401,17 +417,21 @@ class GroupMetadataManager(brokerId: Int,
val status = responseStatus(offsetTopicPartition)
val responseError = group.inLock {
// append succeeded
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.forKeyValue { (topicPartition, offsetAndMetadata) =>
if (isTxnOffsetCommit)
group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
else
// call GroupMetadata.onOffsetCommitAppend to materialize the committed offset in the cache
group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
Errors.NONE
} else {
}
// append failed
else {
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
@@ -469,10 +489,13 @@ class GroupMetadataManager(brokerId: Int,
group.prepareOffsetCommit(offsetMetadata)
}
}
// append the records to the offsets topic; putCacheCallback then updates the in-memory metadata
// appendForGroup simply calls ReplicaManager.appendRecords to append to the offsets topic
appendForGroup(group, entries, putCacheCallback)
// this broker is not the coordinator for the group
case None =>
// return a NOT_COORDINATOR error
val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
(topicPartition, Errors.NOT_COORDINATOR)
}
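Both storeGroup and storeOffsets above write to the __consumer_offsets partition chosen by partitionFor(group.groupId). A sketch of that mapping, assuming the usual abs(groupId.hashCode) % N scheme over the offsets-topic partition count (50 by default):

final class OffsetsPartitionSketch {
    // mirrors the idea behind GroupMetadataManager.partitionFor: abs(hash) % number of __consumer_offsets partitions
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // with the default offsets.topic.num.partitions = 50
        System.out.println(partitionFor("reading-notes", 50));
    }
}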
@@ -487,7 +510,10 @@ class GroupMetadataManager(brokerId: Int,
*/
def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
// read the group from the cache
val group = groupMetadataCache.get(groupId)
// no group data: return empty offsets
if (group == null) {
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
@@ -496,6 +522,7 @@ class GroupMetadataManager(brokerId: Int,
}.toMap
} else {
group.inLock {
// if the group is Dead, return empty offsets
if (group.is(Dead)) {
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
@ -506,6 +533,7 @@ class GroupMetadataManager(brokerId: Int, @@ -506,6 +533,7 @@ class GroupMetadataManager(brokerId: Int,
val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
topicPartitions.map { topicPartition =>
// If stable offsets are required and this partition has a pending commit, return no offset with UNSTABLE_OFFSET_COMMIT
if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
@ -514,6 +542,7 @@ class GroupMetadataManager(brokerId: Int, @@ -514,6 +542,7 @@ class GroupMetadataManager(brokerId: Int,
case None =>
new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.NONE)
// Return the committed offset
case Some(offsetAndMetadata) =>
new PartitionData(offsetAndMetadata.offset,
offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
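// --- Illustrative sketch (not Kafka source): the lookup logic of getOffsets above. ---
// A map of committed offsets plus a set of pending commits; the names below are
// simplified stand-ins for the real GroupMetadata / PartitionData types.
final case class FetchedOffset(offset: Long, error: String)

class OffsetLookup(committed: Map[(String, String, Int), Long],
                   pending: Set[(String, String, Int)]) {
  def getOffset(group: String, topic: String, partition: Int,
                requireStable: Boolean): FetchedOffset = {
    val key = (group, topic, partition)
    if (requireStable && pending.contains(key))
      FetchedOffset(-1L, "UNSTABLE_OFFSET_COMMIT") // INVALID_OFFSET while a commit is in flight
    else committed.get(key) match {
      case Some(offset) => FetchedOffset(offset, "NONE")
      case None         => FetchedOffset(-1L, "NONE") // no committed offset yet
    }
  }
}
// --- End of sketch ---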
@ -533,6 +562,7 @@ class GroupMetadataManager(brokerId: Int, @@ -533,6 +562,7 @@ class GroupMetadataManager(brokerId: Int,
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling loading of offsets and group metadata from $topicPartition for epoch $coordinatorEpoch")
val startTimeMs = time.milliseconds()
// Invoke loadGroupsAndOffsets asynchronously via the scheduler
scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded, startTimeMs))
}
@ -570,18 +600,27 @@ class GroupMetadataManager(brokerId: Int, @@ -570,18 +600,27 @@ class GroupMetadataManager(brokerId: Int,
}
private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
// Log end offset (LEO) of the __consumer_offsets partition
def logEndOffset: Long = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
replicaManager.getLog(topicPartition) match {
case None =>
warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
// The local replica's log; decide where to start reading from
case Some(log) =>
// Four key collections:
// loadedOffsets: partitions whose committed offsets have been loaded
val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
// pendingOffsets: partitions whose offsets are still being committed; used only for Kafka transactions
val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
// loadedGroups: consumer groups whose metadata has been loaded
val loadedGroups = mutable.Map[String, GroupMetadata]()
// removedGroups: consumer groups to be removed
val removedGroups = mutable.Set[String]()
// Part 1: read the offsets topic
// buffer may not be needed if records are read from memory
var buffer = ByteBuffer.allocate(0)
@ -592,6 +631,7 @@ class GroupMetadataManager(brokerId: Int, @@ -592,6 +631,7 @@ class GroupMetadataManager(brokerId: Int,
var readAtLeastOneRecord = true
while (currOffset < logEndOffset && readAtLeastOneRecord && !shuttingDown.get()) {
// Read the log starting at currOffset, at most loadBufferSize bytes per read
val fetchDataInfo = log.read(currOffset,
maxLength = config.loadBufferSize,
isolation = FetchLogEnd,
@ -600,7 +640,9 @@ class GroupMetadataManager(brokerId: Int, @@ -600,7 +640,9 @@ class GroupMetadataManager(brokerId: Int,
readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
val memRecords = (fetchDataInfo.records: @unchecked) match {
// MemoryRecords: use directly
case records: MemoryRecords => records
// FileRecords: read into the buffer and convert to MemoryRecords
case fileRecords: FileRecords =>
val sizeInBytes = fileRecords.sizeInBytes
val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
@ -620,7 +662,9 @@ class GroupMetadataManager(brokerId: Int, @@ -620,7 +662,9 @@ class GroupMetadataManager(brokerId: Int,
MemoryRecords.readableRecords(buffer)
}
// Part 2: iterate over the batches and populate the four collections
memRecords.batches.forEach { batch =>
// Handle transactional offset commits (TxnOffsetCommit)
val isTxnOffsetCommit = batch.isTransactional
if (batch.isControlBatch) {
val recordIterator = batch.iterator
@ -637,7 +681,9 @@ class GroupMetadataManager(brokerId: Int, @@ -637,7 +681,9 @@ class GroupMetadataManager(brokerId: Int,
}
pendingOffsets.remove(batch.producerId)
}
} else {
}
// Handle ordinary (non-control) batches
else {
var batchBaseOffset: Option[Long] = None
for (record <- batch.asScala) {
require(record.hasKey, "Group metadata/offset entry key should not be null")
@ -651,18 +697,23 @@ class GroupMetadataManager(brokerId: Int, @@ -651,18 +697,23 @@ class GroupMetadataManager(brokerId: Int,
// load offset
val groupTopicPartition = offsetKey.key
// A record without a value is a tombstone
if (!record.hasValue) {
if (isTxnOffsetCommit)
pendingOffsets(batch.producerId).remove(groupTopicPartition)
else
// The tombstone means the committed offset for this partition has expired,
// so if it was loaded earlier, remove it from loadedOffsets
loadedOffsets.remove(groupTopicPartition)
} else {
val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
if (isTxnOffsetCommit)
pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
else
else {
// Record the loaded offset for this partition
loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
}
}
case groupMetadataKey: GroupMetadataKey =>
// load group metadata
@ -685,6 +736,10 @@ class GroupMetadataManager(brokerId: Int, @@ -685,6 +736,10 @@ class GroupMetadataManager(brokerId: Int,
}
}
// Part 3: process the four collections
// Group loadedOffsets by group id: offsets whose group metadata has been loaded go into groupOffsets,
// offsets that have commits but no group metadata go into emptyGroupOffsets
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.map { case (k, v) =>
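// --- Illustrative sketch (not Kafka source): replaying the compacted offsets topic. ---
// Each record carries a (group, topic, partition) key and an optional value; a missing
// value is a tombstone and removes the entry, so the newest record always wins.
final case class OffsetRecord(key: (String, String, Int), value: Option[Long])

def replay(records: Iterator[OffsetRecord]): Map[(String, String, Int), Long] = {
  val loadedOffsets = scala.collection.mutable.Map[(String, String, Int), Long]()
  records.foreach {
    case OffsetRecord(key, Some(offset)) => loadedOffsets.put(key, offset) // latest value wins
    case OffsetRecord(key, None)         => loadedOffsets.remove(key)      // tombstone: drop it
  }
  loadedOffsets.toMap
}
// --- End of sketch ---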
@ -757,14 +812,17 @@ class GroupMetadataManager(brokerId: Int, @@ -757,14 +812,17 @@ class GroupMetadataManager(brokerId: Int,
*
* @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*/
// When a broker stops being the coordinator for some consumer groups, it must remove all of their state from groupMetadataCache
def removeGroupsForPartition(offsetsPartition: Int,
coordinatorEpoch: Option[Int],
onGroupUnloaded: GroupMetadata => Unit): Unit = {
val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
// Executed asynchronously via the scheduler
scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupUnloaded))
}
// Clean up the caches: ownedPartitions, loadingPartitions, groupMetadataCache, openGroupsForProducer
private [group] def removeGroupsAndOffsets(topicPartition: TopicPartition,
coordinatorEpoch: Option[Int],
onGroupUnloaded: GroupMetadata => Unit): Unit = {
@ -780,11 +838,12 @@ class GroupMetadataManager(brokerId: Int, @@ -780,11 +838,12 @@ class GroupMetadataManager(brokerId: Int,
// to prevent coordinator's check-and-get-group race condition
ownedPartitions.remove(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
// Clean up groupMetadataCache
for (group <- groupMetadataCache.values) {
if (partitionFor(group.groupId) == offsetsPartition) {
onGroupUnloaded(group)
groupMetadataCache.remove(group.groupId, group)
// Clean up openGroupsForProducer
removeGroupFromAllProducers(group.groupId)
numGroupsRemoved += 1
numOffsetsRemoved += group.numOffsets
@ -823,7 +882,9 @@ class GroupMetadataManager(brokerId: Int, @@ -823,7 +882,9 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
val currentTimestamp = time.milliseconds()
// Clean up expired offsets: remove them from the in-memory cache and append tombstone records to the log
val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
// config.offsetsRetentionMs comes from the `offsets.retention.minutes` broker config
group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
})
offsetExpiredSensor.record(numOffsetsRemoved)
@ -844,6 +905,7 @@ class GroupMetadataManager(brokerId: Int, @@ -844,6 +905,7 @@ class GroupMetadataManager(brokerId: Int,
groups.foreach { group =>
val groupId = group.groupId
val (removedOffsets, groupIsDead, generation) = group.inLock {
// Invoke the selector to clean up the cache and return the offsets that need tombstones
val removedOffsets = selector(group)
if (group.is(Empty) && !group.hasOffsets) {
info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
@ -860,11 +922,13 @@ class GroupMetadataManager(brokerId: Int, @@ -860,11 +922,13 @@ class GroupMetadataManager(brokerId: Int,
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
// Build the tombstone records
replicaManager.onlinePartition(appendPartition).foreach { partition =>
val tombstones = ArrayBuffer.empty[SimpleRecord]
removedOffsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
// value = null marks the record as a tombstone
tombstones += new SimpleRecord(timestamp, commitKey, null)
}
trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
@ -880,6 +944,7 @@ class GroupMetadataManager(brokerId: Int, @@ -880,6 +944,7 @@ class GroupMetadataManager(brokerId: Int,
trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
}
// Append the tombstones to the log
if (tombstones.nonEmpty) {
try {
// do not need to require acks since even if the tombstone is lost,

6
core/src/main/scala/kafka/server/DelayedOperation.scala

@ -67,8 +67,10 @@ abstract class DelayedOperation(override val delayMs: Long, @@ -67,8 +67,10 @@ abstract class DelayedOperation(override val delayMs: Long,
*/
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) {
// cancel the timeout timer
cancel()
// Run onComplete
onComplete()
true
} else {
@ -106,6 +108,7 @@ abstract class DelayedOperation(override val delayMs: Long, @@ -106,6 +108,7 @@ abstract class DelayedOperation(override val delayMs: Long,
* @param f else function to be executed after first tryComplete returns false
* @return result of tryComplete
*/
// Thread-safe variant of tryComplete: tryComplete is attempted at most twice, with the argument function f run before the second attempt
private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
if (tryComplete()) true
else {
@ -118,6 +121,7 @@ abstract class DelayedOperation(override val delayMs: Long, @@ -118,6 +121,7 @@ abstract class DelayedOperation(override val delayMs: Long,
/**
* Thread-safe variant of tryComplete()
*/
private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())
/*
@ -231,6 +235,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri @@ -231,6 +235,8 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
// To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
// any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
// holding a exclusive lock to make the call is often unnecessary.
// Call operation.safeTryCompleteOrElse first
if (operation.safeTryCompleteOrElse {
watchKeys.foreach(key => watchForOperation(key, operation))
if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
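// --- Illustrative sketch (not Kafka source): the safeTryCompleteOrElse pattern above. ---
// Try to complete under the operation's lock; if that fails, run f (typically
// "register the operation with the watchers") and try exactly once more, so a
// completion that races with the registration cannot be missed.
import java.util.concurrent.locks.ReentrantLock

abstract class DelayedOp {
  private val lock = new ReentrantLock()
  def tryComplete(): Boolean

  def safeTryCompleteOrElse(f: => Unit): Boolean = {
    lock.lock()
    try {
      if (tryComplete()) true
      else {
        f             // e.g. add the operation to the watch lists
        tryComplete() // second and final attempt
      }
    } finally lock.unlock()
  }
}
// --- End of sketch ---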

38
core/src/main/scala/kafka/server/KafkaApis.scala

@ -1318,17 +1318,27 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1318,17 +1318,27 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
// Handle FIND_COORDINATOR requests; there are two kinds:
// consumers looking up the GroupCoordinator
// transactional producers looking up the TransactionCoordinator
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val findCoordinatorRequest = request.body[FindCoordinatorRequest]
// Case 1: consumer group authorization failed
if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id &&
!authHelper.authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key))
!authHelper.authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception)
}
// Case 2: transactional id authorization failed
else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id &&
!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) {
requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
}
// Authorization passed: locate the coordinator
else {
// Determine the partition of the internal topic
val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
// Consumer group: partition of __consumer_offsets owning the group
case CoordinatorType.GROUP =>
(groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
@ -1336,6 +1346,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1336,6 +1346,7 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
}
// Fetch the metadata of the internal topic
val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
def createFindCoordinatorResponse(error: Errors,
node: Node,
@ -1350,16 +1361,21 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1350,16 +1361,21 @@ class KafkaApis(val requestChannel: RequestChannel,
.setThrottleTimeMs(requestThrottleMs))
}
// Coordinator not available: trigger auto-creation of the internal topic
if (topicMetadata.headOption.isEmpty) {
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
} else {
}
// The internal topic exists: build the response from its metadata
else {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)
} else {
// Find the coordinator node: the broker hosting the leader of the partition that owns this consumer group
// Lookup path: group.id => partition => broker node
val coordinatorEndpoint = topicMetadata.head.partitions.asScala
.find(_.partitionIndex == partition)
.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
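// --- Illustrative sketch (not Kafka source): the group.id => partition => broker lookup. ---
// PartitionMeta and the return type are simplified stand-ins for the MetadataResponse types.
final case class PartitionMeta(partitionIndex: Int, leaderId: Int)

def findCoordinator(groupId: String,
                    offsetsTopicPartitionCount: Int,
                    partitions: Seq[PartitionMeta],
                    brokers: Map[Int, String]): Option[String] = {
  // Non-negative hash of the group id modulo the partition count, as in partitionFor
  val partition = (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitionCount
  partitions.find(_.partitionIndex == partition)
    .filter(_.leaderId >= 0)               // a negative leader id means no coordinator yet
    .flatMap(p => brokers.get(p.leaderId)) // host:port of the coordinator broker
}
// --- End of sketch ---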
@ -1543,6 +1559,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1543,6 +1559,7 @@ class KafkaApis(val requestChannel: RequestChannel,
))
}
// Pre-checks
if (syncGroupRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
@ -1553,7 +1570,10 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1553,7 +1570,10 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (!authHelper.authorize(request.context, READ, GROUP, syncGroupRequest.data.groupId)) {
sendResponseCallback(SyncGroupResult(Errors.GROUP_AUTHORIZATION_FAILED))
} else {
}
// Validation passed
else {
val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]]
syncGroupRequest.data.assignments.forEach { assignment =>
assignmentMap += (assignment.memberId -> assignment.assignment)
@ -1598,6 +1618,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1598,6 +1618,7 @@ class KafkaApis(val requestChannel: RequestChannel,
})
}
// Broker-side handling of Heartbeat requests
def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
val heartbeatRequest = request.body[HeartbeatRequest]
@ -1615,18 +1636,23 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1615,18 +1636,23 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, createResponse)
}
// Static membership is supported only from IBP 2.3 onwards
if (heartbeatRequest.data.groupInstanceId != null && config.interBrokerProtocolVersion < KAFKA_2_3_IV0) {
// Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic
// until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard
// the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states.
sendResponseCallback(Errors.UNSUPPORTED_VERSION)
} else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
}
// Authorization failed
else if (!authHelper.authorize(request.context, READ, GROUP, heartbeatRequest.data.groupId)) {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new HeartbeatResponse(
new HeartbeatResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)))
} else {
}
// Hand the heartbeat to the GroupCoordinator
else {
// let the coordinator to handle heartbeat
groupCoordinator.handleHeartbeat(
heartbeatRequest.data.groupId,

12
core/src/main/scala/kafka/utils/timer/Timer.scala

@ -76,7 +76,9 @@ class SystemTimer(executorName: String, @@ -76,7 +76,9 @@ class SystemTimer(executorName: String,
private[this] val readLock = readWriteLock.readLock()
private[this] val writeLock = readWriteLock.writeLock()
// Add a task
def add(timerTask: TimerTask): Unit = {
// Adding a task takes the shared readLock
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
@ -86,6 +88,8 @@ class SystemTimer(executorName: String, @@ -86,6 +88,8 @@ class SystemTimer(executorName: String,
}
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
// Try to add the task to the timing wheel;
// if this fails because the task has already expired, submit it directly to the executor
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled)
@ -98,13 +102,21 @@ class SystemTimer(executorName: String, @@ -98,13 +102,21 @@ class SystemTimer(executorName: String,
* waits up to timeoutMs before giving up.
*/
def advanceClock(timeoutMs: Long): Boolean = {
// Poll the head bucket from the DelayQueue, waiting at most timeoutMs
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
// Advancing the wheel takes the exclusive writeLock
writeLock.lock()
try {
while (bucket != null) {
// Advance the timing wheel
timingWheel.advanceClock(bucket.getExpiration)
// Since the bucket was polled, its tasks may have expired:
// expired tasks are submitted to the executor;
// unexpired tasks are re-inserted into the wheel, cascading down to a finer level
bucket.flush(addTimerTaskEntry)
// Non-blocking poll: drain every bucket that has expired by now
bucket = delayQueue.poll()
}
} finally {
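// --- Illustrative sketch (not Kafka source): the advanceClock loop above in miniature. ---
// Poll one bucket (blocking up to timeoutMs), advance the wheel to its expiration, then
// re-offer every entry in the bucket: already-expired entries run on the executor, the rest
// cascade back into a finer wheel level. Bucket and reinsertOrRun are simplified stand-ins
// for TimerTaskList and addTimerTaskEntry.
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

final class Bucket(val expirationMs: Long, val entries: List[Runnable]) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  override def compareTo(o: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS))
}

def advanceClock(queue: DelayQueue[Bucket],
                 advanceWheelTo: Long => Unit,    // plays the role of timingWheel.advanceClock
                 reinsertOrRun: Runnable => Unit, // plays the role of addTimerTaskEntry
                 timeoutMs: Long): Boolean = {
  var bucket = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  val anyExpired = bucket != null
  while (bucket != null) {
    advanceWheelTo(bucket.expirationMs)   // move currentTime forward
    bucket.entries.foreach(reinsertOrRun) // flush: run or cascade each entry
    bucket = queue.poll()                 // non-blocking: drain all buckets expired by now
  }
  anyExpired
}
// --- End of sketch ---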

1
core/src/main/scala/kafka/utils/timer/TimerTaskList.scala

@ -59,6 +59,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { @@ -59,6 +59,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
}
}
// Append to the tail of the linked list
// Add a timer task entry to this list
def add(timerTaskEntry: TimerTaskEntry): Unit = {
var done = false

27
core/src/main/scala/kafka/utils/timer/TimingWheel.scala

@ -98,19 +98,29 @@ import java.util.concurrent.atomic.AtomicInteger @@ -98,19 +98,29 @@ import java.util.concurrent.atomic.AtomicInteger
*/
@nonthreadsafe
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
// tickMs: the time span of one bucket in the wheel
// wheelSize: the number of buckets
// interval: the total time span covered by this wheel
private[this] val interval = tickMs * wheelSize
// buckets, size = wheelSize, each entry is a TimerTaskList
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
// currentTime: the wheel's pointer, i.e. the time this wheel is currently at; it is rounded down
// to the largest multiple of tickMs not exceeding the current time. For example, with a current
// timestamp of 123 ms and tickMs of 20 ms, currentTime is 120 ms.
// currentTime splits the wheel into an expired part and a non-expired part; the bucket at
// currentTime counts as just expired, so every task in its TimerTaskList needs to be processed.
private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
// The overflow (higher-level) wheel may not exist; it is created on demand
@volatile private[this] var overflowWheel: TimingWheel = null
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
// parent.tickMs = this.interval
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
@ -122,7 +132,9 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta @@ -122,7 +132,9 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
}
}
// Add a task to this wheel
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
// Expiration timestamp of the task
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
@ -131,12 +143,16 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta @@ -131,12 +143,16 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
} else if (expiration < currentTime + tickMs) {
// Already expired
false
} else if (expiration < currentTime + interval) {
}
// The task fits within this wheel's interval
else if (expiration < currentTime + interval) {
// Locate the bucket and append the entry to its TimerTaskList
// Put in its own bucket
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// Update bucket.expiration and, if it changed, enqueue the bucket into the DelayQueue
// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
@ -147,18 +163,23 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta @@ -147,18 +163,23 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
queue.offer(bucket)
}
true
} else {
}
// The task is beyond this wheel's interval; put it into the overflow (parent) wheel
else {
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
// Advance the wheel
// Try to advance the clock
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
// Update the currentTime pointer
currentTime = timeMs - (timeMs % tickMs)
// Cascade the advance to the overflow wheel
// Try to advance the clock of the overflow wheel if present
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
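// --- Illustrative sketch (not Kafka source): the bucket arithmetic of add()/advanceClock(). ---
// bucketFor is a hypothetical helper showing the three cases above; e.g. with tickMs = 20,
// wheelSize = 60 (interval = 1200 ms) and currentTimeMs = 123, currentTime rounds down to 120,
// and a task expiring at 250 ms lands in bucket (250 / 20) % 60 = 12.
def bucketFor(expirationMs: Long, currentTimeMs: Long,
              tickMs: Long, wheelSize: Int): Either[String, Int] = {
  val currentTime = currentTimeMs - (currentTimeMs % tickMs) // round down to a multiple of tickMs
  val interval = tickMs * wheelSize
  if (expirationMs < currentTime + tickMs)
    Left("already expired: run the task immediately")
  else if (expirationMs < currentTime + interval) {
    val virtualId = expirationMs / tickMs
    Right((virtualId % wheelSize).toInt) // index of the bucket in this wheel
  } else
    Left("beyond this wheel's interval: hand to the overflow wheel (whose tickMs = interval)")
}
// --- End of sketch ---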
