diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 0be2a48867e..64884ec5dbb 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -1110,24 +1110,37 @@ class GroupCoordinator(val brokerId: Int, def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = { group.inLock { - if (isPending) { + // The group has been unloaded and invalid, we should complete the heartbeat. + if (group.is(Dead)) { + forceComplete() + } else if (isPending) { // complete the heartbeat if the member has joined the group if (group.has(memberId)) { forceComplete() } else false - } - else { - val member = group.get(memberId) - if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving) { + } else { + if (shouldCompleteNonPendingHeartbeat(group, memberId, heartbeatDeadline)) { forceComplete() } else false } } } + def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String, heartbeatDeadline: Long): Boolean = { + if (group.has(memberId)) { + val member = group.get(memberId) + member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving + } else { + info(s"Member id $memberId was not found in ${group.groupId} during heartbeat expiration.") + false + } + } + def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = { group.inLock { - if (isPending) { + if (group.is(Dead)) { + info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.") + } else if (isPending) { info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.") removePendingMemberAndUpdateGroup(group, memberId) } else if (!group.has(memberId)) { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 9cc0d3a18ab..91a963d7f14 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -1188,15 +1188,15 @@ class GroupCoordinatorTest { EasyMock.reset(replicaManager) val followerId = followerJoinGroupResult.memberId - val follwerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId) - assertEquals(Errors.NONE, follwerSyncGroupResult._2) + val followerSyncGroupResult = syncGroupFollower(groupId, leaderJoinGroupResult.generationId, followerId) + assertEquals(Errors.NONE, followerSyncGroupResult._2) assertTrue(getGroup(groupId).is(Stable)) new RebalanceResult(newGeneration, leaderId, leaderSyncGroupResult._1, followerId, - follwerSyncGroupResult._1) + followerSyncGroupResult._1) } private def checkJoinGroupResult(joinGroupResult: JoinGroupResult, @@ -3101,6 +3101,31 @@ class GroupCoordinatorTest { assertEquals(Errors.NONE, thirdResult.error) } + @Test + def testCompleteHeartbeatWithGroupDead(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + EasyMock.reset(replicaManager) + heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation) + val group = getGroup(groupId) + group.transitionTo(Dead) + val leaderMemberId = rebalanceResult.leaderId + assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true)) + groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout) + assertTrue(group.has(leaderMemberId)) + } + + @Test + def testCompleteHeartbeatWithMemberAlreadyRemoved(): Unit = { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + EasyMock.reset(replicaManager) + heartbeat(groupId, rebalanceResult.leaderId, rebalanceResult.generation) + val group = getGroup(groupId) + val leaderMemberId = rebalanceResult.leaderId + group.remove(leaderMemberId) + assertFalse(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout, () => true)) + groupCoordinator.onExpireHeartbeat(group, leaderMemberId, false, DefaultSessionTimeout) + } + private def getGroup(groupId: String): GroupMetadata = { val groupOpt = groupCoordinator.groupManager.getGroup(groupId) assertTrue(groupOpt.isDefined)