Browse Source

KAFKA-7909; Ensure timely rebalance completion after pending members rejoin or fail (#6251)

Fix the following situations, where pending members (those that have a member ID but have not yet joined the group) can cause rebalance operations to fail:

- In AbstractCoordinator, a pending consumer should be allowed to leave.
- A rebalance operation must successfully complete if a pending member either joins or times out.
- During a rebalance operation, a pending member must be able to leave a group.

Reviewers: Boyang Chen <bchen11@outlook.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
pull/6297/head
Arjun Satish 6 years ago committed by Jason Gustafson
parent
commit
4cb8f56b45
  1. 30
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
  2. 1
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
  3. 35
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  4. 45
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  5. 2
      core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
  6. 187
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

30
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -759,6 +759,15 @@ public abstract class AbstractCoordinator implements Closeable { @@ -759,6 +759,15 @@ public abstract class AbstractCoordinator implements Closeable {
return generation != null && generation.generationId == generationId;
}
/**
 * Reports whether the coordinator currently holds a generation that carries a member ID.
 *
 * @return true if a generation is set and it has a member ID, false otherwise
 */
// Visible for testing
final synchronized boolean hasValidMemberId() {
    if (generation == null) {
        return false;
    }
    return generation.hasMemberId();
}
/**
* Reset the generation and memberId because we have fallen out of the group.
*/
@ -807,10 +816,10 @@ public abstract class AbstractCoordinator implements Closeable { @@ -807,10 +816,10 @@ public abstract class AbstractCoordinator implements Closeable {
* Leave the current group and reset local generation/memberId.
*/
public synchronized void maybeLeaveGroup() {
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.isValid()) {
if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) {
// this is a minimal effort attempt to leave the group. we do not
// attempt any resending if the request fails or times out.
log.info("Sending LeaveGroup request to coordinator {}", coordinator);
log.info("Member {} sending LeaveGroup request to coordinator {}", generation.memberId, coordinator);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData()
.setGroupId(groupId).setMemberId(generation.memberId));
client.send(coordinator, request)
@ -1121,8 +1130,12 @@ public abstract class AbstractCoordinator implements Closeable { @@ -1121,8 +1130,12 @@ public abstract class AbstractCoordinator implements Closeable {
this.protocol = protocol;
}
public boolean isValid() {
return generationId != OffsetCommitRequest.DEFAULT_GENERATION_ID;
/**
* @return true if this generation has a valid member id, false otherwise. A member might have an id before
* it becomes part of a group generation.
*/
public boolean hasMemberId() {
return !memberId.isEmpty();
}
@Override
@ -1139,6 +1152,15 @@ public abstract class AbstractCoordinator implements Closeable { @@ -1139,6 +1152,15 @@ public abstract class AbstractCoordinator implements Closeable {
public int hashCode() {
return Objects.hash(generationId, memberId, protocol);
}
/**
 * Renders the generation id, member id and protocol of this generation for log output.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("Generation{");
    text.append("generationId=").append(generationId);
    text.append(", memberId='").append(memberId).append('\'');
    text.append(", protocol='").append(protocol).append('\'');
    text.append('}');
    return text.toString();
}
}
private static class UnjoinedGroupException extends RetriableException {

1
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@ -222,6 +222,7 @@ public class AbstractCoordinatorTest { @@ -222,6 +222,7 @@ public class AbstractCoordinatorTest {
assertTrue(consumerClient.poll(future, mockTime.timer(REQUEST_TIMEOUT_MS)));
assertEquals(Errors.MEMBER_ID_REQUIRED.message(), future.exception().getMessage());
assertTrue(coordinator.rejoinNeededOrPending());
assertTrue(coordinator.hasValidMemberId());
assertTrue(coordinator.hasMatchingGenerationId(generation));
future = coordinator.sendJoinGroupRequest();
assertTrue(consumerClient.poll(future, mockTime.timer(REBALANCE_TIMEOUT_MS)));

35
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -744,6 +744,41 @@ public class ConsumerCoordinatorTest { @@ -744,6 +744,41 @@ public class ConsumerCoordinatorTest {
assertNull(generation);
}
/**
 * Checks that a consumer that has a valid member ID but an invalid generation
 * ({@link org.apache.kafka.clients.consumer.internals.AbstractCoordinator.Generation#NO_GENERATION})
 * can still send a LeaveGroup request. Such a situation may arise when a consumer has initiated a JoinGroup
 * request without a member ID, but is shut down or restarted before it has a chance to initiate and complete
 * the second request.
 */
@Test
public void testPendingMemberShouldLeaveGroup() {
final String consumerId = "consumer-id";
subscriptions.subscribe(singleton(topic1), rebalanceListener);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// the mocked JoinGroup response carries MEMBER_ID_REQUIRED together with generation id -1,
// a member id ("consumer-id") and a leader id.
client.prepareResponse(joinGroupFollowerResponse(-1, consumerId, "leader-id", Errors.MEMBER_ID_REQUIRED));
// execute join group with a zero timer
coordinator.joinGroupIfNeeded(time.timer(0));
// records whether the matcher below was ever consulted for a request
final AtomicBoolean received = new AtomicBoolean(false);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
received.set(true);
// the request is accepted only if it is a LeaveGroup request for the member id returned above
LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.data().memberId().equals(consumerId);
}
}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
coordinator.maybeLeaveGroup();
assertTrue(received.get());
}
@Test(expected = KafkaException.class)
public void testUnexpectedErrorOnSyncGroup() {
final String consumerId = "consumer";

45
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@ -143,6 +143,11 @@ class GroupCoordinator(val brokerId: Int, @@ -143,6 +143,11 @@ class GroupCoordinator(val brokerId: Int,
} else {
doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
// attempt to complete JoinGroup
if (group.is(PreparingRebalance)) {
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
}
}
@ -258,13 +263,10 @@ class GroupCoordinator(val brokerId: Int, @@ -258,13 +263,10 @@ class GroupCoordinator(val brokerId: Int,
case Empty | Dead =>
// Group reaches unexpected state. Let the joining member reset their generation and rejoin.
warn(s"Attempt to add rejoining member ${memberId} of group ${group.groupId} in " +
warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
s"unexpected group state ${group.currentState}")
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
}
if (group.is(PreparingRebalance))
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
}
@ -365,12 +367,21 @@ class GroupCoordinator(val brokerId: Int, @@ -365,12 +367,21 @@ class GroupCoordinator(val brokerId: Int,
case Some(group) =>
group.inLock {
if (group.is(Dead) || !group.has(memberId)) {
if (group.is(Dead)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else if (group.isPendingMember(memberId)) {
// if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
// and if necessary, prompt a JoinGroup completion.
info(s"Pending member $memberId is leaving group ${group.groupId}.")
removePendingMemberAndUpdateGroup(group, memberId)
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
responseCallback(Errors.NONE)
} else if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else {
val member = group.get(memberId)
removeHeartbeatForLeavingMember(group, member)
debug(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
info(s"Member ${member.memberId} in group ${group.groupId} has left, removing it from the group")
removeMemberAndUpdateGroup(group, member, s"removing member $memberId on LeaveGroup")
responseCallback(Errors.NONE)
}
@ -775,8 +786,8 @@ class GroupCoordinator(val brokerId: Int, @@ -775,8 +786,8 @@ class GroupCoordinator(val brokerId: Int,
// for new members. If the new member is still there, we expect it to retry.
completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
maybePrepareRebalance(group, s"Adding new member $memberId")
group.removePendingMember(memberId)
maybePrepareRebalance(group, s"Adding new member $memberId")
}
private def updateMemberAndRebalance(group: GroupMetadata,
@ -833,6 +844,14 @@ class GroupCoordinator(val brokerId: Int, @@ -833,6 +844,14 @@ class GroupCoordinator(val brokerId: Int,
}
}
/**
 * Removes `memberId` from the group's pending members and, if the group is in
 * PreparingRebalance afterwards, calls `joinPurgatory.checkAndComplete` for the group's key.
 */
private def removePendingMemberAndUpdateGroup(group: GroupMetadata, memberId: String): Unit = {
  group.removePendingMember(memberId)
  val rebalanceInProgress = group.is(PreparingRebalance)
  if (rebalanceInProgress)
    joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group.inLock {
if (group.hasAllMembersJoined)
@ -898,8 +917,12 @@ class GroupCoordinator(val brokerId: Int, @@ -898,8 +917,12 @@ class GroupCoordinator(val brokerId: Int,
def tryCompleteHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
group.inLock {
if (isPending)
group.has(memberId)
if (isPending) {
// complete the heartbeat if the member has joined the group
if (group.has(memberId)) {
forceComplete()
} else false
}
else {
val member = group.get(memberId)
if (member.shouldKeepAlive(heartbeatDeadline) || member.isLeaving) {
@ -912,8 +935,8 @@ class GroupCoordinator(val brokerId: Int, @@ -912,8 +935,8 @@ class GroupCoordinator(val brokerId: Int,
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long) {
group.inLock {
if (isPending) {
debug(s"Pending member $memberId has been removed after session timeout expiration.")
group.removePendingMember(memberId)
info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
removePendingMemberAndUpdateGroup(group, memberId)
} else if (!group.has(memberId)) {
debug(s"Member $memberId has already been removed from the group.")
} else {

2
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

@ -250,6 +250,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState @@ -250,6 +250,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
// Key set of `members`.
def allMembers = members.keySet
// Number of entries in `pendingMembers`.
def numPending = pendingMembers.size
// Values of `members`, materialized as a List.
def allMemberMetadata = members.values.toList
def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>

187
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@ -788,6 +788,193 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -788,6 +788,193 @@ class GroupCoordinatorTest extends JUnitSuite {
assertNotEquals(firstGenerationId, secondJoinResult.generationId)
}
/**
 * Test if the following scenario completes a rebalance correctly: a new member starts a JoinGroup request with
 * an UNKNOWN_MEMBER_ID, attempting to join a stable group, but never initiates the second JoinGroup request with
 * the provided member ID and times out. The test checks that the original member remains the sole member of the
 * group, which should remain stable throughout this test.
 */
@Test
def testSecondMemberPartiallyJoinAndTimeout() {
val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
val firstMemberId = firstJoinResult.memberId
val firstGenerationId = firstJoinResult.generationId
assertEquals(firstMemberId, firstJoinResult.leaderId)
assertEquals(Errors.NONE, firstJoinResult.error)
// sync the group as leader; the group is asserted to be Stable further below
EasyMock.reset(replicaManager)
val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
assertEquals(Errors.NONE, firstSyncResult._2)
timer.advanceClock(100)
assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.allMembers.size)
assertEquals(0, groupCoordinator.groupManager.getGroup(groupId).get.numPending)
val group = groupCoordinator.groupManager.getGroup(groupId).get
// ensure the group is stable before a new member initiates join request
assertEquals(Stable, group.currentState)
// new member initiates join group; it only gets a member id back and becomes pending
EasyMock.reset(replicaManager)
val secondJoinResult = joinGroupPartial(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
assertEquals(Errors.MEMBER_ID_REQUIRED, secondJoinResult.error)
assertEquals(1, group.numPending)
assertEquals(Stable, group.currentState)
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
// before advancing the clock, the group still has one member and one pending member
assertEquals(1, group.allMembers.size)
assertEquals(1, group.numPending)
timer.advanceClock(300)
// original (firstMember) member sends heartbeats to prevent session timeouts.
EasyMock.reset(replicaManager)
val heartbeatResult = heartbeat(groupId, firstMemberId, 1)
assertEquals(Errors.NONE, heartbeatResult)
// timeout the pending member
timer.advanceClock(300)
// at this point the second member should have been removed from pending list (session timeout),
// and the group should be in Stable state with only the first member in it.
assertEquals(1, group.allMembers.size)
assertEquals(0, group.numPending)
assertEquals(Stable, group.currentState)
assertTrue(group.has(firstMemberId))
}
/**
 * Builds a group that is in the middle of a rebalance and has one pending member.
 *
 * Steps: a first member joins and syncs as leader (group becomes Stable); a second member joins
 * while the first one rejoins, and the leader syncs again (Stable). The first member then rejoins,
 * which moves the group to PreparingRebalance. A third member sends its first JoinGroup request
 * without a member id (session timeout 100) and becomes pending. Finally the second member rejoins.
 *
 * On return the group is in PreparingRebalance with exactly one pending member.
 *
 * @return the JoinGroup result of the pending member's first (partial) join, carrying its member id
 */
private def setupGroupWithPendingMember(): JoinGroupResult = {
  // add the first member
  val joinResult1 = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
  assertGroupState(groupState = CompletingRebalance)

  // sync as leader so that the group becomes stable with the one member that joined above
  EasyMock.reset(replicaManager)
  val firstSyncResult = syncGroupLeader(groupId, joinResult1.generationId, joinResult1.memberId,
    Map(joinResult1.memberId -> Array[Byte]()))
  assertEquals(Errors.NONE, firstSyncResult._2)
  assertGroupState(groupState = Stable)

  // start the join for the second member
  EasyMock.reset(replicaManager)
  val secondJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

  // rejoin the first member back into the group
  EasyMock.reset(replicaManager)
  val firstJoinFuture = sendJoinGroup(groupId, joinResult1.memberId, protocolType, protocols)
  val firstMemberJoinResult = await(firstJoinFuture, DefaultSessionTimeout + 100)
  val secondMemberJoinResult = await(secondJoinFuture, DefaultSessionTimeout + 100)
  assertGroupState(groupState = CompletingRebalance)

  // stabilize the group
  EasyMock.reset(replicaManager)
  val secondSyncResult = syncGroupLeader(groupId, firstMemberJoinResult.generationId, joinResult1.memberId,
    Map(joinResult1.memberId -> Array[Byte]()))
  assertEquals(Errors.NONE, secondSyncResult._2)
  assertGroupState(groupState = Stable)

  // re-join an existing member, to transition the group to PreparingRebalance state.
  EasyMock.reset(replicaManager)
  sendJoinGroup(groupId, firstMemberJoinResult.memberId, protocolType, protocols)
  assertGroupState(groupState = PreparingRebalance)

  // create a pending member in the group; it is never reassigned, hence a val
  EasyMock.reset(replicaManager)
  val pendingMember = joinGroupPartial(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
    sessionTimeout = 100)
  assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.numPending)

  // re-join the second existing member
  EasyMock.reset(replicaManager)
  sendJoinGroup(groupId, secondMemberJoinResult.memberId, protocolType, protocols)
  assertGroupState(groupState = PreparingRebalance)
  assertEquals(1, groupCoordinator.groupManager.getGroup(groupId).get.numPending)

  pendingMember
}
/**
 * Sets up a group with a pending member. The test checks that the rebalance completes once the
 * pending member joins.
 */
@Test
def testJoinGroupCompletionWhenPendingMemberJoins() {
val pendingMember = setupGroupWithPendingMember()
// complete join group for the pending member
EasyMock.reset(replicaManager)
val pendingMemberJoinFuture = sendJoinGroup(groupId, pendingMember.memberId, protocolType, protocols)
await(pendingMemberJoinFuture, DefaultSessionTimeout+100)
// the formerly pending member is now counted as a regular member
assertGroupState(groupState = CompletingRebalance)
assertEquals(3, group().allMembers.size)
assertEquals(0, group().numPending)
}
/**
 * Sets up a group with a pending member. The test checks that the timeout of the pending member
 * causes the group to move to the CompletingRebalance state.
 */
@Test
def testJoinGroupCompletionWhenPendingMemberTimesOut() {
setupGroupWithPendingMember()
// Advancing Clock by > 100 (session timeout of the pending member)
// and < 500 (for first and second members). This will force the coordinator to attempt join
// completion on heartbeat expiration (since we are in PreparingRebalance state).
EasyMock.reset(replicaManager)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
EasyMock.replay(replicaManager)
timer.advanceClock(120)
// the pending member is gone and the two existing members complete the join phase
assertGroupState(groupState = CompletingRebalance)
assertEquals(2, group().allMembers.size)
assertEquals(0, group().numPending)
}
/**
 * Sets up a group with a pending member and lets that pending member send a LeaveGroup request.
 * The test checks that the request succeeds and that the group moves to CompletingRebalance with
 * the two existing members and no pending member.
 */
@Test
def testPendingMembersLeavesGroup(): Unit = {
val pending = setupGroupWithPendingMember()
EasyMock.reset(replicaManager)
// the pending member leaves using the member id it got from its first JoinGroup request
val leaveGroupResult = leaveGroup(groupId, pending.memberId)
assertEquals(Errors.NONE, leaveGroupResult)
assertGroupState(groupState = CompletingRebalance)
assertEquals(2, group().allMembers.size)
assertEquals(0, group().numPending)
}
/**
 * Looks up the group with the given id in the coordinator's group manager.
 * Yields null when no such group exists.
 */
private def group(groupId: String = groupId) =
  groupCoordinator.groupManager.getGroup(groupId).orNull
/**
 * Asserts that the group with the given id exists in the coordinator and is currently in
 * `groupState`; fails the test if the group cannot be found.
 */
private def assertGroupState(groupId: String = groupId,
                             groupState: GroupState): Unit = {
  val lookedUp = groupCoordinator.groupManager.getGroup(groupId)
  lookedUp.fold[Unit](fail(s"Group $groupId not found in coordinator")) { found =>
    assertEquals(groupState, found.currentState)
  }
}
/**
 * Sends a JoinGroup request with the "require known member id" flag set and blocks until the
 * response arrives, waiting at most `rebalanceTimeout + 100` milliseconds.
 *
 * @return the JoinGroup result delivered by the response future
 */
private def joinGroupPartial(groupId: String,
                             memberId: String,
                             protocolType: String,
                             protocols: List[(String, Array[Byte])],
                             sessionTimeout: Int = DefaultSessionTimeout,
                             rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
  val pendingJoin = sendJoinGroup(groupId, memberId, protocolType, protocols, sessionTimeout, rebalanceTimeout,
    true) // last argument: requireKnownMemberId
  val maxWait = Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)
  Await.result(pendingJoin, maxWait)
}
@Test
def testLeaderFailureInSyncGroup() {
// to get a group of two members:

Loading…
Cancel
Save