@ -320,7 +320,8 @@ class GroupCoordinator(val brokerId: Int,
} else {
} else {
val member = group . get ( memberId )
val member = group . get ( memberId )
removeHeartbeatForLeavingMember ( group , member )
removeHeartbeatForLeavingMember ( group , member )
onMemberFailure ( group , member )
debug ( s" Member ${ member . memberId } in group ${ group . groupId } has left, removing it from the group " )
removeMemberAndUpdateGroup ( group , member )
responseCallback ( Errors . NONE )
responseCallback ( Errors . NONE )
}
}
}
}
@ -692,8 +693,7 @@ class GroupCoordinator(val brokerId: Int,
joinPurgatory . tryCompleteElseWatch ( delayedRebalance , Seq ( groupKey ) )
joinPurgatory . tryCompleteElseWatch ( delayedRebalance , Seq ( groupKey ) )
}
}
private def onMemberFailure ( group : GroupMetadata , member : MemberMetadata ) {
private def removeMemberAndUpdateGroup ( group : GroupMetadata , member : MemberMetadata ) {
debug ( s" Member ${ member . memberId } in group ${ group . groupId } has failed " )
group . remove ( member . memberId )
group . remove ( member . memberId )
group . currentState match {
group . currentState match {
case Dead | Empty =>
case Dead | Empty =>
@ -774,8 +774,10 @@ class GroupCoordinator(val brokerId: Int,
def onExpireHeartbeat ( group : GroupMetadata , member : MemberMetadata , heartbeatDeadline : Long ) {
def onExpireHeartbeat ( group : GroupMetadata , member : MemberMetadata , heartbeatDeadline : Long ) {
group synchronized {
group synchronized {
if ( ! shouldKeepMemberAlive ( member , heartbeatDeadline ) )
if ( ! shouldKeepMemberAlive ( member , heartbeatDeadline ) ) {
onMemberFailure ( group , member )
info ( s" Member ${ member . memberId } in group ${ group . groupId } has failed, removing it from the group " )
removeMemberAndUpdateGroup ( group , member )
}
}
}
}
}