Browse Source

KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements (#14413)

This patch includes:
- target assignment changes : accepting only one at a time according to the updated protocol.
- changes for error handling, leaving responsibility in the heartbeatManager and exposing only the functionality for when the state needs to be updated (on successful HB, on fencing, on fatal failure)
- allow transitions for failures when joining
- tests & minor improvements/fixes addressing initial version review

Reviewers: Kirk True <ktrue@confluent.io>, Philip Nee <pnee@confluent.io>, David Jacot <djacot@confluent.io>
pull/14577/head
Lianet Magrans 1 year ago committed by GitHub
parent
commit
48449b68fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 88
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
  2. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
  3. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
  4. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
  5. 46
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
  6. 210
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
  7. 58
      clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
  8. 4
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
  9. 179
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java

88
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java

@ -1,88 +0,0 @@ @@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.Objects;
import java.util.Optional;
/**
* Selection of a type of assignor used by a member to get partitions assigned as part of a
* consumer group. Currently supported assignors are:
* <li>SERVER assignors</li>
* <p/>
* Server assignors include a name of the server assignor selected, ex. uniform, range.
*/
public class AssignorSelection {
public enum Type { SERVER }
private final AssignorSelection.Type type;
private final Optional<String> serverAssignor;
private AssignorSelection(Type type, String serverAssignor) {
this.type = type;
if (type == Type.SERVER) {
this.serverAssignor = Optional.ofNullable(serverAssignor);
} else {
throw new IllegalArgumentException("Unsupported assignor type " + type);
}
}
public static AssignorSelection newServerAssignor(String serverAssignor) {
if (serverAssignor == null) {
throw new IllegalArgumentException("Selected server assignor name cannot be null");
}
if (serverAssignor.isEmpty()) {
throw new IllegalArgumentException("Selected server assignor name cannot be empty");
}
return new AssignorSelection(Type.SERVER, serverAssignor);
}
public static AssignorSelection defaultAssignor() {
// TODO: review default selection
return new AssignorSelection(Type.SERVER, null);
}
public Optional<String> serverAssignor() {
return this.serverAssignor;
}
public Type type() {
return this.type;
}
@Override
public boolean equals(Object assignorSelection) {
if (this == assignorSelection) return true;
if (assignorSelection == null || getClass() != assignorSelection.getClass()) return false;
return Objects.equals(((AssignorSelection) assignorSelection).type, this.type) &&
Objects.equals(((AssignorSelection) assignorSelection).serverAssignor, this.serverAssignor);
}
@Override
public int hashCode() {
return Objects.hash(type, serverAssignor);
}
@Override
public String toString() {
return "AssignorSelection(" +
"type=" + type + ", " +
"serverAssignor='" + serverAssignor + '\'' +
')';
}
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java

@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread { @@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread {
config,
coordinatorRequestManager,
groupState);
MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId);
MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId, logContext);
heartbeatRequestManager = new HeartbeatRequestManager(
this.time,
logContext,

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java

@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements RequestManager { @@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements RequestManager {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
}
this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new ConsumerGroupHeartbeatRequest.Builder(data),

2
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java

@ -63,7 +63,7 @@ public enum MemberState { @@ -63,7 +63,7 @@ public enum MemberState {
RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE, RECONCILING);
FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);

46
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java

@ -21,7 +21,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; @@ -21,7 +21,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import java.util.Optional;
/**
* Manages group membership for a single member.
* A stateful object tracking the state of a single member in relationship to a consumer group:
* <p/>
* Responsible for:
* <li>Keeping member state</li>
* <li>Keeping assignment for the member</li>
@ -29,50 +30,73 @@ import java.util.Optional; @@ -29,50 +30,73 @@ import java.util.Optional;
*/
public interface MembershipManager {
/**
* @return Group ID of the consumer group the member is part of (or wants to be part of).
*/
String groupId();
/**
* @return Instance ID used by the member when joining the group. If non-empty, it will indicate that
* this is a static member.
*/
Optional<String> groupInstanceId();
/**
* @return Member ID assigned by the server to this member when it joins the consumer group.
*/
String memberId();
/**
* @return Current epoch of the member, maintained by the server.
*/
int memberEpoch();
/**
* @return Current state of this member in relationship to a consumer group, as defined in
* {@link MemberState}.
*/
MemberState state();
/**
* Update the current state of the member based on a heartbeat response
* Update member info and transition member state based on a heartbeat response.
*
* @param response Heartbeat response to extract member info and errors from.
*/
void updateState(ConsumerGroupHeartbeatResponseData response);
/**
* Returns the {@link AssignorSelection} for the member
* @return Server-side assignor implementation configured for the member, that will be sent
* out to the server to be used. If empty, then the server will select the assignor.
*/
AssignorSelection assignorSelection();
Optional<String> serverAssignor();
/**
* Returns the current assignment for the member
* @return Current assignment for the member.
*/
ConsumerGroupHeartbeatResponseData.Assignment assignment();
ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
/**
* Update the assignment for the member, indicating that the provided assignment is the new
* current assignment.
*/
void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment);
/**
* Transition the member to the FENCED state. This is only invoked when the heartbeat returns a
* FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
* Transition the member to the FENCED state and update the member info as required. This is
* only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
* code.
*/
void transitionToFenced();
/**
* Transition the member to the FAILED state. This is invoked when the heartbeat returns a non-retriable error.
* Transition the member to the FAILED state and update the member info as required. This is
* invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable
* error or when errors occur while executing the user-provided callbacks)
*/
void transitionToFailed();
/**
* Return true if the member should send heartbeat to the coordinator
* @return True if the member should send heartbeat to the coordinator.
*/
boolean shouldSendHeartbeat();
}

210
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java

@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals; @@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.Optional;
/**
* Membership manager that maintains group membership for a single member following the new
* Membership manager that maintains group membership for a single member, following the new
* consumer group protocol.
* <p/>
* This keeps membership state and assignment updated in-memory, based on the heartbeat responses
* the member receives. It is also responsible for computing assignment for the group based on
* the metadata, if the member has been selected by the broker to do so.
* This is responsible for:
* <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
* <li>Keeping member state as defined in {@link MemberState}.</li>
* <p/>
* Member info and state are updated based on the heartbeat responses the member receives.
*/
public class MembershipManagerImpl implements MembershipManager {
/**
* Group ID of the consumer group the member will be part of, provided when creating the current
* membership manager.
*/
private final String groupId;
/**
* Group instance ID to be used by the member, provided when creating the current membership manager.
*/
private final Optional<String> groupInstanceId;
/**
* Member ID assigned by the server to the member, received in a heartbeat response when
* joining the group specified in {@link #groupId}
*/
private String memberId;
/**
* Current epoch of the member. It will be set to 0 by the member, and provided to the server
* on the heartbeat request, to join the group. It will be then maintained by the server,
* incremented as the member reconciles and acknowledges the assignments it receives. It will
* be reset to 0 if the member gets fenced.
*/
private int memberEpoch;
/**
* Current state of this member as part of the consumer group, as defined in {@link MemberState}
*/
private MemberState state;
private AssignorSelection assignorSelection;
/**
* Name of the server-side assignor this member has configured to use. It will be sent
* out to the server on the {@link ConsumerGroupHeartbeatRequest}. If not defined, the server
* will select the assignor implementation to use.
*/
private final Optional<String> serverAssignor;
/**
* Assignment that the member received from the server and successfully processed.
*/
private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
/**
* Assignment that the member received from the server but hasn't completely processed
* yet.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
/**
* Latest assignment that the member received from the server while a {@link #targetAssignment}
* was in process.
* Logger.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment;
private final Logger log;
public MembershipManagerImpl(String groupId) {
this(groupId, null, null);
public MembershipManagerImpl(String groupId, LogContext logContext) {
this(groupId, null, null, logContext);
}
public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
public MembershipManagerImpl(String groupId,
String groupInstanceId,
String serverAssignor,
LogContext logContext) {
this.groupId = groupId;
this.state = MemberState.UNJOINED;
if (assignorSelection == null) {
setAssignorSelection(AssignorSelection.defaultAssignor());
} else {
setAssignorSelection(assignorSelection);
}
this.serverAssignor = Optional.ofNullable(serverAssignor);
this.groupInstanceId = Optional.ofNullable(groupInstanceId);
this.targetAssignment = Optional.empty();
this.nextTargetAssignment = Optional.empty();
this.log = logContext.logger(MembershipManagerImpl.class);
}
/**
* Update assignor selection for the member.
* Update the member state, setting it to the nextState only if it is a valid transition.
*
* @param assignorSelection New assignor selection
* @throws IllegalArgumentException If the provided assignor selection is null
* @throws IllegalStateException If transitioning from the member {@link #state} to the
* nextState is not allowed as defined in {@link MemberState}.
*/
public final void setAssignorSelection(AssignorSelection assignorSelection) {
if (assignorSelection == null) {
throw new IllegalArgumentException("Assignor selection cannot be null");
}
this.assignorSelection = assignorSelection;
}
private void transitionTo(MemberState nextState) {
if (!this.state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) {
// TODO: handle invalid state transition
throw new RuntimeException(String.format("Invalid state transition from %s to %s",
throw new IllegalStateException(String.format("Invalid state transition from %s to %s",
state, nextState));
}
log.trace("Member {} transitioned from {} to {}.", memberId, state, nextState);
this.state = nextState;
}
/**
* {@inheritDoc}
*/
@Override
public String groupId() {
return groupId;
}
/**
* {@inheritDoc}
*/
@Override
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
/**
* {@inheritDoc}
*/
@Override
public String memberId() {
return memberId;
}
/**
* {@inheritDoc}
*/
@Override
public int memberEpoch() {
return memberEpoch;
}
/**
* {@inheritDoc}
*/
@Override
public void updateState(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() != Errors.NONE.code()) {
@ -120,7 +163,7 @@ public class MembershipManagerImpl implements MembershipManager { @@ -120,7 +163,7 @@ public class MembershipManagerImpl implements MembershipManager {
"Unexpected error in Heartbeat response. Expected no error, but received: %s",
Errors.forCode(response.errorCode())
);
throw new IllegalStateException(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
this.memberId = response.memberId();
this.memberEpoch = response.memberEpoch();
@ -131,14 +174,21 @@ public class MembershipManagerImpl implements MembershipManager { @@ -131,14 +174,21 @@ public class MembershipManagerImpl implements MembershipManager {
maybeTransitionToStable();
}
/**
* {@inheritDoc}
*/
@Override
public void transitionToFenced() {
resetEpoch();
transitionTo(MemberState.FENCED);
}
/**
* {@inheritDoc}
*/
@Override
public void transitionToFailed() {
log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
transitionTo(MemberState.FAILED);
}
@ -160,92 +210,92 @@ public class MembershipManagerImpl implements MembershipManager { @@ -160,92 +210,92 @@ public class MembershipManagerImpl implements MembershipManager {
return state.equals(MemberState.STABLE);
}
/**
* Take new target assignment received from the server and set it as targetAssignment to be
* processed. Following the consumer group protocol, the server won't send a new target
* member while a previous one hasn't been acknowledged by the member, so this will fail
* if a target assignment already exists.
*
* @throws IllegalStateException If a target assignment already exists.
*/
private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
if (!targetAssignment.isPresent()) {
log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
targetAssignment = Optional.of(newTargetAssignment);
} else {
// Keep the latest next target assignment
nextTargetAssignment = Optional.of(newTargetAssignment);
}
}
private boolean hasPendingTargetAssignment() {
return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
transitionToFailed();
throw new IllegalStateException("Cannot set new target assignment because a " +
"previous one pending to be reconciled already exists.");
}
/**
* Update state and assignment as the member has successfully processed a new target
* assignment.
* This indicates the end of the reconciliation phase for the member, and makes the target
* assignment the new current assignment.
*
* @param assignment Target assignment the member was able to successfully process
*/
public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
updateAssignment(assignment);
transitionTo(MemberState.STABLE);
}
/**
* Update state and member info as the member was not able to process the assignment, due to
* errors in the execution of the user-provided callbacks.
*
* @param error Exception found during the execution of the user-provided callbacks
* Returns true if the member has a target assignment being processed.
*/
public void onAssignmentProcessFailure(Throwable error) {
transitionTo(MemberState.FAILED);
// TODO: update member info appropriately, to clear up whatever shouldn't be kept in
// this unrecoverable state
private boolean hasPendingTargetAssignment() {
return targetAssignment.isPresent();
}
private void resetEpoch() {
this.memberEpoch = 0;
}
/**
* {@inheritDoc}
*/
@Override
public MemberState state() {
return state;
}
/**
* {@inheritDoc}
*/
@Override
public AssignorSelection assignorSelection() {
return this.assignorSelection;
public Optional<String> serverAssignor() {
return this.serverAssignor;
}
/**
* {@inheritDoc}
*/
@Override
public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
return this.currentAssignment;
}
// VisibleForTesting
/**
* @return Assignment that the member received from the server but hasn't completely processed
* yet. Visible for testing.
*/
Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
return targetAssignment;
}
// VisibleForTesting
Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() {
return nextTargetAssignment;
}
/**
* Set the current assignment for the member. This indicates that the reconciliation of the
* target assignment has been successfully completed.
* This will clear the {@link #targetAssignment}, and take on the
* {@link #nextTargetAssignment} if any.
* This indicates that the reconciliation of the target assignment has been successfully
* completed, so it will make it effective by assigning it to the current assignment.
*
* @param assignment Assignment that has been successfully processed as part of the
* reconciliation process.
* @params Assignment that has been successfully reconciled. This is expected to
* match the target assignment defined in {@link #targetAssignment()}
*/
@Override
public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
if (assignment == null) {
throw new IllegalArgumentException("Assignment cannot be null");
}
if (!assignment.equals(targetAssignment.orElse(null))) {
// This could be simplified to remove the assignment param and just assume that what
// was reconciled was the targetAssignment, but keeping it explicit and failing fast
// here to uncover any issues in the interaction of the assignment processing logic
// and this.
throw new IllegalStateException(String.format("Reconciled assignment %s does not " +
"match the expected target assignment %s", assignment,
targetAssignment.orElse(null)));
}
this.currentAssignment = assignment;
if (!nextTargetAssignment.isPresent()) {
targetAssignment = Optional.empty();
} else {
targetAssignment = Optional.of(nextTargetAssignment.get());
nextTargetAssignment = Optional.empty();
}
maybeTransitionToStable();
transitionTo(MemberState.STABLE);
}
}

58
clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java

@ -1,58 +0,0 @@ @@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AssignorSelection;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AssignorSelectionTest {
@Test
public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
assertThrows(IllegalArgumentException.class,
() -> AssignorSelection.newServerAssignor(null));
assertThrows(IllegalArgumentException.class,
() -> AssignorSelection.newServerAssignor(""));
}
@Test
public void testEquals() {
// Server assignors
AssignorSelection selection1 = AssignorSelection.newServerAssignor("range");
AssignorSelection selection2 = AssignorSelection.newServerAssignor("range");
assertEquals(selection1, selection1);
assertEquals(selection1, selection2);
AssignorSelection selection3 = AssignorSelection.newServerAssignor("uniform");
assertNotEquals(selection1, selection3);
assertNotEquals(selection1, null);
}
@Test
public void testServerAssignorSelection() {
String assignorName = "uniform";
AssignorSelection selection = AssignorSelection.newServerAssignor(assignorName);
assertEquals(AssignorSelection.Type.SERVER, selection.type());
assertTrue(selection.serverAssignor().isPresent());
assertEquals(assignorName, selection.serverAssignor().get());
}
}

4
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java

@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest { @@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest {
coordinatorRequestManager = mock(CoordinatorRequestManager.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
subscriptionState = mock(SubscriptionState.class);
membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
membershipManager = spy(new MembershipManagerImpl(GROUP_ID, logContext));
heartbeatRequestState = mock(HeartbeatRequestManager.HeartbeatRequestState.class);
errorEventHandler = mock(ErrorEventHandler.class);
heartbeatRequestManager = createManager();
@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest { @@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest {
Properties prop = createConsumerConfig();
prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
config = new ConsumerConfig(prop);
membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null);
membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null, logContext);
heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
logContext,
time,

179
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java

@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid; @@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -35,40 +38,26 @@ public class MembershipManagerImplTest { @@ -35,40 +38,26 @@ public class MembershipManagerImplTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final int MEMBER_EPOCH = 1;
private final LogContext logContext = new LogContext();
@Test
public void testMembershipManagerDefaultAssignor() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
public void testMembershipManagerServerAssignor() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(Optional.empty(), membershipManager.serverAssignor());
membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", null);
assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
}
@Test
public void testMembershipManagerAssignorSelectionUpdate() {
AssignorSelection firstAssignorSelection = AssignorSelection.newServerAssignor("uniform");
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
firstAssignorSelection);
assertEquals(firstAssignorSelection, membershipManager.assignorSelection());
AssignorSelection secondAssignorSelection = AssignorSelection.newServerAssignor("range");
membershipManager.setAssignorSelection(secondAssignorSelection);
assertEquals(secondAssignorSelection, membershipManager.assignorSelection());
assertThrows(IllegalArgumentException.class,
() -> membershipManager.setAssignorSelection(null));
membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext);
assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor());
}
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
new MembershipManagerImpl(GROUP_ID);
new MembershipManagerImpl(GROUP_ID, null, AssignorSelection.defaultAssignor());
new MembershipManagerImpl(GROUP_ID, logContext);
new MembershipManagerImpl(GROUP_ID, null, null, logContext);
}
@Test
public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(MemberState.UNJOINED, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
@ -84,7 +73,7 @@ public class MembershipManagerImplTest { @@ -84,7 +73,7 @@ public class MembershipManagerImplTest {
@Test
public void testMemberIdAndEpochResetOnFencedMembers() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@ -99,7 +88,7 @@ public class MembershipManagerImplTest { @@ -99,7 +88,7 @@ public class MembershipManagerImplTest {
@Test
public void testTransitionToFailure() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@ -112,62 +101,144 @@ public class MembershipManagerImplTest { @@ -112,62 +101,144 @@ public class MembershipManagerImplTest {
}
@Test
public void testUpdateAssignment() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
public void testFencingWhenStateIsStable() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
testStateUpdateOnFenceError(membershipManager);
}
@Test
public void testFencingWhenStateIsReconciling() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(createAssignment());
membershipManager.updateState(heartbeatResponse.data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
testStateUpdateOnFenceError(membershipManager);
}
@Test
public void testFatalFailureWhenStateIsUnjoined() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(MemberState.UNJOINED, membershipManager.state());
testStateUpdateOnFatalFailure(membershipManager);
}
@Test
public void testFatalFailureWhenStateIsStable() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
testStateUpdateOnFatalFailure(membershipManager);
}
@Test
public void testFencingShouldNotHappenWhenStateIsUnjoined() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(MemberState.UNJOINED, membershipManager.state());
// Getting fenced when the member is not part of the group is not expected and should
// fail with invalid transition.
assertThrows(IllegalStateException.class, membershipManager::transitionToFenced);
}
@Test
public void testUpdateStateFailsOnResponsesWithErrors() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
// Updating state with a heartbeat response containing errors cannot be performed and
// should fail.
ConsumerGroupHeartbeatResponse unknownMemberResponse =
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
assertThrows(IllegalArgumentException.class,
() -> membershipManager.updateState(unknownMemberResponse.data()));
}
@Test
public void testAssignmentUpdatedAsReceivedAndProcessed() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(newAssignment);
membershipManager.updateState(heartbeatResponse.data());
// Target assignment should be in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment, null);
checkAssignments(membershipManager, null, newAssignment);
// Mark assignment processing completed
membershipManager.onTargetAssignmentProcessComplete(newAssignment);
// Target assignment should now be the current assignment
checkAssignments(membershipManager, newAssignment, null);
}
@Test
public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
public void testMemberFailsIfAssignmentReceivedWhileAnotherOnBeingReconciled() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received should be in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment1, null);
checkAssignments(membershipManager, null, newAssignment1);
// Second target assignment received while there is another one being reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
assertThrows(IllegalStateException.class,
() -> membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()));
assertEquals(MemberState.FAILED, membershipManager.state());
}
@Test
public void testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received, remains in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment1, null);
// Second target assignment received while there is another one being reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
public void testAssignmentUpdatedFailsIfAssignmentReconciledDoesNotMatchTargetAssignment() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(0, 1, 2))));
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(targetAssignment);
membershipManager.updateState(heartbeatResponse.data());
// If more assignments are received while there is one being reconciled, the most recent
// assignment received is kept as nextTargetAssignment
ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment3);
// Target assignment should be in the process of being reconciled
checkAssignments(membershipManager, null, targetAssignment);
// Mark assignment processing completed
ConsumerGroupHeartbeatResponseData.Assignment reconciled =
new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(0))));
assertThrows(IllegalStateException.class, () -> membershipManager.onTargetAssignmentProcessComplete(reconciled));
}
private void checkAssignments(
MembershipManagerImpl membershipManager,
ConsumerGroupHeartbeatResponseData.Assignment expectedCurrentAssignment,
ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment,
ConsumerGroupHeartbeatResponseData.Assignment expectedNextTargetAssignment) {
assertEquals(expectedCurrentAssignment, membershipManager.assignment());
ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment) {
assertEquals(expectedCurrentAssignment, membershipManager.currentAssignment());
assertEquals(expectedTargetAssignment, membershipManager.targetAssignment().orElse(null));
assertEquals(expectedNextTargetAssignment, membershipManager.nextTargetAssignment().orElse(null));
}
private void testStateUpdateOnFenceError(MembershipManager membershipManager) {
membershipManager.transitionToFenced();
assertEquals(MemberState.FENCED, membershipManager.state());
// Should reset member epoch and keep member id
assertFalse(membershipManager.memberId().isEmpty());
assertEquals(0, membershipManager.memberEpoch());
}
private void testStateUpdateOnFatalFailure(MembershipManager membershipManager) {
String initialMemberId = membershipManager.memberId();
int initialMemberEpoch = membershipManager.memberEpoch();
membershipManager.transitionToFailed();
assertEquals(MemberState.FAILED, membershipManager.state());
// Should not reset member id or epoch
assertEquals(initialMemberId, membershipManager.memberId());
assertEquals(initialMemberEpoch, membershipManager.memberEpoch());
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {

Loading…
Cancel
Save