Browse Source

KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest (#14321)

This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used to ensure that the member fetching the offset has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH are returned to the client.

Our initial idea was to implement the same for the old protocol. The field is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. As a second though, I think that we should only do it for the new protocol. The effort to implement it in the old protocol is not worth it in my opinion.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
pull/14348/head
David Jacot 1 year ago committed by GitHub
parent
commit
7054625c45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      clients/src/main/resources/common/message/OffsetFetchRequest.json
  2. 17
      clients/src/main/resources/common/message/OffsetFetchResponse.json
  3. 44
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  4. 1
      core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
  5. 10
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  6. 18
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  7. 41
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
  8. 10
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
  9. 154
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  10. 28
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java

21
clients/src/main/resources/common/message/OffsetFetchRequest.json

@ -32,8 +32,15 @@ @@ -32,8 +32,15 @@
//
// Version 7 is adding the require stable flag.
//
// Version 8 is adding support for fetching offsets for multiple groups at a time
"validVersions": "0-8",
// Version 8 is adding support for fetching offsets for multiple groups at a time.
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds
// the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used.
//
// Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the
// API is not exposed by default by brokers unless explicitly enabled.
"latestVersionUnstable": true,
"validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
@ -47,8 +54,12 @@ @@ -47,8 +54,12 @@
]},
{ "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
"about": "Each group we would like to fetch offsets for", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID."},
{ "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": null, "ignorable": true,
"about": "The member ID assigned by the group coordinator if using the new consumer protocol (KIP-848)." },
{ "name": "MemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
@ -57,7 +68,7 @@ @@ -57,7 +68,7 @@
"about": "The partition indexes we would like to fetch offsets for." }
]}
]},
{"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
{ "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
"about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions." }
]
}

17
clients/src/main/resources/common/message/OffsetFetchResponse.json

@ -32,8 +32,21 @@ @@ -32,8 +32,21 @@
// Version 7 adds pending offset commit as new error response on partition level.
//
// Version 8 is adding support for fetching offsets for multiple groups
"validVersions": "0-8",
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group
// protocol is used.
"validVersions": "0-9",
"flexibleVersions": "6+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNSTABLE_OFFSET_COMMIT (version 7+)
// - UNKNOWN_MEMBER_ID (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@ -59,7 +72,7 @@ @@ -59,7 +72,7 @@
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
{ "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
{ "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId",
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
"about": "The responses per topic.", "fields": [

44
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -242,6 +242,8 @@ import org.apache.kafka.common.utils.Utils; @@ -242,6 +242,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@ -777,29 +779,27 @@ public class RequestResponseTest { @@ -777,29 +779,27 @@ public class RequestResponseTest {
}
}
@Test
public void testOffsetFetchRequestBuilderToStringV8AndAbove() {
List<Boolean> stableFlags = asList(true, false);
for (Boolean requireStable : stableFlags) {
String allTopicPartitionsString = new OffsetFetchRequest.Builder(
Collections.singletonMap("someGroup", null),
requireStable,
false)
.toString();
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
+ "(groupId='someGroup', topics=null)], requireStable=" + requireStable));
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean requireStable) {
String allTopicPartitionsString = new OffsetFetchRequest.Builder(
Collections.singletonMap("someGroup", null),
requireStable,
false
).toString();
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
+ "(groupId='someGroup', memberId='', memberEpoch=-1, topics=null)], requireStable=" + requireStable));
String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
Collections.singletonMap(
"group1",
singletonList(new TopicPartition("test11", 1))),
requireStable,
false)
.toString();
assertTrue(subsetTopicPartitionsString.contains("test11"));
assertTrue(subsetTopicPartitionsString.contains("group1"));
assertTrue(subsetTopicPartitionsString.contains("requireStable=" + requireStable));
}
String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
Collections.singletonMap(
"group1",
singletonList(new TopicPartition("test11", 1))),
requireStable,
false
).toString();
assertTrue(subsetTopicPartitionsString.contains("test11"));
assertTrue(subsetTopicPartitionsString.contains("group1"));
assertTrue(subsetTopicPartitionsString.contains("requireStable=" + requireStable));
}
@Test

1
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala

