Browse Source

KAFKA-15275 - Client state machine basic components, states and initial transitions (#14323)

Initial definition of the core components for maintaining group membership on the client following the new consumer group protocol.

This PR includes:
- Membership management for keeping member state and assignment, based on the heartbeat responses received.
- Assignor selection logic to support server side assignors.
This only includes the basic initial states and transitions, to be extended as we implement the protocol.

This is intended to be used from the heartbeat and assignment requests manager that actually build and process the heartbeat and assignment related requests.

Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
pull/14388/head
Lianet Magrans 1 year ago committed by GitHub
parent
commit
0e47fa7537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 84
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java
  2. 82
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
  3. 62
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
  4. 240
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
  5. 56
      clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java
  6. 202
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java

84
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java

@ -0,0 +1,84 @@ @@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.Objects;
/**
* Selection of a type of assignor used by a member to get partitions assigned as part of a
* consumer group. Currently supported assignors are:
* <li>SERVER assignors</li>
* <p/>
* Server assignors include a name of the server assignor selected, ex. uniform, range.
*/
public class AssignorSelection {
public enum Type { SERVER }
private final AssignorSelection.Type type;
private String serverAssignor;
private AssignorSelection(Type type, String serverAssignor) {
this.type = type;
if (type == Type.SERVER) {
this.serverAssignor = serverAssignor;
} else {
throw new IllegalArgumentException("Unsupported assignor type " + type);
}
}
public static AssignorSelection newServerAssignor(String serverAssignor) {
if (serverAssignor == null) {
throw new IllegalArgumentException("Selected server assignor name cannot be null");
}
if (serverAssignor.isEmpty()) {
throw new IllegalArgumentException("Selected server assignor name cannot be empty");
}
return new AssignorSelection(Type.SERVER, serverAssignor);
}
public static AssignorSelection defaultAssignor() {
// TODO: review default selection
return new AssignorSelection(Type.SERVER, "uniform");
}
public String serverAssignor() {
return this.serverAssignor;
}
public Type type() {
return this.type;
}
@Override
public boolean equals(Object assignorSelection) {
if (this == assignorSelection) return true;
if (assignorSelection == null || getClass() != assignorSelection.getClass()) return false;
return Objects.equals(((AssignorSelection) assignorSelection).type, this.type) &&
Objects.equals(((AssignorSelection) assignorSelection).serverAssignor, this.serverAssignor);
}
@Override
public int hashCode() {
return Objects.hash(type, serverAssignor);
}
@Override
public String toString() {
return String.format("Assignor selection {type:%s, name:%s}", type, serverAssignor);
}
}

82
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java

@ -0,0 +1,82 @@ @@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public enum MemberState {
/**
* Member has not joined a consumer group yet, or has been fenced and needs to re-join.
*/
UNJOINED,
/**
* Member has received a new target assignment (partitions could have been assigned or
* revoked), and it is processing it. While in this state, the member will
* invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make
* the new assignment effective.
*/
// TODO: determine if separate state will be needed for assign/revoke (not for now)
RECONCILING,
/**
* Member is active in a group (heartbeating) and has processed all assignments received.
*/
STABLE,
/**
* Member transitions to this state when it receives a
* {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or
* {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the
* broker. This is a recoverable state, where the member
* gives up its partitions by invoking the user callbacks for onPartitionsLost, and then
* transitions to {@link #UNJOINED} to rejoin the group as a new member.
*/
FENCED,
/**
* The member failed with an unrecoverable error
*/
FAILED;
static {
// Valid state transitions
STABLE.previousValidStates = Arrays.asList(UNJOINED, RECONCILING);
RECONCILING.previousValidStates = Arrays.asList(STABLE, UNJOINED);
FAILED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
FENCED.previousValidStates = Arrays.asList(STABLE, RECONCILING);
UNJOINED.previousValidStates = Arrays.asList(FENCED);
}
private List<MemberState> previousValidStates;
MemberState() {
this.previousValidStates = new ArrayList<>();
}
public List<MemberState> getPreviousValidStates() {
return this.previousValidStates;
}
}

62
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java

@ -0,0 +1,62 @@ @@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import java.util.Optional;
/**
* Manages group membership for a single member.
* Responsible for:
* <li>Keeping member state</li>
* <li>Keeping assignment for the member</li>
* <li>Computing assignment for the group if the member is required to do so<li/>
*/
public interface MembershipManager {
String groupId();
Optional<String> groupInstanceId();
String memberId();
int memberEpoch();
MemberState state();
/**
* Update the current state of the member based on a heartbeat response
*/
void updateState(ConsumerGroupHeartbeatResponseData response);
/**
* Returns the {@link AssignorSelection} for the member
*/
AssignorSelection assignorSelection();
/**
* Returns the current assignment for the member
*/
ConsumerGroupHeartbeatResponseData.Assignment assignment();
/**
* Update the assignment for the member, indicating that the provided assignment is the new
* current assignment.
*/
void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
}

240
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java

@ -0,0 +1,240 @@ @@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Membership manager that maintains group membership for a single member following the new
* consumer group protocol.
* <p/>
* This keeps membership state and assignment updated in-memory, based on the heartbeat responses
* the member receives. It is also responsible for computing assignment for the group based on
* the metadata, if the member has been selected by the broker to do so.
*/
public class MembershipManagerImpl implements MembershipManager {
private final String groupId;
private final Optional<String> groupInstanceId;
private String memberId;
private int memberEpoch;
private MemberState state;
private AssignorSelection assignorSelection;
/**
* Assignment that the member received from the server and successfully processed.
*/
private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
/**
* Assignment that the member received from the server but hasn't completely processed
* yet.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment;
/**
* Latest assignment that the member received from the server while a {@link #targetAssignment}
* was in process.
*/
private Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment;
public MembershipManagerImpl(String groupId) {
this(groupId, null, null);
}
public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
this.groupId = groupId;
this.state = MemberState.UNJOINED;
if (assignorSelection == null) {
setAssignorSelection(AssignorSelection.defaultAssignor());
} else {
setAssignorSelection(assignorSelection);
}
this.groupInstanceId = Optional.ofNullable(groupInstanceId);
this.targetAssignment = Optional.empty();
this.nextTargetAssignment = Optional.empty();
}
/**
* Update assignor selection for the member.
*
* @param assignorSelection New assignor selection
* @throws IllegalArgumentException If the provided assignor selection is null
*/
public void setAssignorSelection(AssignorSelection assignorSelection) {
if (assignorSelection == null) {
throw new IllegalArgumentException("Assignor selection cannot be null");
}
this.assignorSelection = assignorSelection;
}
private void transitionTo(MemberState nextState) {
if (!this.state.equals(nextState) && !nextState.getPreviousValidStates().contains(state)) {
// TODO: handle invalid state transition
throw new RuntimeException(String.format("Invalid state transition from %s to %s",
state, nextState));
}
this.state = nextState;
}
@Override
public String groupId() {
return groupId;
}
@Override
public Optional<String> groupInstanceId() {
return groupInstanceId;
}
@Override
public String memberId() {
return memberId;
}
@Override
public int memberEpoch() {
return memberEpoch;
}
@Override
public void updateState(ConsumerGroupHeartbeatResponseData response) {
if (response.errorCode() == Errors.NONE.code()) {
this.memberId = response.memberId();
this.memberEpoch = response.memberEpoch();
ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
if (assignment != null) {
setTargetAssignment(assignment);
}
maybeTransitionToStable();
} else {
if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
resetEpoch();
transitionTo(MemberState.FENCED);
} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
transitionTo(MemberState.FAILED);
}
// TODO: handle other errors here to update state accordingly, mainly making the
// distinction between the recoverable errors and the fatal ones, that should FAILED
// the member
}
}
/**
* Transition to {@link MemberState#STABLE} only if there are no target assignments left to
* reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
*/
private boolean maybeTransitionToStable() {
if (!hasPendingTargetAssignment()) {
transitionTo(MemberState.STABLE);
} else {
transitionTo(MemberState.RECONCILING);
}
return state.equals(MemberState.STABLE);
}
private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
if (!targetAssignment.isPresent()) {
targetAssignment = Optional.of(newTargetAssignment);
} else {
// Keep the latest next target assignment
nextTargetAssignment = Optional.of(newTargetAssignment);
}
}
private boolean hasPendingTargetAssignment() {
return targetAssignment.isPresent() || nextTargetAssignment.isPresent();
}
/**
* Update state and assignment as the member has successfully processed a new target
* assignment.
* This indicates the end of the reconciliation phase for the member, and makes the target
* assignment the new current assignment.
*
* @param assignment Target assignment the member was able to successfully process
*/
public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
updateAssignment(assignment);
transitionTo(MemberState.STABLE);
}
/**
* Update state and member info as the member was not able to process the assignment, due to
* errors in the execution of the user-provided callbacks.
*
* @param error Exception found during the execution of the user-provided callbacks
*/
public void onAssignmentProcessFailure(Throwable error) {
transitionTo(MemberState.FAILED);
// TODO: update member info appropriately, to clear up whatever shouldn't be kept in
// this unrecoverable state
}
private void resetEpoch() {
this.memberEpoch = 0;
}
@Override
public MemberState state() {
return state;
}
@Override
public AssignorSelection assignorSelection() {
return this.assignorSelection;
}
@Override
public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
return this.currentAssignment;
}
// VisibleForTesting
Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() {
return targetAssignment;
}
// VisibleForTesting
Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() {
return nextTargetAssignment;
}
/**
* Set the current assignment for the member. This indicates that the reconciliation of the
* target assignment has been successfully completed.
* This will clear the {@link #targetAssignment}, and take on the
* {@link #nextTargetAssignment} if any.
*
* @param assignment Assignment that has been successfully processed as part of the
* reconciliation process.
*/
@Override
public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
this.currentAssignment = assignment;
if (!nextTargetAssignment.isPresent()) {
targetAssignment = Optional.empty();
} else {
targetAssignment = Optional.of(nextTargetAssignment.get());
nextTargetAssignment = Optional.empty();
}
maybeTransitionToStable();
}
}

