Browse Source

KAFKA-8715; Fix buggy reliance on state timestamp in static member.id generation (#7116)

The bug is that we accidentally used the current state timestamp for the group instead of the real current time. When a group is first loaded, this timestamp is not initialized, so this resulted in a `NoSuchElementException`. Additionally this violated the intended uniqueness of the memberId, which could have broken the group instance fencing. Fix is made and unit test to make sure the timestamp is properly encoded within the returned member.id.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/6963/head
Boyang Chen 5 years ago committed by Jason Gustafson
parent
commit
79341a12f2
  1. 2
      core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
  2. 17
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

2
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

@ -365,7 +365,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -365,7 +365,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
case None =>
clientId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
case Some(instanceId) =>
instanceId + GroupMetadata.MemberIdDelimiter + currentStateTimestamp.get
instanceId + GroupMetadata.MemberIdDelimiter + UUID.randomUUID().toString
}
}

17
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@ -873,6 +873,23 @@ class GroupCoordinatorTest { @@ -873,6 +873,23 @@ class GroupCoordinatorTest {
assertEquals(Errors.FENCED_INSTANCE_ID, invalidHeartbeatResult)
}
@Test
def shouldGetDifferentStaticMemberIdAfterEachRejoin(): Unit = {
val initialResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId)
val timeAdvance = 1
var lastMemberId = initialResult.leaderId
for (_ <- 1 to 5) {
EasyMock.reset(replicaManager)
val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID,
leaderInstanceId, protocolType, protocols, clockAdvance = timeAdvance)
assertTrue(joinGroupResult.memberId.startsWith(leaderInstanceId.get))
assertNotEquals(lastMemberId, joinGroupResult.memberId)
lastMemberId = joinGroupResult.memberId
}
}
@Test
def testOffsetCommitDeadGroup() {
val memberId = "memberId"

Loading…
Cancel
Save