diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
deleted file mode 100644
index 5eaae957ea5..00000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer.internals;
-
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * Selection of a type of assignor used by a member to get partitions assigned as part of a
- * consumer group. Currently supported assignors are:
- *
SERVER assignors
- *
- * Server assignors include a name of the server assignor selected, ex. uniform, range.
- */
-public class AssignorSelection {
- public enum Type { SERVER }
-
- private final AssignorSelection.Type type;
- private final Optional serverAssignor;
-
- private AssignorSelection(Type type, String serverAssignor) {
- this.type = type;
- if (type == Type.SERVER) {
- this.serverAssignor = Optional.ofNullable(serverAssignor);
- } else {
- throw new IllegalArgumentException("Unsupported assignor type " + type);
- }
- }
-
- public static AssignorSelection newServerAssignor(String serverAssignor) {
- if (serverAssignor == null) {
- throw new IllegalArgumentException("Selected server assignor name cannot be null");
- }
- if (serverAssignor.isEmpty()) {
- throw new IllegalArgumentException("Selected server assignor name cannot be empty");
- }
- return new AssignorSelection(Type.SERVER, serverAssignor);
- }
-
- public static AssignorSelection defaultAssignor() {
- // TODO: review default selection
- return new AssignorSelection(Type.SERVER, null);
- }
-
- public Optional serverAssignor() {
- return this.serverAssignor;
- }
-
- public Type type() {
- return this.type;
- }
-
- @Override
- public boolean equals(Object assignorSelection) {
- if (this == assignorSelection) return true;
- if (assignorSelection == null || getClass() != assignorSelection.getClass()) return false;
- return Objects.equals(((AssignorSelection) assignorSelection).type, this.type) &&
- Objects.equals(((AssignorSelection) assignorSelection).serverAssignor, this.serverAssignor);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, serverAssignor);
- }
-
- @Override
- public String toString() {
- return "AssignorSelection(" +
- "type=" + type + ", " +
- "serverAssignor='" + serverAssignor + '\'' +
- ')';
- }
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
index 774f4a141b8..74322e6fec0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java
@@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread {
config,
coordinatorRequestManager,
groupState);
- MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId);
+ MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId, logContext);
heartbeatRequestManager = new HeartbeatRequestManager(
this.time,
logContext,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 7941c4effb2..2e821693fd6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements RequestManager {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
}
- this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor);
+ this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor);
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new ConsumerGroupHeartbeatRequest.Builder(data),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 4da2eb54e5c..ebcb279cf37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -63,7 +63,7 @@ public enum MemberState {
RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
- FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
+ FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE, RECONCILING);
FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index c0fb9ed3903..8a95a80c659 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -21,7 +21,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import java.util.Optional;
/**
- * Manages group membership for a single member.
+ * A stateful object tracking the state of a single member in relationship to a consumer group:
+ *
* Responsible for:
* Keeping member state
* Keeping assignment for the member
@@ -29,50 +30,73 @@ import java.util.Optional;
*/
public interface MembershipManager {
+ /**
+ * @return Group ID of the consumer group the member is part of (or wants to be part of).
+ */
String groupId();
+ /**
+ * @return Instance ID used by the member when joining the group. If non-empty, it will indicate that
+ * this is a static member.
+ */
Optional groupInstanceId();
+ /**
+ * @return Member ID assigned by the server to this member when it joins the consumer group.
+ */
String memberId();
+ /**
+ * @return Current epoch of the member, maintained by the server.
+ */
int memberEpoch();
+ /**
+ * @return Current state of this member in relationship to a consumer group, as defined in
+ * {@link MemberState}.
+ */
MemberState state();
/**
- * Update the current state of the member based on a heartbeat response
+ * Update member info and transition member state based on a heartbeat response.
+ *
+ * @param response Heartbeat response to extract member info and errors from.
*/
void updateState(ConsumerGroupHeartbeatResponseData response);
/**
- * Returns the {@link AssignorSelection} for the member
+ * @return Server-side assignor implementation configured for the member, that will be sent
+ * out to the server to be used. If empty, then the server will select the assignor.
*/
- AssignorSelection assignorSelection();
+ Optional serverAssignor();
/**
- * Returns the current assignment for the member
+ * @return Current assignment for the member.
*/
- ConsumerGroupHeartbeatResponseData.Assignment assignment();
+ ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
/**
* Update the assignment for the member, indicating that the provided assignment is the new
* current assignment.
*/
- void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+ void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment);
/**
- * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a
- * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ * Transition the member to the FENCED state and update the member info as required. This is
+ * only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
+ * code.
*/
void transitionToFenced();
/**
- * Transition the member to the FAILED state. This is invoked when the heartbeat returns a non-retriable error.
+ * Transition the member to the FAILED state and update the member info as required. This is
+ * invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable
+ * error or when errors occur while executing the user-provided callbacks)
*/
void transitionToFailed();
/**
- * Return true if the member should send heartbeat to the coordinator
+ * @return True if the member should send heartbeat to the coordinator.
*/
boolean shouldSendHeartbeat();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index f4f97bb36f9..2a9a5d2992d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
import java.util.Optional;
/**
- * Membership manager that maintains group membership for a single member following the new
+ * Membership manager that maintains group membership for a single member, following the new
* consumer group protocol.
*
- * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
- * the member receives. It is also responsible for computing assignment for the group based on
- * the metadata, if the member has been selected by the broker to do so.
+ * This is responsible for:
+ * Keeping member info (ex. member id, member epoch, assignment, etc.)
+ * Keeping member state as defined in {@link MemberState}.
+ *
+ * Member info and state are updated based on the heartbeat responses the member receives.
*/
public class MembershipManagerImpl implements MembershipManager {
+ /**
+ * Group ID of the consumer group the member will be part of, provided when creating the current
+ * membership manager.
+ */
private final String groupId;
+
+ /**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
private final Optional groupInstanceId;
+
+ /**
+ * Member ID assigned by the server to the member, received in a heartbeat response when
+ * joining the group specified in {@link #groupId}
+ */
private String memberId;
+
+ /**
+ * Current epoch of the member. It will be set to 0 by the member, and provided to the server
+ * on the heartbeat request, to join the group. It will be then maintained by the server,
+ * incremented as the member reconciles and acknowledges the assignments it receives. It will
+ * be reset to 0 if the member gets fenced.
+ */
private int memberEpoch;
+
+ /**
+ * Current state of this member as part of the consumer group, as defined in {@link MemberState}
+ */
private MemberState state;
- private AssignorSelection assignorSelection;
+
+ /**
+ * Name of the server-side assignor this member has configured to use. It will be sent
+ * out to the server on the {@link ConsumerGroupHeartbeatRequest}. If not defined, the server
+ * will select the assignor implementation to use.
+ */
+ private final Optional serverAssignor;
/**
* Assignment that the member received from the server and successfully processed.
*/
private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
/**
* Assignment that the member received from the server but hasn't completely processed
* yet.
*/
private Optional targetAssignment;
+
/**
- * Latest assignment that the member received from the server while a {@link #targetAssignment}
- * was in process.
+ * Logger.
*/
- private Optional nextTargetAssignment;
+ private final Logger log;
- public MembershipManagerImpl(String groupId) {
- this(groupId, null, null);
+ public MembershipManagerImpl(String groupId, LogContext logContext) {
+ this(groupId, null, null, logContext);
}
- public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+ public MembershipManagerImpl(String groupId,
+ String groupInstanceId,
+ String serverAssignor,
+ LogContext logContext) {
this.groupId = groupId;
this.state = MemberState.UNJOINED;
- if (assignorSelection == null) {
- setAssignorSelection(AssignorSelection.defaultAssignor());
- } else {
- setAssignorSelection(assignorSelection);
- }
+ this.serverAssignor = Optional.ofNullable(serverAssignor);
this.groupInstanceId = Optional.ofNullable(groupInstanceId);
this.targetAssignment = Optional.empty();
- this.nextTargetAssignment = Optional.empty();
+ this.log = logContext.logger(MembershipManagerImpl.class);
}
/**
- * Update assignor selection for the member.
+ * Update the member state, setting it to the nextState only if it is a valid transition.
*
- * @param assignorSelection New assignor selection
- * @throws IllegalArgumentException If the provided assignor selection is null
+ * @throws IllegalStateException If transitioning from the member {@link #state} to the
+ * nextState is not allowed as defined in {@link MemberState}.
*/
- public final void setAssignorSelection(AssignorSelection assignorSelection) {
- if (assignorSelection == null) {
- throw new IllegalArgumentException("Assignor selection cannot be null");
- }
- this.assignorSelection = assignorSelection;
- }
-
private void transitionTo(MemberState nextState) {
if (!this.state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) {
- // TODO: handle invalid state transition
- throw new RuntimeException(String.format("Invalid state transition from %s to %s",
+ throw new IllegalStateException(String.format("Invalid state transition from %s to %s",
state, nextState));
}
+ log.trace("Member {} transitioned from {} to {}.", memberId, state, nextState);
this.state = nextState;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String groupId() {
return groupId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Optional groupInstanceId() {
return groupInstanceId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public String memberId() {
return memberId;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public int memberEpoch() {
return memberEpoch;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void updateState(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() != Errors.NONE.code()) {
@@ -120,7 +163,7 @@ public class MembershipManagerImpl implements MembershipManager {
"Unexpected error in Heartbeat response. Expected no error, but received: %s",
Errors.forCode(response.errorCode())
);
- throw new IllegalStateException(errorMessage);
+ throw new IllegalArgumentException(errorMessage);
}
this.memberId = response.memberId();
this.memberEpoch = response.memberEpoch();
@@ -131,14 +174,21 @@ public class MembershipManagerImpl implements MembershipManager {
maybeTransitionToStable();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void transitionToFenced() {
resetEpoch();
transitionTo(MemberState.FENCED);
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void transitionToFailed() {
+ log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
transitionTo(MemberState.FAILED);
}
@@ -160,92 +210,92 @@ public class MembershipManagerImpl implements MembershipManager {
return state.equals(MemberState.STABLE);
}
+ /**
+ * Take new target assignment received from the server and set it as targetAssignment to be
+ * processed. Following the consumer group protocol, the server won't send a new target
+ * member while a previous one hasn't been acknowledged by the member, so this will fail
+ * if a target assignment already exists.
+ *
+ * @throws IllegalStateException If a target assignment already exists.
+ */
private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
if (!targetAssignment.isPresent()) {
+ log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
targetAssignment = Optional.of(newTargetAssignment);
} else {
- // Keep the latest next target assignment
- nextTargetAssignment = Optional.of(newTargetAssignment);
+ transitionToFailed();
+ throw new IllegalStateException("Cannot set new target assignment because a " +
+ "previous one pending to be reconciled already exists.");
}
}
- private boolean hasPendingTargetAssignment() {
- return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
- }
-
-
/**
- * Update state and assignment as the member has successfully processed a new target
- * assignment.
- * This indicates the end of the reconciliation phase for the member, and makes the target
- * assignment the new current assignment.
- *
- * @param assignment Target assignment the member was able to successfully process
+ * Returns true if the member has a target assignment being processed.
*/
- public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
- updateAssignment(assignment);
- transitionTo(MemberState.STABLE);
- }
-
- /**
- * Update state and member info as the member was not able to process the assignment, due to
- * errors in the execution of the user-provided callbacks.
- *
- * @param error Exception found during the execution of the user-provided callbacks
- */
- public void onAssignmentProcessFailure(Throwable error) {
- transitionTo(MemberState.FAILED);
- // TODO: update member info appropriately, to clear up whatever shouldn't be kept in
- // this unrecoverable state
+ private boolean hasPendingTargetAssignment() {
+ return targetAssignment.isPresent();
}
private void resetEpoch() {
this.memberEpoch = 0;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public MemberState state() {
return state;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public AssignorSelection assignorSelection() {
- return this.assignorSelection;
+ public Optional serverAssignor() {
+ return this.serverAssignor;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
- public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+ public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
return this.currentAssignment;
}
- // VisibleForTesting
+
+ /**
+ * @return Assignment that the member received from the server but hasn't completely processed
+ * yet. Visible for testing.
+ */
Optional targetAssignment() {
return targetAssignment;
}
- // VisibleForTesting
- Optional nextTargetAssignment() {
- return nextTargetAssignment;
- }
-
/**
- * Set the current assignment for the member. This indicates that the reconciliation of the
- * target assignment has been successfully completed.
- * This will clear the {@link #targetAssignment}, and take on the
- * {@link #nextTargetAssignment} if any.
+ * This indicates that the reconciliation of the target assignment has been successfully
+ * completed, so it will make it effective by assigning it to the current assignment.
*
- * @param assignment Assignment that has been successfully processed as part of the
- * reconciliation process.
+ * @params Assignment that has been successfully reconciled. This is expected to
+ * match the target assignment defined in {@link #targetAssignment()}
*/
@Override
- public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
- this.currentAssignment = assignment;
- if (!nextTargetAssignment.isPresent()) {
- targetAssignment = Optional.empty();
- } else {
- targetAssignment = Optional.of(nextTargetAssignment.get());
- nextTargetAssignment = Optional.empty();
+ public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+ if (assignment == null) {
+ throw new IllegalArgumentException("Assignment cannot be null");
}
- maybeTransitionToStable();
+ if (!assignment.equals(targetAssignment.orElse(null))) {
+ // This could be simplified to remove the assignment param and just assume that what
+ // was reconciled was the targetAssignment, but keeping it explicit and failing fast
+ // here to uncover any issues in the interaction of the assignment processing logic
+ // and this.
+ throw new IllegalStateException(String.format("Reconciled assignment %s does not " +
+ "match the expected target assignment %s", assignment,
+ targetAssignment.orElse(null)));
+ }
+ this.currentAssignment = assignment;
+ targetAssignment = Optional.empty();
+ transitionTo(MemberState.STABLE);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
deleted file mode 100644
index 57c61e3dc14..00000000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.clients.consumer.internals.AssignorSelection;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class AssignorSelectionTest {
-
- @Test
- public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
- assertThrows(IllegalArgumentException.class,
- () -> AssignorSelection.newServerAssignor(null));
- assertThrows(IllegalArgumentException.class,
- () -> AssignorSelection.newServerAssignor(""));
- }
-
- @Test
- public void testEquals() {
- // Server assignors
- AssignorSelection selection1 = AssignorSelection.newServerAssignor("range");
- AssignorSelection selection2 = AssignorSelection.newServerAssignor("range");
- assertEquals(selection1, selection1);
- assertEquals(selection1, selection2);
- AssignorSelection selection3 = AssignorSelection.newServerAssignor("uniform");
- assertNotEquals(selection1, selection3);
- assertNotEquals(selection1, null);
- }
-
- @Test
- public void testServerAssignorSelection() {
- String assignorName = "uniform";
- AssignorSelection selection = AssignorSelection.newServerAssignor(assignorName);
- assertEquals(AssignorSelection.Type.SERVER, selection.type());
- assertTrue(selection.serverAssignor().isPresent());
- assertEquals(assignorName, selection.serverAssignor().get());
- }
-}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 598deace7f5..0856b4ebcb9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest {
coordinatorRequestManager = mock(CoordinatorRequestManager.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
subscriptionState = mock(SubscriptionState.class);
- membershipManager = spy(new MembershipManagerImpl(GROUP_ID));
+ membershipManager = spy(new MembershipManagerImpl(GROUP_ID, logContext));
heartbeatRequestState = mock(HeartbeatRequestManager.HeartbeatRequestState.class);
errorEventHandler = mock(ErrorEventHandler.class);
heartbeatRequestManager = createManager();
@@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest {
Properties prop = createConsumerConfig();
prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000");
config = new ConsumerConfig(prop);
- membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null);
+ membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null, logContext);
heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
logContext,
time,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 4b6aa80c04e..d78bbf2ab63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -35,40 +38,26 @@ public class MembershipManagerImplTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final int MEMBER_EPOCH = 1;
+ private final LogContext logContext = new LogContext();
@Test
- public void testMembershipManagerDefaultAssignor() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
- assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
+ public void testMembershipManagerServerAssignor() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(Optional.empty(), membershipManager.serverAssignor());
- membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", null);
- assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
- }
-
- @Test
- public void testMembershipManagerAssignorSelectionUpdate() {
- AssignorSelection firstAssignorSelection = AssignorSelection.newServerAssignor("uniform");
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
- firstAssignorSelection);
- assertEquals(firstAssignorSelection, membershipManager.assignorSelection());
-
- AssignorSelection secondAssignorSelection = AssignorSelection.newServerAssignor("range");
- membershipManager.setAssignorSelection(secondAssignorSelection);
- assertEquals(secondAssignorSelection, membershipManager.assignorSelection());
-
- assertThrows(IllegalArgumentException.class,
- () -> membershipManager.setAssignorSelection(null));
+ membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext);
+ assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor());
}
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
- new MembershipManagerImpl(GROUP_ID);
- new MembershipManagerImpl(GROUP_ID, null, AssignorSelection.defaultAssignor());
+ new MembershipManagerImpl(GROUP_ID, logContext);
+ new MembershipManagerImpl(GROUP_ID, null, null, logContext);
}
@Test
public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
assertEquals(MemberState.UNJOINED, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
@@ -84,7 +73,7 @@ public class MembershipManagerImplTest {
@Test
public void testMemberIdAndEpochResetOnFencedMembers() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@@ -99,7 +88,7 @@ public class MembershipManagerImplTest {
@Test
public void testTransitionToFailure() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
@@ -112,62 +101,144 @@ public class MembershipManagerImplTest {
}
@Test
- public void testUpdateAssignment() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ public void testFencingWhenStateIsStable() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ testStateUpdateOnFenceError(membershipManager);
+ }
+
+ @Test
+ public void testFencingWhenStateIsReconciling() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(createAssignment());
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+ testStateUpdateOnFenceError(membershipManager);
+ }
+
+ @Test
+ public void testFatalFailureWhenStateIsUnjoined() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+ testStateUpdateOnFatalFailure(membershipManager);
+ }
+
+ @Test
+ public void testFatalFailureWhenStateIsStable() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
+ membershipManager.updateState(heartbeatResponse.data());
+ assertEquals(MemberState.STABLE, membershipManager.state());
+
+ testStateUpdateOnFatalFailure(membershipManager);
+ }
+
+ @Test
+ public void testFencingShouldNotHappenWhenStateIsUnjoined() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ assertEquals(MemberState.UNJOINED, membershipManager.state());
+
+ // Getting fenced when the member is not part of the group is not expected and should
+ // fail with invalid transition.
+ assertThrows(IllegalStateException.class, membershipManager::transitionToFenced);
+ }
+
+ @Test
+ public void testUpdateStateFailsOnResponsesWithErrors() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ // Updating state with a heartbeat response containing errors cannot be performed and
+ // should fail.
+ ConsumerGroupHeartbeatResponse unknownMemberResponse =
+ createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
+ assertThrows(IllegalArgumentException.class,
+ () -> membershipManager.updateState(unknownMemberResponse.data()));
+ }
+
+ @Test
+ public void testAssignmentUpdatedAsReceivedAndProcessed() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(newAssignment);
membershipManager.updateState(heartbeatResponse.data());
// Target assignment should be in the process of being reconciled
- checkAssignments(membershipManager, null, newAssignment, null);
+ checkAssignments(membershipManager, null, newAssignment);
+ // Mark assignment processing completed
+ membershipManager.onTargetAssignmentProcessComplete(newAssignment);
+ // Target assignment should now be the current assignment
+ checkAssignments(membershipManager, newAssignment, null);
}
@Test
- public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
+ public void testMemberFailsIfAssignmentReceivedWhileAnotherOnBeingReconciled() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received should be in the process of being reconciled
- checkAssignments(membershipManager, null, newAssignment1, null);
+ checkAssignments(membershipManager, null, newAssignment1);
// Second target assignment received while there is another one being reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
- membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
- checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
+ assertThrows(IllegalStateException.class,
+ () -> membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()));
+ assertEquals(MemberState.FAILED, membershipManager.state());
}
@Test
- public void testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
- MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
- membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
-
- // First target assignment received, remains in the process of being reconciled
- checkAssignments(membershipManager, null, newAssignment1, null);
-
- // Second target assignment received while there is another one being reconciled
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
- membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
- checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
+ public void testAssignmentUpdatedFailsIfAssignmentReconciledDoesNotMatchTargetAssignment() {
+ MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext);
+ ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Collections.singletonList(
+ new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(Arrays.asList(0, 1, 2))));
+ ConsumerGroupHeartbeatResponse heartbeatResponse =
+ createConsumerGroupHeartbeatResponse(targetAssignment);
+ membershipManager.updateState(heartbeatResponse.data());
- // If more assignments are received while there is one being reconciled, the most recent
- // assignment received is kept as nextTargetAssignment
- ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = createAssignment();
- membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
- checkAssignments(membershipManager, null, newAssignment1, newAssignment3);
+ // Target assignment should be in the process of being reconciled
+ checkAssignments(membershipManager, null, targetAssignment);
+ // Mark assignment processing completed
+ ConsumerGroupHeartbeatResponseData.Assignment reconciled =
+ new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(Collections.singletonList(
+ new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(Uuid.randomUuid())
+ .setPartitions(Collections.singletonList(0))));
+ assertThrows(IllegalStateException.class, () -> membershipManager.onTargetAssignmentProcessComplete(reconciled));
}
private void checkAssignments(
MembershipManagerImpl membershipManager,
ConsumerGroupHeartbeatResponseData.Assignment expectedCurrentAssignment,
- ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment,
- ConsumerGroupHeartbeatResponseData.Assignment expectedNextTargetAssignment) {
- assertEquals(expectedCurrentAssignment, membershipManager.assignment());
+ ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment) {
+ assertEquals(expectedCurrentAssignment, membershipManager.currentAssignment());
assertEquals(expectedTargetAssignment, membershipManager.targetAssignment().orElse(null));
- assertEquals(expectedNextTargetAssignment, membershipManager.nextTargetAssignment().orElse(null));
+ }
+ private void testStateUpdateOnFenceError(MembershipManager membershipManager) {
+ membershipManager.transitionToFenced();
+ assertEquals(MemberState.FENCED, membershipManager.state());
+ // Should reset member epoch and keep member id
+ assertFalse(membershipManager.memberId().isEmpty());
+ assertEquals(0, membershipManager.memberEpoch());
+ }
+
+ private void testStateUpdateOnFatalFailure(MembershipManager membershipManager) {
+ String initialMemberId = membershipManager.memberId();
+ int initialMemberEpoch = membershipManager.memberEpoch();
+ membershipManager.transitionToFailed();
+ assertEquals(MemberState.FAILED, membershipManager.state());
+ // Should not reset member id or epoch
+ assertEquals(initialMemberId, membershipManager.memberId());
+ assertEquals(initialMemberEpoch, membershipManager.memberEpoch());
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {