From 7054625c45dc6edb3c07271fe4a6c24b4638424f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 5 Sep 2023 23:36:38 -0700 Subject: [PATCH] KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest (#14321) This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client. Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion. Reviewers: Ritika Reddy , Calvin Liu , Justine Olshan --- .../common/message/OffsetFetchRequest.json | 21 ++- .../common/message/OffsetFetchResponse.json | 17 +- .../common/requests/RequestResponseTest.java | 44 ++--- .../kafka/server/OffsetFetchRequestTest.scala | 1 + .../apache/kafka/coordinator/group/Group.java | 10 +- .../group/OffsetMetadataManager.java | 18 +- .../group/consumer/ConsumerGroup.java | 41 ++++- .../group/generic/GenericGroup.java | 10 +- .../group/OffsetMetadataManagerTest.java | 154 +++++++++++++++++- .../group/consumer/ConsumerGroupTest.java | 28 ++++ 10 files changed, 299 insertions(+), 45 deletions(-) diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json index 8f3c4144ab0..b0f564e7764 100644 --- a/clients/src/main/resources/common/message/OffsetFetchRequest.json +++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json @@ -32,8 +32,15 @@ // // Version 7 is adding the require stable flag. // - // Version 8 is adding support for fetching offsets for multiple groups at a time - "validVersions": "0-8", + // Version 8 is adding support for fetching offsets for multiple groups at a time. + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds + // the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used. + // + // Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the + // API is not exposed by default by brokers unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0-9", "flexibleVersions": "6+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", @@ -47,8 +54,12 @@ ]}, { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+", "about": "Each group we would like to fetch offsets for", "fields": [ - { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", + { "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId", "about": "The group ID."}, + { "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": null, "ignorable": true, + "about": "The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848)." }, + { "name": "MemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true, + "about": "The member epoch if using the new consumer protocol (KIP-848)." }, { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", @@ -57,7 +68,7 @@ "about": "The partition indexes we would like to fetch offsets for." } ]} ]}, - {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false", - "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."} + { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false", + "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions." } ] } diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json index 71acf0b4d2e..0b4cc10c3b4 100644 --- a/clients/src/main/resources/common/message/OffsetFetchResponse.json +++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json @@ -32,8 +32,21 @@ // Version 7 adds pending offset commit as new error response on partition level. // // Version 8 is adding support for fetching offsets for multiple groups - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is + // the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group + // protocol is used. + "validVersions": "0-9", "flexibleVersions": "6+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - UNSTABLE_OFFSET_COMMIT (version 7+) + // - UNKNOWN_MEMBER_ID (version 9+) + // - STALE_MEMBER_EPOCH (version 9+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, @@ -59,7 +72,7 @@ "about": "The top-level error code, or 0 if there was no error." }, { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+", "about": "The responses per group id.", "fields": [ - { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId", + { "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId", "about": "The group ID." }, { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+", "about": "The responses per topic.", "fields": [ diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index be4cd2244b6..c8bd3563b53 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -242,6 +242,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -777,29 +779,27 @@ public class RequestResponseTest { } } - @Test - public void testOffsetFetchRequestBuilderToStringV8AndAbove() { - List stableFlags = asList(true, false); - for (Boolean requireStable : stableFlags) { - String allTopicPartitionsString = new OffsetFetchRequest.Builder( - Collections.singletonMap("someGroup", null), - requireStable, - false) - .toString(); - assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup" - + "(groupId='someGroup', topics=null)], requireStable=" + requireStable)); + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean requireStable) { + String allTopicPartitionsString = new OffsetFetchRequest.Builder( + Collections.singletonMap("someGroup", null), + requireStable, + false + ).toString(); + assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup" + + "(groupId='someGroup', memberId='', memberEpoch=-1, topics=null)], requireStable=" + requireStable)); - String subsetTopicPartitionsString = new OffsetFetchRequest.Builder( - Collections.singletonMap( - "group1", - singletonList(new TopicPartition("test11", 1))), - requireStable, - false) - .toString(); - assertTrue(subsetTopicPartitionsString.contains("test11")); - assertTrue(subsetTopicPartitionsString.contains("group1")); - assertTrue(subsetTopicPartitionsString.contains("requireStable=" + requireStable)); - } + String subsetTopicPartitionsString = new OffsetFetchRequest.Builder( + Collections.singletonMap( + "group1", + singletonList(new TopicPartition("test11", 1))), + requireStable, + false + ).toString(); + assertTrue(subsetTopicPartitionsString.contains("test11")); + assertTrue(subsetTopicPartitionsString.contains("group1")); + assertTrue(subsetTopicPartitionsString.contains("requireStable=" + requireStable)); } @Test diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index cf1c52841e5..cfe550e2671 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -70,6 +70,7 @@ class OffsetFetchRequestTest extends BaseRequestTest { properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") + properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") } @BeforeEach diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index 0ffc741dcdd..44290ae7fe7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -69,6 +69,14 @@ public interface Group { /** * Validates the OffsetFetch request. + * + * @param memberId The member id for consumer groups. + * @param memberEpoch The member epoch for consumer groups. + * @param lastCommittedOffset The last committed offsets in the timeline. */ - void validateOffsetFetch() throws KafkaException; + void validateOffsetFetch( + String memberId, + int memberEpoch, + long lastCommittedOffset + ) throws KafkaException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index e551a61e849..c8bf388d714 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -225,17 +225,21 @@ public class OffsetMetadataManager { } /** - * Validates an OffsetCommit request. + * Validates an OffsetFetch request. * - * @param groupId The group id. + * @param request The actual request. * @param lastCommittedOffset The last committed offsets in the timeline. */ private void validateOffsetFetch( - String groupId, + OffsetFetchRequestData.OffsetFetchRequestGroup request, long lastCommittedOffset ) throws GroupIdNotFoundException { - Group group = groupMetadataManager.group(groupId, lastCommittedOffset); - group.validateOffsetFetch(); + Group group = groupMetadataManager.group(request.groupId(), lastCommittedOffset); + group.validateOffsetFetch( + request.memberId(), + request.memberEpoch(), + lastCommittedOffset + ); } /** @@ -343,7 +347,7 @@ public class OffsetMetadataManager { ) throws ApiException { boolean failAllPartitions = false; try { - validateOffsetFetch(request.groupId(), lastCommittedOffset); + validateOffsetFetch(request, lastCommittedOffset); } catch (GroupIdNotFoundException ex) { failAllPartitions = true; } @@ -398,7 +402,7 @@ public class OffsetMetadataManager { long lastCommittedOffset ) throws ApiException { try { - validateOffsetFetch(request.groupId(), lastCommittedOffset); + validateOffsetFetch(request, lastCommittedOffset); } catch (GroupIdNotFoundException ex) { return new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(request.groupId()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index f6eaed02fa3..5328020baac 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; @@ -538,17 +537,47 @@ public class ConsumerGroup implements Group { if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); - if (memberEpoch != member.memberEpoch()) { - throw Errors.STALE_MEMBER_EPOCH.exception(); - } + validateMemberEpoch(memberEpoch, member.memberEpoch()); } /** * Validates the OffsetFetch request. + * + * @param memberId The member id for consumer groups. + * @param memberEpoch The member epoch for consumer groups. + * @param lastCommittedOffset The last committed offsets in the timeline. */ @Override - public void validateOffsetFetch() { - // Nothing. + public void validateOffsetFetch( + String memberId, + int memberEpoch, + long lastCommittedOffset + ) throws UnknownMemberIdException, StaleMemberEpochException { + // When the member id is null and the member epoch is -1, the request either comes + // from the admin client or from a client which does not provide them. In this case, + // the fetch request is accepted. + if (memberId == null && memberEpoch < 0) return; + + final ConsumerGroupMember member = members.get(memberId, lastCommittedOffset); + if (member == null) { + throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", + memberId, groupId)); + } + validateMemberEpoch(memberEpoch, member.memberEpoch()); + } + + /** + * Throws a StaleMemberEpochException if the received member epoch does not match + * the expected member epoch. + */ + private void validateMemberEpoch( + int receivedMemberEpoch, + int expectedMemberEpoch + ) throws StaleMemberEpochException { + if (receivedMemberEpoch != expectedMemberEpoch) { + throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java index edad170806b..3347970aaf0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -823,9 +823,17 @@ public class GenericGroup implements Group { /** * Validates the OffsetFetch request. + * + * @param memberId The member id. This is not provided for generic groups. + * @param memberEpoch The member epoch for consumer groups. This is not provided for generic groups. + * @param lastCommittedOffset The last committed offsets in the timeline. */ @Override - public void validateOffsetFetch() throws GroupIdNotFoundException { + public void validateOffsetFetch( + String memberId, + int memberEpoch, + long lastCommittedOffset + ) throws GroupIdNotFoundException { if (isInState(DEAD)) { throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index e60ec1bc4e9..7f355da76aa 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -181,10 +181,28 @@ public class OffsetMetadataManagerTest { String groupId, List topics, long committedOffset + ) { + return fetchOffsets( + groupId, + null, + -1, + topics, + committedOffset + ); + } + + public List fetchOffsets( + String groupId, + String memberId, + int memberEpoch, + List topics, + long committedOffset ) { OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets( new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(memberEpoch) .setTopics(topics), committedOffset ); @@ -195,9 +213,26 @@ public class OffsetMetadataManagerTest { public List fetchAllOffsets( String groupId, long committedOffset + ) { + return fetchAllOffsets( + groupId, + null, + -1, + committedOffset + ); + } + + public List fetchAllOffsets( + String groupId, + String memberId, + int memberEpoch, + long committedOffset ) { OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchAllOffsets( - new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(memberEpoch), committedOffset ); assertEquals(groupId, response.groupId()); @@ -1409,6 +1444,123 @@ public class OffsetMetadataManagerTest { ), context.fetchAllOffsets("group", Long.MAX_VALUE)); } + @Test + public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + // Create consumer group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + // Create member. + group.getOrMaybeCreateMember("member", true); + // Commit offset. + context.commitOffset("group", "foo", 0, 100L, 1); + + // Fetch offsets case. + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )) + ), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); + + // Fetch all offsets case. + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )) + ), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); + } + + @Test + public void testConsumerGroupOffsetFetchFromAdminClient() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + // Create consumer group. + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + // Create member. + group.getOrMaybeCreateMember("member", true); + // Commit offset. + context.commitOffset("group", "foo", 0, 100L, 1); + + // Fetch offsets case. + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )) + ), context.fetchOffsets("group", topics, Long.MAX_VALUE)); + + // Fetch all offsets case. + assertEquals(Collections.singletonList( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setPartitions(Collections.singletonList( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )) + ), context.fetchAllOffsets("group", Long.MAX_VALUE)); + } + + @Test + public void testConsumerGroupOffsetFetchWithUnknownMemberId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + + // Fetch offsets case. + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + // Fetch offsets cases. + assertThrows(UnknownMemberIdException.class, + () -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE)); + assertThrows(UnknownMemberIdException.class, + () -> context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE)); + + // Fetch all offsets cases. + assertThrows(UnknownMemberIdException.class, + () -> context.fetchAllOffsets("group", "", 0, Long.MAX_VALUE)); + assertThrows(UnknownMemberIdException.class, + () -> context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)); + } + + @Test + public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + group.getOrMaybeCreateMember("member", true); + + // Fetch offsets case. + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + // Fetch offsets case. + assertThrows(StaleMemberEpochException.class, + () -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE)); + + // Fetch all offsets case. + assertThrows(StaleMemberEpochException.class, + () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); + } + static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( int partition, long offset, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index a715d4187bb..aa848aa3f5d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -631,4 +631,32 @@ public class ConsumerGroupTest { // This should succeed. group.validateOffsetCommit("member-id", "", 0); } + + @Test + public void testValidateOffsetFetch() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + group.validateOffsetFetch(null, -1, Long.MAX_VALUE); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE)); + + // Create a member. + snapshotRegistry.getOrCreateSnapshot(0); + group.getOrMaybeCreateMember("member-id", true); + + // The member does not exist at last committed offset 0. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetFetch("member-id", 0, 0)); + + // The member exists but the epoch is stale when the last committed offset is not considered. + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE)); + + // This should succeed. + group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE); + } }