diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index d9d120c5ec3..42bc3c3f3c6 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -320,7 +320,8 @@ class GroupCoordinator(val brokerId: Int, } else { val member = group.get(memberId) removeHeartbeatForLeavingMember(group, member) - onMemberFailure(group, member) + debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group") + removeMemberAndUpdateGroup(group, member) responseCallback(Errors.NONE) } } @@ -692,8 +693,7 @@ class GroupCoordinator(val brokerId: Int, joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey)) } - private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) { - debug(s"Member ${member.memberId} in group ${group.groupId} has failed") + private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) { group.remove(member.memberId) group.currentState match { case Dead | Empty => @@ -774,8 +774,10 @@ class GroupCoordinator(val brokerId: Int, def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group synchronized { - if (!shouldKeepMemberAlive(member, heartbeatDeadline)) - onMemberFailure(group, member) + if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { + info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") + removeMemberAndUpdateGroup(group, member) + } } }