Browse Source

KAFKA-8896: Check group state before completing delayed heartbeat (#7377)

This is a defensive fix for KAFKA-8896, which would cause group coordinator crash when the heartbeat member was not found.
pull/7431/head
Boyang Chen 5 years ago committed by David Arthur
parent
commit
270e610915
  1. 25
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  2. 31
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

25
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@ -1110,24 +1110,37 @@ class GroupCoordinator(val brokerId: Int, @@ -1110,24 +1110,37 @@ class GroupCoordinator(val brokerId: Int,
def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group.inLock {
if (isPending) {
// The group has been unloaded and invalid, we should complete the heartbeat.
if (group.is(Dead)) {
forceComplete()
} else if (isPending) {
// complete the heartbeat if the member has joined the group
if (group.has(memberId)) {
forceComplete()
} else false
}
else {
val member = group.get(memberId)
if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving) {
} else {
if (shouldCompleteNonPendingHeartbeat(group, memberId, heartbeatDeadline)) {
forceComplete()
} else false
}
}
}
def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String, heartbeatDeadline: Long): Boolean = {
if (group.has(memberId)) {
val member = group.get(memberId)
member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving
} else {
info(s"Member id $memberId was not found in ${group.groupId} during heartbeat expiration.")
false
}
}
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
group.inLock {
if (isPending) {
if (group.is(Dead)) {
info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
} else if (isPending) {
info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
removePendingMemberAndUpdateGroup(group, memberId)
} else if (!group.has(memberId)) {

31
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@ -1188,15 +1188,15 @@ class GroupCoordinatorTest { @@ -1188,15 +1188,15 @@ class GroupCoordinatorTest {
EasyMock.reset(replicaManager)
val followerId = followerJoinGroupResult.memberId
val follwerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId)
assertEquals(Errors.NONE, follwerSyncGroupResult._2)
val followerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId)
assertEquals(Errors.NONE, followerSyncGroupResult._2)
assertTrue(getGroup(groupId).is(Stable))
new RebalanceResult(newGeneration,
leaderId,
leaderSyncGroupResult._1,
followerId,
follwerSyncGroupResult._1)
followerSyncGroupResult._1)
}
private def checkJoinGroupResult(joinGroupResult: JoinGroupResult,
@ -3101,6 +3101,31 @@ class GroupCoordinatorTest { @@ -3101,6 +3101,31 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, thirdResult.error)
}
@Test
def testCompleteHeartbeatWithGroupDead(): Unit = {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
EasyMock.reset(replicaManager)
heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation)
val group = getGroup(groupId)
group.transitionTo(Dead)
val leaderMemberId = rebalanceResult.leaderId
assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
assertTrue(group.has(leaderMemberId))
}
@Test
def testCompleteHeartbeatWithMemberAlreadyRemoved(): Unit = {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
EasyMock.reset(replicaManager)
heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation)
val group = getGroup(groupId)
val leaderMemberId = rebalanceResult.leaderId
group.remove(leaderMemberId)
assertFalse(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true))
groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout)
}
private def getGroup(groupId: String): GroupMetadata = {
val groupOpt = groupCoordinator.groupManager.getGroup(groupId)
assertTrue(groupOpt.isDefined)

Loading…
Cancel
Save