@ -70,6 +70,7 @@ class OffsetFetchRequestTest extends BaseRequestTest { @@ -70,6 +70,7 @@ class OffsetFetchRequestTest extends BaseRequestTest {
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
}
@BeforeEach

10
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

@ -69,6 +69,14 @@ public interface Group { @@ -69,6 +69,14 @@ public interface Group {
/**
* Validates the OffsetFetch request.
*
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
void validateOffsetFetch() throws KafkaException;
void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
}

18
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

@ -225,17 +225,21 @@ public class OffsetMetadataManager { @@ -225,17 +225,21 @@ public class OffsetMetadataManager {
}
/**
* Validates an OffsetCommit request.
* Validates an OffsetFetch request.
*
* @param groupId The group id.
* @param request The actual request.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
private void validateOffsetFetch(
String groupId,
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws GroupIdNotFoundException {
Group group = groupMetadataManager.group(groupId, lastCommittedOffset);
group.validateOffsetFetch();
Group group = groupMetadataManager.group(request.groupId(), lastCommittedOffset);
group.validateOffsetFetch(
request.memberId(),
request.memberEpoch(),
lastCommittedOffset
);
}
/**
@ -343,7 +347,7 @@ public class OffsetMetadataManager { @@ -343,7 +347,7 @@ public class OffsetMetadataManager {
) throws ApiException {
boolean failAllPartitions = false;
try {
validateOffsetFetch(request.groupId(), lastCommittedOffset);
validateOffsetFetch(request, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
failAllPartitions = true;
}
@ -398,7 +402,7 @@ public class OffsetMetadataManager { @@ -398,7 +402,7 @@ public class OffsetMetadataManager {
long lastCommittedOffset
) throws ApiException {
try {
validateOffsetFetch(request.groupId(), lastCommittedOffset);
validateOffsetFetch(request, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
return new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(request.groupId())

41
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java

@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer; @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
@ -538,17 +537,47 @@ public class ConsumerGroup implements Group { @@ -538,17 +537,47 @@ public class ConsumerGroup implements Group {
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
if (memberEpoch != member.memberEpoch()) {
throw Errors.STALE_MEMBER_EPOCH.exception();
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Validates the OffsetFetch request.
*
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
@Override
public void validateOffsetFetch() {
// Nothing.
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws UnknownMemberIdException, StaleMemberEpochException {
// When the member id is null and the member epoch is -1, the request either comes
// from the admin client or from a client which does not provide them. In this case,
// the fetch request is accepted.
if (memberId == null && memberEpoch < 0) return;
final ConsumerGroupMember member = members.get(memberId, lastCommittedOffset);
if (member == null) {
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
memberId, groupId));
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.
*/
private void validateMemberEpoch(
int receivedMemberEpoch,
int expectedMemberEpoch
) throws StaleMemberEpochException {
if (receivedMemberEpoch != expectedMemberEpoch) {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
}
}
/**

10
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java

@ -823,9 +823,17 @@ public class GenericGroup implements Group { @@ -823,9 +823,17 @@ public class GenericGroup implements Group {
/**
* Validates the OffsetFetch request.
*
* @param memberId The member id. This is not provided for generic groups.
* @param memberEpoch The member epoch for consumer groups. This is not provided for generic groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
@Override
public void validateOffsetFetch() throws GroupIdNotFoundException {
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws GroupIdNotFoundException {
if (isInState(DEAD)) {
throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
}

154
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

@ -181,10 +181,28 @@ public class OffsetMetadataManagerTest { @@ -181,10 +181,28 @@ public class OffsetMetadataManagerTest {
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long committedOffset
) {
return fetchOffsets(
groupId,
null,
-1,
topics,
committedOffset
);
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
String memberId,
int memberEpoch,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long committedOffset
) {
OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setTopics(topics),
committedOffset
);
@ -195,9 +213,26 @@ public class OffsetMetadataManagerTest { @@ -195,9 +213,26 @@ public class OffsetMetadataManagerTest {
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
long committedOffset
) {
return fetchAllOffsets(
groupId,
null,
-1,
committedOffset
);
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
String memberId,
int memberEpoch,
long committedOffset
) {
OffsetFetchResponseData.OffsetFetchResponseGroup response = offsetMetadataManager.fetchAllOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId),
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch),
committedOffset
);
assertEquals(groupId, response.groupId());
@ -1409,6 +1444,123 @@ public class OffsetMetadataManagerTest { @@ -1409,6 +1444,123 @@ public class OffsetMetadataManagerTest {
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create consumer group.
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
// Create member.
group.getOrMaybeCreateMember("member", true);
// Commit offset.
context.commitOffset("group", "foo", 0, 100L, 1);
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);
assertEquals(Collections.singletonList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
))
), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE));
// Fetch all offsets case.
assertEquals(Collections.singletonList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
))
), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE));
}
@Test
public void testConsumerGroupOffsetFetchFromAdminClient() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create consumer group.
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
// Create member.
group.getOrMaybeCreateMember("member", true);
// Commit offset.
context.commitOffset("group", "foo", 0, 100L, 1);
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);
assertEquals(Collections.singletonList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
))
), context.fetchOffsets("group", topics, Long.MAX_VALUE));
// Fetch all offsets case.
assertEquals(Collections.singletonList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
))
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);
// Fetch offsets cases.
assertThrows(UnknownMemberIdException.class,
() -> context.fetchOffsets("group", "", 0, topics, Long.MAX_VALUE));
assertThrows(UnknownMemberIdException.class,
() -> context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE));
// Fetch all offsets cases.
assertThrows(UnknownMemberIdException.class,
() -> context.fetchAllOffsets("group", "", 0, Long.MAX_VALUE));
assertThrows(UnknownMemberIdException.class,
() -> context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE));
}
@Test
public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
group.getOrMaybeCreateMember("member", true);
// Fetch offsets case.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);
// Fetch offsets case.
assertThrows(StaleMemberEpochException.class,
() -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE));
// Fetch all offsets case.
assertThrows(StaleMemberEpochException.class,
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}
static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,

28
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java

@ -631,4 +631,32 @@ public class ConsumerGroupTest { @@ -631,4 +631,32 @@ public class ConsumerGroupTest {
// This should succeed.
group.validateOffsetCommit("member-id", "", 0);
}
@Test
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
// Simulate a call from the admin client without member id and member epoch.
group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
// The member does not exist.
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
// Create a member.
snapshotRegistry.getOrCreateSnapshot(0);
group.getOrMaybeCreateMember("member-id", true);
// The member does not exist at last committed offset 0.
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetFetch("member-id", 0, 0));
// The member exists but the epoch is stale when the last committed offset is not considered.
assertThrows(StaleMemberEpochException.class, () ->
group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE));
// This should succeed.
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
}
}

Loading…
Cancel
Save