56
clients/src/test/java/org/apache/kafka/clients/consumer/AssignorSelectionTest.java

@ -0,0 +1,56 @@ @@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AssignorSelection;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class AssignorSelectionTest {
@Test
public void testServerAssignorCannotBeNullOrEmptyIfSelected() {
assertThrows(IllegalArgumentException.class,
() -> AssignorSelection.newServerAssignor(null));
assertThrows(IllegalArgumentException.class,
() -> AssignorSelection.newServerAssignor(""));
}
@Test
public void testEquals() {
// Server assignors
AssignorSelection selection1 = AssignorSelection.newServerAssignor("range");
AssignorSelection selection2 = AssignorSelection.newServerAssignor("range");
assertEquals(selection1, selection1);
assertEquals(selection1, selection2);
AssignorSelection selection3 = AssignorSelection.newServerAssignor("uniform");
assertNotEquals(selection1, selection3);
assertNotEquals(selection1, null);
}
@Test
public void testServerAssignorSelection() {
String assignorName = "uniform";
AssignorSelection selection = AssignorSelection.newServerAssignor(assignorName);
assertEquals(AssignorSelection.Type.SERVER, selection.type());
assertEquals(assignorName, selection.serverAssignor());
}
}

202
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java

@ -0,0 +1,202 @@ @@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class MembershipManagerImplTest {
private static final String GROUP_ID = "test-group";
private static final String MEMBER_ID = "test-member-1";
private static final int MEMBER_EPOCH = 1;
@Test
public void testMembershipManagerDefaultAssignor() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", null);
assertEquals(AssignorSelection.defaultAssignor(), membershipManager.assignorSelection());
}
@Test
public void testMembershipManagerAssignorSelectionUpdate() {
AssignorSelection firstAssignorSelection = AssignorSelection.newServerAssignor("uniform");
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1",
firstAssignorSelection);
assertEquals(firstAssignorSelection, membershipManager.assignorSelection());
AssignorSelection secondAssignorSelection = AssignorSelection.newServerAssignor("range");
membershipManager.setAssignorSelection(secondAssignorSelection);
assertEquals(secondAssignorSelection, membershipManager.assignorSelection());
assertThrows(IllegalArgumentException.class,
() -> membershipManager.setAssignorSelection(null));
}
@Test
public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
new MembershipManagerImpl(GROUP_ID);
new MembershipManagerImpl(GROUP_ID, null, AssignorSelection.defaultAssignor());
}
@Test
public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
assertEquals(MemberState.UNJOINED, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithoutAssignment =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(responseWithoutAssignment.data());
assertNotEquals(MemberState.RECONCILING, membershipManager.state());
ConsumerGroupHeartbeatResponse responseWithAssignment =
createConsumerGroupHeartbeatResponse(createAssignment());
membershipManager.updateState(responseWithAssignment.data());
assertEquals(MemberState.RECONCILING, membershipManager.state());
}
@ParameterizedTest
@EnumSource(Errors.class)
public void testMemberIdAndEpochResetOnErrors(Errors error) {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(null);
membershipManager.updateState(heartbeatResponse.data());
assertEquals(MemberState.STABLE, membershipManager.state());
assertEquals(MEMBER_ID, membershipManager.memberId());
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.FENCED_MEMBER_EPOCH) {
// Should reset member epoch and keep member id
ConsumerGroupHeartbeatResponse heartbeatResponseWithMemberIdError =
createConsumerGroupHeartbeatResponseWithError(Errors.FENCED_MEMBER_EPOCH);
membershipManager.updateState(heartbeatResponseWithMemberIdError.data());
assertFalse(membershipManager.memberId().isEmpty());
assertEquals(0, membershipManager.memberEpoch());
} else {
// Should not reset member id or epoch
ConsumerGroupHeartbeatResponse heartbeatResponseWithError =
createConsumerGroupHeartbeatResponseWithError(error);
membershipManager.updateState(heartbeatResponseWithError.data());
assertFalse(membershipManager.memberId().isEmpty());
assertNotEquals(0, membershipManager.memberEpoch());
}
}
@Test
public void testUpdateAssignment() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment();
ConsumerGroupHeartbeatResponse heartbeatResponse =
createConsumerGroupHeartbeatResponse(newAssignment);
membershipManager.updateState(heartbeatResponse.data());
// Target assignment should be in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment, null);
}
@Test
public void testUpdateAssignmentReceivingAssignmentWhileAnotherInProcess() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received should be in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment1, null);
// Second target assignment received while there is another one being reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
}
@Test
public void testNextTargetAssignmentHoldsLatestAssignmentReceivedWhileAnotherInProcess() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID);
ConsumerGroupHeartbeatResponseData.Assignment newAssignment1 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment1).data());
// First target assignment received, remains in the process of being reconciled
checkAssignments(membershipManager, null, newAssignment1, null);
// Second target assignment received while there is another one being reconciled
ConsumerGroupHeartbeatResponseData.Assignment newAssignment2 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment2).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment2);
// If more assignments are received while there is one being reconciled, the most recent
// assignment received is kept as nextTargetAssignment
ConsumerGroupHeartbeatResponseData.Assignment newAssignment3 = createAssignment();
membershipManager.updateState(createConsumerGroupHeartbeatResponse(newAssignment3).data());
checkAssignments(membershipManager, null, newAssignment1, newAssignment3);
}
private void checkAssignments(
MembershipManagerImpl membershipManager,
ConsumerGroupHeartbeatResponseData.Assignment expectedCurrentAssignment,
ConsumerGroupHeartbeatResponseData.Assignment expectedTargetAssignment,
ConsumerGroupHeartbeatResponseData.Assignment expectedNextTargetAssignment) {
assertEquals(expectedCurrentAssignment, membershipManager.assignment());
assertEquals(expectedTargetAssignment, membershipManager.targetAssignment().orElse(null));
assertEquals(expectedNextTargetAssignment, membershipManager.nextTargetAssignment().orElse(null));
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(MEMBER_EPOCH)
.setAssignment(assignment));
}
private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError(Errors error) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setMemberId(MEMBER_ID)
.setMemberEpoch(5));
}
private ConsumerGroupHeartbeatResponseData.Assignment createAssignment() {
return new ConsumerGroupHeartbeatResponseData.Assignment()
.setAssignedTopicPartitions(Arrays.asList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(0, 1, 2)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setPartitions(Arrays.asList(3, 4, 5))
));
}
}
Loading…
Cancel
Save