From 48449b68fd70c96451e03afe7d3cd5b2800240fa Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Wed, 18 Oct 2023 11:10:18 -0400 Subject: [PATCH] KAFKA-15554: Client state changes for handling one assignment at a time & minor improvements (#14413) This patch includes: - target assignment changes : accepting only one at a time according to the updated protocol. - changes for error handling, leaving responsibility in the heartbeatManager and exposing only the functionality for when the state needs to be updated (on successful HB, on fencing, on fatal failure) - allow transitions for failures when joining - tests & minor improvements/fixes addressing initial version review Reviewers: Kirk True , Philip Nee , David Jacot --- .../consumer/internals/AssignorSelection.java | 88 -------- .../internals/DefaultBackgroundThread.java | 2 +- .../internals/HeartbeatRequestManager.java | 2 +- .../consumer/internals/MemberState.java | 2 +- .../consumer/internals/MembershipManager.java | 46 +++- .../internals/MembershipManagerImpl.java | 212 +++++++++++------- .../consumer/AssignorSelectionTest.java | 58 ----- .../HeartbeatRequestManagerTest.java | 4 +- .../internals/MembershipManagerImplTest.java | 179 ++++++++++----- 9 files changed, 296 insertions(+), 297 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java delete mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java deleted file mode 100644 index 5eaae957ea5..00000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.consumer.internals; - -import java.util.Objects; -import java.util.Optional; - -/** - * Selection of a type of assignor used by a member to get partitions assigned as part of a - * consumer group. Currently supported assignors are: - *
  • SERVER assignors
  • - *

    - * Server assignors include a name of the server assignor selected, ex. uniform, range. - */ -public class AssignorSelection { - public enum Type { SERVER } - - private final AssignorSelection.Type type; - private final Optional serverAssignor; - - private AssignorSelection(Type type, String serverAssignor) { - this.type = type; - if (type == Type.SERVER) { - this.serverAssignor = Optional.ofNullable(serverAssignor); - } else { - throw new IllegalArgumentException("Unsupported assignor type " + type); - } - } - - public static AssignorSelection newServerAssignor(String serverAssignor) { - if (serverAssignor == null) { - throw new IllegalArgumentException("Selected server assignor name cannot be null"); - } - if (serverAssignor.isEmpty()) { - throw new IllegalArgumentException("Selected server assignor name cannot be empty"); - } - return new AssignorSelection(Type.SERVER, serverAssignor); - } - - public static AssignorSelection defaultAssignor() { - // TODO: review default selection - return new AssignorSelection(Type.SERVER, null); - } - - public Optional serverAssignor() { - return this.serverAssignor; - } - - public Type type() { - return this.type; - } - - @Override - public boolean equals(Object assignorSelection) { - if (this == assignorSelection) return true; - if (assignorSelection == null || getClass() != assignorSelection.getClass()) return false; - return Objects.equals(((AssignorSelection) assignorSelection).type, this.type) && - Objects.equals(((AssignorSelection) assignorSelection).serverAssignor, this.serverAssignor); - } - - @Override - public int hashCode() { - return Objects.hash(type, serverAssignor); - } - - @Override - public String toString() { - return "AssignorSelection(" + - "type=" + type + ", " + - "serverAssignor='" + serverAssignor + '\'' + - ')'; - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java index 774f4a141b8..74322e6fec0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java @@ -194,7 +194,7 @@ public class DefaultBackgroundThread extends KafkaThread { config, coordinatorRequestManager, groupState); - MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId); + MembershipManager membershipManager = new MembershipManagerImpl(groupState.groupId, logContext); heartbeatRequestManager = new HeartbeatRequestManager( this.time, logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 7941c4effb2..2e821693fd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -179,7 +179,7 @@ public class HeartbeatRequestManager implements RequestManager { data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); } - this.membershipManager.assignorSelection().serverAssignor().ifPresent(data::setServerAssignor); + this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor); NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( new ConsumerGroupHeartbeatRequest.Builder(data), diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java index 4da2eb54e5c..ebcb279cf37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java @@ -63,7 +63,7 @@ public enum MemberState { RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED); - FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING); + FAILED.previousValidStates = Arrays.asList(UNJOINED, STABLE, RECONCILING); FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index c0fb9ed3903..8a95a80c659 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -21,7 +21,8 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import java.util.Optional; /** - * Manages group membership for a single member. + * A stateful object tracking the state of a single member in relationship to a consumer group: + *

    * Responsible for: *

  • Keeping member state
  • *
  • Keeping assignment for the member
  • @@ -29,50 +30,73 @@ import java.util.Optional; */ public interface MembershipManager { + /** + * @return Group ID of the consumer group the member is part of (or wants to be part of). + */ String groupId(); + /** + * @return Instance ID used by the member when joining the group. If non-empty, it will indicate that + * this is a static member. + */ Optional groupInstanceId(); + /** + * @return Member ID assigned by the server to this member when it joins the consumer group. + */ String memberId(); + /** + * @return Current epoch of the member, maintained by the server. + */ int memberEpoch(); + /** + * @return Current state of this member in relationship to a consumer group, as defined in + * {@link MemberState}. + */ MemberState state(); /** - * Update the current state of the member based on a heartbeat response + * Update member info and transition member state based on a heartbeat response. + * + * @param response Heartbeat response to extract member info and errors from. */ void updateState(ConsumerGroupHeartbeatResponseData response); /** - * Returns the {@link AssignorSelection} for the member + * @return Server-side assignor implementation configured for the member, that will be sent + * out to the server to be used. If empty, then the server will select the assignor. */ - AssignorSelection assignorSelection(); + Optional serverAssignor(); /** - * Returns the current assignment for the member + * @return Current assignment for the member. */ - ConsumerGroupHeartbeatResponseData.Assignment assignment(); + ConsumerGroupHeartbeatResponseData.Assignment currentAssignment(); /** * Update the assignment for the member, indicating that the provided assignment is the new * current assignment. */ - void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment); /** - * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a - * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code. + * Transition the member to the FENCED state and update the member info as required. This is + * only invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error. + * code. */ void transitionToFenced(); /** - * Transition the member to the FAILED state. This is invoked when the heartbeat returns a non-retriable error. + * Transition the member to the FAILED state and update the member info as required. This is + * invoked when un-recoverable errors occur (ex. when the heartbeat returns a non-retriable + * error or when errors occur while executing the user-provided callbacks) */ void transitionToFailed(); /** - * Return true if the member should send heartbeat to the coordinator + * @return True if the member should send heartbeat to the coordinator. */ boolean shouldSendHeartbeat(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index f4f97bb36f9..2a9a5d2992d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -19,100 +19,143 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. *

    - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + *

  • Keeping member info (ex. member id, member epoch, assignment, etc.)
  • + *
  • Keeping member state as defined in {@link MemberState}.
  • + *

    + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { + /** + * Group ID of the consumer group the member will be part of, provided when creating the current + * membership manager. + */ private final String groupId; + + /** + * Group instance ID to be used by the member, provided when creating the current membership manager. + */ private final Optional groupInstanceId; + + /** + * Member ID assigned by the server to the member, received in a heartbeat response when + * joining the group specified in {@link #groupId} + */ private String memberId; + + /** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. It will + * be reset to 0 if the member gets fenced. + */ private int memberEpoch; + + /** + * Current state of this member as part of the consumer group, as defined in {@link MemberState} + */ private MemberState state; - private AssignorSelection assignorSelection; + + /** + * Name of the server-side assignor this member has configured to use. It will be sent + * out to the server on the {@link ConsumerGroupHeartbeatRequest}. If not defined, the server + * will select the assignor implementation to use. + */ + private final Optional serverAssignor; /** * Assignment that the member received from the server and successfully processed. */ private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + /** * Assignment that the member received from the server but hasn't completely processed * yet. */ private Optional targetAssignment; + /** - * Latest assignment that the member received from the server while a {@link #targetAssignment} - * was in process. + * Logger. */ - private Optional nextTargetAssignment; + private final Logger log; - public MembershipManagerImpl(String groupId) { - this(groupId, null, null); + public MembershipManagerImpl(String groupId, LogContext logContext) { + this(groupId, null, null, logContext); } - public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) { + public MembershipManagerImpl(String groupId, + String groupInstanceId, + String serverAssignor, + LogContext logContext) { this.groupId = groupId; this.state = MemberState.UNJOINED; - if (assignorSelection == null) { - setAssignorSelection(AssignorSelection.defaultAssignor()); - } else { - setAssignorSelection(assignorSelection); - } + this.serverAssignor = Optional.ofNullable(serverAssignor); this.groupInstanceId = Optional.ofNullable(groupInstanceId); this.targetAssignment = Optional.empty(); - this.nextTargetAssignment = Optional.empty(); + this.log = logContext.logger(MembershipManagerImpl.class); } /** - * Update assignor selection for the member. + * Update the member state, setting it to the nextState only if it is a valid transition. * - * @param assignorSelection New assignor selection - * @throws IllegalArgumentException If the provided assignor selection is null + * @throws IllegalStateException If transitioning from the member {@link #state} to the + * nextState is not allowed as defined in {@link MemberState}. */ - public final void setAssignorSelection(AssignorSelection assignorSelection) { - if (assignorSelection == null) { - throw new IllegalArgumentException("Assignor selection cannot be null"); - } - this.assignorSelection = assignorSelection; - } - private void transitionTo(MemberState nextState) { if (!this.state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) { - // TODO: handle invalid state transition - throw new RuntimeException(String.format("Invalid state transition from %s to %s", + throw new IllegalStateException(String.format("Invalid state transition from %s to %s", state, nextState)); } + log.trace("Member {} transitioned from {} to {}.", memberId, state, nextState); this.state = nextState; } + /** + * {@inheritDoc} + */ @Override public String groupId() { return groupId; } + /** + * {@inheritDoc} + */ @Override public Optional groupInstanceId() { return groupInstanceId; } + /** + * {@inheritDoc} + */ @Override public String memberId() { return memberId; } + /** + * {@inheritDoc} + */ @Override public int memberEpoch() { return memberEpoch; } + /** + * {@inheritDoc} + */ @Override public void updateState(ConsumerGroupHeartbeatResponseData response) { if (response.errorCode() != Errors.NONE.code()) { @@ -120,7 +163,7 @@ public class MembershipManagerImpl implements MembershipManager { "Unexpected error in Heartbeat response. Expected no error, but received: %s", Errors.forCode(response.errorCode()) ); - throw new IllegalStateException(errorMessage); + throw new IllegalArgumentException(errorMessage); } this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); @@ -131,14 +174,21 @@ public class MembershipManagerImpl implements MembershipManager { maybeTransitionToStable(); } + /** + * {@inheritDoc} + */ @Override public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); } + /** + * {@inheritDoc} + */ @Override public void transitionToFailed() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); transitionTo(MemberState.FAILED); } @@ -160,92 +210,92 @@ public class MembershipManagerImpl implements MembershipManager { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.info("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("Cannot set new target assignment because a " + + "previous one pending to be reconciled already exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks - */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override - public AssignorSelection assignorSelection() { - return this.assignorSelection; + public Optional serverAssignor() { + return this.serverAssignor; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } - // VisibleForTesting + + /** + * @return Assignment that the member received from the server but hasn't completely processed + * yet. Visible for testing. + */ Optional targetAssignment() { return targetAssignment; } - // VisibleForTesting - Optional nextTargetAssignment() { - return nextTargetAssignment; - } - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. + * This indicates that the reconciliation of the target assignment has been successfully + * completed, so it will make it effective by assigning it to the current assignment. * - * @param assignment Assignment that has been successfully processed as part of the - * reconciliation process. + * @params Assignment that has been successfully reconciled. This is expected to + * match the target assignment defined in {@link #targetAssignment()} */ @Override - public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - this.currentAssignment = assignment; - if (!nextTargetAssignment.isPresent()) { - targetAssignment = Optional.empty(); - } else { - targetAssignment = Optional.of(nextTargetAssignment.get()); - nextTargetAssignment = Optional.empty(); + public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + if (assignment == null) { + throw new IllegalArgumentException("Assignment cannot be null"); } - maybeTransitionToStable(); + if (!assignment.equals(targetAssignment.orElse(null))) { + // This could be simplified to remove the assignment param and just assume that what + // was reconciled was the targetAssignment, but keeping it explicit and failing fast + // here to uncover any issues in the interaction of the assignment processing logic + // and this. + throw new IllegalStateException(String.format("Reconciled assignment %s does not " + + "match the expected target assignment %s", assignment, + targetAssignment.orElse(null))); + } + this.currentAssignment = assignment; + targetAssignment = Optional.empty(); + transitionTo(MemberState.STABLE); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java deleted file mode 100644 index 57c61e3dc14..00000000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.clients.consumer.internals.AssignorSelection; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class AssignorSelectionTest { - - @Test - public void testServerAssignorCannotBeNullOrEmptyIfSelected() { - assertThrows(IllegalArgumentException.class, - () -> AssignorSelection.newServerAssignor(null)); - assertThrows(IllegalArgumentException.class, - () -> AssignorSelection.newServerAssignor("")); - } - - @Test - public void testEquals() { - // Server assignors - AssignorSelection selection1 = AssignorSelection.newServerAssignor("range"); - AssignorSelection selection2 = AssignorSelection.newServerAssignor("range"); - assertEquals(selection1, selection1); - assertEquals(selection1, selection2); - AssignorSelection selection3 = AssignorSelection.newServerAssignor("uniform"); - assertNotEquals(selection1, selection3); - assertNotEquals(selection1, null); - } - - @Test - public void testServerAssignorSelection() { - String assignorName = "uniform"; - AssignorSelection selection = AssignorSelection.newServerAssignor(assignorName); - assertEquals(AssignorSelection.Type.SERVER, selection.type()); - assertTrue(selection.serverAssignor().isPresent()); - assertEquals(assignorName, selection.serverAssignor().get()); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 598deace7f5..0856b4ebcb9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -94,7 +94,7 @@ public class HeartbeatRequestManagerTest { coordinatorRequestManager = mock(CoordinatorRequestManager.class); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); subscriptionState = mock(SubscriptionState.class); - membershipManager = spy(new MembershipManagerImpl(GROUP_ID)); + membershipManager = spy(new MembershipManagerImpl(GROUP_ID, logContext)); heartbeatRequestState = mock(HeartbeatRequestManager.HeartbeatRequestState.class); errorEventHandler = mock(ErrorEventHandler.class); heartbeatRequestManager = createManager(); @@ -234,7 +234,7 @@ public class HeartbeatRequestManagerTest { Properties prop = createConsumerConfig(); prop.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, "10000"); config = new ConsumerConfig(prop); - membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null); + membershipManager = new MembershipManagerImpl(GROUP_ID, GROUP_INSTANCE_ID, null, logContext); heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( logContext, time, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 4b6aa80c04e..d78bbf2ab63 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -21,9 +21,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -35,40 +38,26 @@ public class MembershipManagerImplTest { private static final String GROUP_ID = "test-group"; private static final String MEMBER_ID = "test-member-1"; private static final int MEMBER_EPOCH = 1; + private final LogContext logContext = new LogContext(); @Test - public void testMembershipManagerDefaultAssignor() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); - assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection()); + public void testMembershipManagerServerAssignor() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + assertEquals(Optional.empty(), membershipManager.serverAssignor()); - membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", null); - assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection()); - } - - @Test - public void testMembershipManagerAssignorSelectionUpdate() { - AssignorSelection firstAssignorSelection = AssignorSelection.newServerAssignor("uniform"); - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", - firstAssignorSelection); - assertEquals(firstAssignorSelection, membershipManager.assignorSelection()); - - AssignorSelection secondAssignorSelection = AssignorSelection.newServerAssignor("range"); - membershipManager.setAssignorSelection(secondAssignorSelection); - assertEquals(secondAssignorSelection, membershipManager.assignorSelection()); - - assertThrows(IllegalArgumentException.class, - () -> membershipManager.setAssignorSelection(null)); + membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext); + assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor()); } @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { - new MembershipManagerImpl(GROUP_ID); - new MembershipManagerImpl(GROUP_ID, null, AssignorSelection.defaultAssignor()); + new MembershipManagerImpl(GROUP_ID, logContext); + new MembershipManagerImpl(GROUP_ID, null, null, logContext); } @Test public void testTransitionToReconcilingOnlyIfAssignmentReceived() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); assertEquals(MemberState.UNJOINED, membershipManager.state()); ConsumerGroupHeartbeatResponse responseWithoutAssignment = @@ -84,7 +73,7 @@ public class MembershipManagerImplTest { @Test public void testMemberIdAndEpochResetOnFencedMembers() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); membershipManager.updateState(heartbeatResponse.data()); @@ -99,7 +88,7 @@ public class MembershipManagerImplTest { @Test public void testTransitionToFailure() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); membershipManager.updateState(heartbeatResponse.data()); @@ -112,62 +101,144 @@ public class MembershipManagerImplTest { } @Test - public void testUpdateAssignment() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); + public void testFencingWhenStateIsStable() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.updateState(heartbeatResponse.data()); + assertEquals(MemberState.STABLE, membershipManager.state()); + + testStateUpdateOnFenceError(membershipManager); + } + + @Test + public void testFencingWhenStateIsReconciling() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(createAssignment()); + membershipManager.updateState(heartbeatResponse.data()); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + testStateUpdateOnFenceError(membershipManager); + } + + @Test + public void testFatalFailureWhenStateIsUnjoined() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + assertEquals(MemberState.UNJOINED, membershipManager.state()); + + testStateUpdateOnFatalFailure(membershipManager); + } + + @Test + public void testFatalFailureWhenStateIsStable() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.updateState(heartbeatResponse.data()); + assertEquals(MemberState.STABLE, membershipManager.state()); + + testStateUpdateOnFatalFailure(membershipManager); + } + + @Test + public void testFencingShouldNotHappenWhenStateIsUnjoined() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + assertEquals(MemberState.UNJOINED, membershipManager.state()); + + // Getting fenced when the member is not part of the group is not expected and should + // fail with invalid transition. + assertThrows(IllegalStateException.class, membershipManager::transitionToFenced); + } + + @Test + public void testUpdateStateFailsOnResponsesWithErrors() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + // Updating state with a heartbeat response containing errors cannot be performed and + // should fail. + ConsumerGroupHeartbeatResponse unknownMemberResponse = + createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID); + assertThrows(IllegalArgumentException.class, + () -> membershipManager.updateState(unknownMemberResponse.data())); + } + + @Test + public void testAssignmentUpdatedAsReceivedAndProcessed() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(newAssignment); membershipManager.updateState(heartbeatResponse.data()); // Target assignment should be in the process of being reconciled - checkAssignments(membershipManager, null, newAssignment, null); + checkAssignments(membershipManager, null, newAssignment); + // Mark assignment processing completed + membershipManager.onTargetAssignmentProcessComplete(newAssignment); + // Target assignment should now be the current assignment + checkAssignments(membershipManager, newAssignment, null); } @Test - public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); + public void testMemberFailsIfAssignmentReceivedWhileAnotherOnBeingReconciled() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment(); membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data()); // First target assignment received should be in the process of being reconciled - checkAssignments(membershipManager, null, newAssignment1, null); + checkAssignments(membershipManager, null, newAssignment1); // Second target assignment received while there is another one being reconciled ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment(); - membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()); - checkAssignments(membershipManager, null, newAssignment1, newAssignment2); + assertThrows(IllegalStateException.class, + () -> membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data())); + assertEquals(MemberState.FAILED, membershipManager.state()); } @Test - public void testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID); - ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment(); - membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data()); - - // First target assignment received, remains in the process of being reconciled - checkAssignments(membershipManager, null, newAssignment1, null); - - // Second target assignment received while there is another one being reconciled - ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment(); - membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data()); - checkAssignments(membershipManager, null, newAssignment1, newAssignment2); + public void testAssignmentUpdatedFailsIfAssignmentReconciledDoesNotMatchTargetAssignment() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Collections.singletonList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(Uuid.randomUuid()) + .setPartitions(Arrays.asList(0, 1, 2)))); + ConsumerGroupHeartbeatResponse heartbeatResponse = + createConsumerGroupHeartbeatResponse(targetAssignment); + membershipManager.updateState(heartbeatResponse.data()); - // If more assignments are received while there is one being reconciled, the most recent - // assignment received is kept as nextTargetAssignment - ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = createAssignment(); - membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data()); - checkAssignments(membershipManager, null, newAssignment1, newAssignment3); + // Target assignment should be in the process of being reconciled + checkAssignments(membershipManager, null, targetAssignment); + // Mark assignment processing completed + ConsumerGroupHeartbeatResponseData.Assignment reconciled = + new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Collections.singletonList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(Uuid.randomUuid()) + .setPartitions(Collections.singletonList(0)))); + assertThrows(IllegalStateException.class, () -> membershipManager.onTargetAssignmentProcessComplete(reconciled)); } private void checkAssignments( MembershipManagerImpl membershipManager, ConsumerGroupHeartbeatResponseData.Assignment expectedCurrentAssignment, - ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment, - ConsumerGroupHeartbeatResponseData.Assignment expectedNextTargetAssignment) { - assertEquals(expectedCurrentAssignment, membershipManager.assignment()); + ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment) { + assertEquals(expectedCurrentAssignment, membershipManager.currentAssignment()); assertEquals(expectedTargetAssignment, membershipManager.targetAssignment().orElse(null)); - assertEquals(expectedNextTargetAssignment, membershipManager.nextTargetAssignment().orElse(null)); + } + private void testStateUpdateOnFenceError(MembershipManager membershipManager) { + membershipManager.transitionToFenced(); + assertEquals(MemberState.FENCED, membershipManager.state()); + // Should reset member epoch and keep member id + assertFalse(membershipManager.memberId().isEmpty()); + assertEquals(0, membershipManager.memberEpoch()); + } + + private void testStateUpdateOnFatalFailure(MembershipManager membershipManager) { + String initialMemberId = membershipManager.memberId(); + int initialMemberEpoch = membershipManager.memberEpoch(); + membershipManager.transitionToFailed(); + assertEquals(MemberState.FAILED, membershipManager.state()); + // Should not reset member id or epoch + assertEquals(initialMemberId, membershipManager.memberId()); + assertEquals(initialMemberEpoch, membershipManager.memberEpoch()); } private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {