
KAFKA-14499: [4/N] Implement OffsetFetch API (#14120)

This patch implements the OffsetFetch API in the new group coordinator.

I found that implementing the `RequireStable` flag is hard (not to say impossible) in the current model. For context, the flag is there to ensure that an OffsetFetch request does not return stale offsets while there are pending offsets to be committed. In the Scala code, we basically check the pending offsets data structure and, if there are any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry.

In our new model, we don't have the pending offsets data structure. Instead, we use a timeline data structure to handle all the pending/uncommitted changes, so we don't know whether offsets are pending for a particular group. Rather than returning the `UNSTABLE_OFFSET_COMMIT` error, I propose that when `RequireStable` is set we use a write operation to ensure that we read the latest offsets. If there are uncommitted offsets, the write operation ensures that the response is only returned once they are committed. This gives a similar behaviour in the end.
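To illustrate the idea, here is a minimal sketch of the dispatch, not the actual `CoordinatorRuntime` API: the `Runtime` interface, `scheduleWrite`, and `scheduleRead` below are simplified stand-ins, assuming only that a write operation completes after pending state is committed while a read operation runs against the last committed state.

```java
// Minimal sketch of the RequireStable dispatch described above.
// `Runtime`, `scheduleWrite`, and `scheduleRead` are hypothetical stand-ins for
// the coordinator runtime, not the real API.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

interface Runtime {
    <T> CompletableFuture<T> scheduleWrite(String name, Supplier<T> op);
    <T> CompletableFuture<T> scheduleRead(String name, Supplier<T> op);
}

final class OffsetFetchSketch {
    static CompletableFuture<List<String>> fetchOffsets(
        Runtime runtime,
        boolean requireStable,
        Supplier<List<String>> readOffsets
    ) {
        if (requireStable) {
            // Piggyback on the write path: the response is only returned once
            // pending offsets are committed, so no stale offsets are exposed.
            return runtime.scheduleWrite("fetch-offsets", readOffsets);
        } else {
            // Plain read against the last committed offsets.
            return runtime.scheduleRead("fetch-offsets", readOffsets);
        }
    }
}
```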

Reviewers: Justine Olshan <jolshan@confluent.io>
Committed by David Jacot via GitHub. Parent commit: 68b7031dc4
  1. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (5 changes)
  2. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java (66 changes)
  3. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java (37 changes)
  4. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (14 changes)
  5. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (220 changes)
  6. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java (8 changes)
  7. group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java (11 changes)
  8. group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java (98 changes)
  9. group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (392 changes)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java (5 changes)

@@ -66,4 +66,9 @@ public interface Group {
String groupInstanceId,
int generationIdOrMemberEpoch
) throws KafkaException;
/**
* Validates the OffsetFetch request.
*/
void validateOffsetFetch() throws KafkaException;
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java (66 changes)

@@ -61,6 +61,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
@@ -71,6 +72,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
@@ -480,9 +482,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
// For backwards compatibility, we support fetch commits for the empty group id.
if (groupId == null) {
return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
}
// The require stable flag when set tells the broker to hold on returning unstable
// (or uncommitted) offsets. In the previous implementation of the group coordinator,
// the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
// the new implementation relies on timeline data structures, the coordinator does not
// really know whether offsets are stable or not so it is hard to return the same error.
// Instead, we use a write operation when the flag is set to guarantee that the fetch
// is based on all the available offsets and to ensure that the response waits until
// the pending offsets are committed. Otherwise, we use a read operation.
if (requireStable) {
return runtime.scheduleWriteOperation(
"fetch-offsets",
topicPartitionFor(groupId),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchOffsets(groupId, topics, Long.MAX_VALUE)
)
);
} else {
return runtime.scheduleReadOperation(
"fetch-offsets",
topicPartitionFor(groupId),
(coordinator, offset) -> coordinator.fetchOffsets(groupId, topics, offset)
);
}
}
/**
@@ -498,9 +526,35 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
// For backwards compatibility, we support fetch commits for the empty group id.
if (groupId == null) {
return FutureUtils.failedFuture(Errors.INVALID_GROUP_ID.exception());
}
// The require stable flag when set tells the broker to hold on returning unstable
// (or uncommitted) offsets. In the previous implementation of the group coordinator,
// the UNSTABLE_OFFSET_COMMIT error is returned when unstable offsets are present. As
// the new implementation relies on timeline data structures, the coordinator does not
// really know whether offsets are stable or not so it is hard to return the same error.
// Instead, we use a write operation when the flag is set to guarantee that the fetch
// is based on all the available offsets and to ensure that the response waits until
// the pending offsets are committed. Otherwise, we use a read operation.
if (requireStable) {
return runtime.scheduleWriteOperation(
"fetch-all-offsets",
topicPartitionFor(groupId),
coordinator -> new CoordinatorResult<>(
Collections.emptyList(),
coordinator.fetchAllOffsets(groupId, Long.MAX_VALUE)
)
);
} else {
return runtime.scheduleReadOperation(
"fetch-all-offsets",
topicPartitionFor(groupId),
(coordinator, offset) -> coordinator.fetchAllOffsets(groupId, offset)
);
}
}
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java (37 changes)

@@ -24,6 +24,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
@@ -56,6 +58,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -256,6 +259,40 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
);
}
/**
* Fetch offsets for a given set of partitions and a given group.
*
* @param groupId The group id.
* @param topics The topics to fetch the offsets for.
* @param epoch The epoch (or offset) used to read from the
* timeline data structure.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long epoch
) throws ApiException {
return offsetMetadataManager.fetchOffsets(groupId, topics, epoch);
}
/**
* Fetch all offsets for a given group.
*
* @param groupId The group id.
* @param epoch The epoch (or offset) used to read from the
* timeline data structure.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
long epoch
) throws ApiException {
return offsetMetadataManager.fetchAllOffsets(groupId, epoch);
}
/**
* Handles an OffsetCommit request.
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java (14 changes)

@@ -401,7 +401,19 @@ public class GroupMetadataManager {
* @return The group corresponding to the group id or throw GroupIdNotFoundException.
*/
public Group group(String groupId) throws GroupIdNotFoundException {
Group group = groups.get(groupId);
Group group = groups.get(groupId, Long.MAX_VALUE);
if (group == null) {
throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
}
return group;
}
/**
* @return The group corresponding to the group id at the given committed offset
* or throw GroupIdNotFoundException.
*/
public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
Group group = groups.get(groupId, committedOffset);
if (group == null) {
throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java (220 changes)

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -24,6 +23,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
@@ -41,10 +42,12 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
/**
* The OffsetMetadataManager manages the offsets of all the groups. It basically maintains
* a mapping from group id to topic-partition to offset. This class has two kinds of methods:
@@ -145,9 +148,9 @@ public class OffsetMetadataManager {
private final int offsetMetadataMaxSize;
/**
* The offsets keyed by topic-partition and group id.
* The offsets keyed by group id, topic name and partition id.
*/
private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup;
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
OffsetMetadataManager(
SnapshotRegistry snapshotRegistry,
@@ -221,6 +224,20 @@ public class OffsetMetadataManager {
return group;
}
/**
* Validates an OffsetFetch request.
*
* @param groupId The group id.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
private void validateOffsetFetch(
String groupId,
long lastCommittedOffset
) throws GroupIdNotFoundException {
Group group = groupMetadataManager.group(groupId, lastCommittedOffset);
group.validateOffsetFetch();
}
/**
* Computes the expiration timestamp based on the retention time provided in the OffsetCommit
* request.
@@ -312,6 +329,109 @@ public class OffsetMetadataManager {
return new CoordinatorResult<>(records, response);
}
/**
* Fetch offsets for a given Group.
*
* @param groupId The group id.
* @param topics The topics to fetch the offsets for.
* @param lastCommittedOffset The last committed offsets in the timeline.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long lastCommittedOffset
) throws ApiException {
boolean failAllPartitions = false;
try {
validateOffsetFetch(groupId, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
failAllPartitions = true;
}
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>(topics.size());
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
failAllPartitions ? null : offsetsByGroup.get(groupId, lastCommittedOffset);
topics.forEach(topic -> {
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
topicResponses.add(topicResponse);
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
null : groupOffsets.get(topic.name(), lastCommittedOffset);
topic.partitionIndexes().forEach(partitionIndex -> {
final OffsetAndMetadata offsetAndMetadata = topicOffsets == null ?
null : topicOffsets.get(partitionIndex, lastCommittedOffset);
if (offsetAndMetadata == null) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata(""));
} else {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
}
});
});
return topicResponses;
}
/**
* Fetch all offsets for a given Group.
*
* @param groupId The group id.
* @param lastCommittedOffset The last committed offsets in the timeline.
*
* @return A List of OffsetFetchResponseTopics response.
*/
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
long lastCommittedOffset
) throws ApiException {
try {
validateOffsetFetch(groupId, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
return Collections.emptyList();
}
final List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicResponses = new ArrayList<>();
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> groupOffsets =
offsetsByGroup.get(groupId, lastCommittedOffset);
if (groupOffsets != null) {
groupOffsets.entrySet(lastCommittedOffset).forEach(topicEntry -> {
final String topic = topicEntry.getKey();
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
topicResponses.add(topicResponse);
topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> {
final int partition = partitionEntry.getKey();
final OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
});
});
}
return topicResponses;
}
/**
* Replays OffsetCommitKey/Value to update or delete the corresponding offsets.
*
@@ -323,7 +443,8 @@ public class OffsetMetadataManager {
OffsetCommitValue value
) {
final String groupId = key.group();
final TopicPartition tp = new TopicPartition(key.topic(), key.partition());
final String topic = key.topic();
final int partition = key.partition();
if (value != null) {
// The generic or consumer group should exist when offsets are committed or
@@ -337,22 +458,18 @@ public class OffsetMetadataManager {
groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, true);
}
final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRecord(value);
TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets == null) {
offsets = new TimelineHashMap<>(snapshotRegistry, 0);
offsetsByGroup.put(groupId, offsets);
}
offsets.put(tp, offsetAndMetadata);
updateOffset(
groupId,
topic,
partition,
OffsetAndMetadata.fromRecord(value)
);
} else {
TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets != null) {
offsets.remove(tp);
if (offsets.isEmpty()) {
offsetsByGroup.remove(groupId);
}
}
removeOffset(
groupId,
topic,
partition
);
}
}
@@ -372,12 +489,67 @@ public class OffsetMetadataManager {
*
* package-private for testing.
*/
OffsetAndMetadata offset(String groupId, TopicPartition tp) {
Map<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
if (offsets == null) {
OffsetAndMetadata offset(String groupId, String topic, int partition) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
if (topicOffsets == null) {
return null;
} else {
return offsets.get(tp);
TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
if (partitionOffsets == null) {
return null;
} else {
return partitionOffsets.get(partition);
}
}
}
/**
* Updates the offset.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition id.
* @param offsetAndMetadata The offset metadata.
*/
private void updateOffset(
String groupId,
String topic,
int partition,
OffsetAndMetadata offsetAndMetadata
) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets
.computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
partitionOffsets.put(partition, offsetAndMetadata);
}
/**
* Removes the offset.
*
* @param groupId The group id.
* @param topic The topic name.
* @param partition The partition id.
*/
private void removeOffset(
String groupId,
String topic,
int partition
) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
if (topicOffsets == null)
return;
TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
if (partitionOffsets == null)
return;
partitionOffsets.remove(partition);
if (partitionOffsets.isEmpty())
topicOffsets.remove(topic);
if (topicOffsets.isEmpty())
offsetsByGroup.remove(groupId);
}
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java (8 changes)

@@ -543,6 +543,14 @@ public class ConsumerGroup implements Group {
}
}
/**
* Validates the OffsetFetch request.
*/
@Override
public void validateOffsetFetch() {
// Nothing.
}
/**
* Updates the current state of the group.
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java (11 changes)

@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -820,6 +821,16 @@ public class GenericGroup implements Group {
}
}
/**
* Validates the OffsetFetch request.
*/
@Override
public void validateOffsetFetch() throws GroupIdNotFoundException {
if (isInState(DEAD)) {
throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
}
}
/**
* Verify the member id is up to date for static members. Return true if both conditions met:
* 1. given member is a known static member to group

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java (98 changes)

@@ -35,6 +35,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@@ -56,10 +58,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -595,4 +599,98 @@ public class GroupCoordinatorServiceTest {
future.get()
);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchOffsets(
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topicsRequest =
Collections.singletonList(new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0)));
List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L))));
if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
}
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
"group",
topicsRequest,
requireStable
);
assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchAllOffsets(
boolean requireStable
) throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
List<OffsetFetchResponseData.OffsetFetchResponseTopics> topicsResponse =
Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Collections.singletonList(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(100L))));
if (requireStable) {
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
} else {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("fetch-all-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(topicsResponse));
}
CompletableFuture<List<OffsetFetchResponseData.OffsetFetchResponseTopics>> future = service.fetchAllOffsets(
requestContext(ApiKeys.OFFSET_FETCH),
"group",
requireStable
);
assertEquals(topicsResponse, future.get(5, TimeUnit.SECONDS));
}
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java (392 changes)

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
@@ -27,6 +26,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -50,6 +51,7 @@ import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -62,6 +64,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -174,6 +177,28 @@ public class OffsetMetadataManagerTest {
return result;
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long committedOffset
) {
return offsetMetadataManager.fetchOffsets(
groupId,
topics,
committedOffset
);
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
String groupId,
long committedOffset
) {
return offsetMetadataManager.fetchAllOffsets(
groupId,
committedOffset
);
}
public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long ms) {
time.sleep(ms);
List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts = timer.poll();
@@ -185,6 +210,30 @@ public class OffsetMetadataManagerTest {
return timeouts;
}
public void commitOffset(
String groupId,
String topic,
int partition,
long offset,
int leaderEpoch
) {
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
replay(RecordHelpers.newOffsetCommitRecord(
groupId,
topic,
partition,
new OffsetAndMetadata(
offset,
OptionalInt.of(leaderEpoch),
"metadata",
time.milliseconds(),
OptionalLong.empty()
),
MetadataVersion.latest()
));
}
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -1040,6 +1089,342 @@ public class OffsetMetadataManagerTest {
);
}
@Test
public void testGenericGroupFetchOffsetsWithDeadGroup() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create a dead group.
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"group",
true
);
group.transitionTo(GenericGroupState.DEAD);
List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Arrays.asList(0, 1)),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(Collections.singletonList(0))
);
List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Collections.singletonList(
mkInvalidOffsetPartitionResponse(0)
))
);
assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testFetchOffsetsWithUnknownGroup() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Arrays.asList(0, 1)),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(Collections.singletonList(0))
);
List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedResponse = Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Collections.singletonList(
mkInvalidOffsetPartitionResponse(0)
))
);
assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testFetchOffsetsAtDifferentCommittedOffset() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
assertEquals(0, context.lastWrittenOffset);
context.commitOffset("group", "foo", 0, 100L, 1);
assertEquals(1, context.lastWrittenOffset);
context.commitOffset("group", "foo", 1, 110L, 1);
assertEquals(2, context.lastWrittenOffset);
context.commitOffset("group", "bar", 0, 200L, 1);
assertEquals(3, context.lastWrittenOffset);
context.commitOffset("group", "foo", 1, 111L, 2);
assertEquals(4, context.lastWrittenOffset);
context.commitOffset("group", "bar", 1, 210L, 2);
assertEquals(5, context.lastWrittenOffset);
// Always use the same request.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Arrays.asList(0, 1)),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(Arrays.asList(0, 1))
);
// Fetching with 0 should return all invalid offsets.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, 0L));
// Fetching with 1 should return data up to offset 1.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkInvalidOffsetPartitionResponse(1)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, 1L));
// Fetching with 2 should return data up to offset 2.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, 2L));
// Fetching with 3 should return data up to offset 3.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, 3L));
// Fetching with 4 should return data up to offset 4.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 2, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, 4L));
// Fetching with 5 should return data up to offset 5.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 2, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkOffsetPartitionResponse(1, 210L, 2, "metadata")
))
), context.fetchOffsets("group", request, 5L));
// Fetching with Long.MAX_VALUE should return all offsets.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 2, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkOffsetPartitionResponse(1, 210L, 2, "metadata")
))
), context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testGenericGroupFetchAllOffsetsWithDeadGroup() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
// Create a dead group.
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"group",
true
);
group.transitionTo(GenericGroupState.DEAD);
assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testFetchAllOffsetsWithUnknownGroup() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testFetchAllOffsetsAtDifferentCommittedOffset() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
assertEquals(0, context.lastWrittenOffset);
context.commitOffset("group", "foo", 0, 100L, 1);
assertEquals(1, context.lastWrittenOffset);
context.commitOffset("group", "foo", 1, 110L, 1);
assertEquals(2, context.lastWrittenOffset);
context.commitOffset("group", "bar", 0, 200L, 1);
assertEquals(3, context.lastWrittenOffset);
context.commitOffset("group", "foo", 1, 111L, 2);
assertEquals(4, context.lastWrittenOffset);
context.commitOffset("group", "bar", 1, 210L, 2);
assertEquals(5, context.lastWrittenOffset);
// Fetching with 0 should return no offsets.
assertEquals(Collections.emptyList(), context.fetchAllOffsets("group", 0L));
// Fetching with 1 should return data up to offset 1.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
))
), context.fetchAllOffsets("group", 1L));
// Fetching with 2 should return data up to offset 2.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
))
), context.fetchAllOffsets("group", 2L));
// Fetching with 3 should return data up to offset 3.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
))
), context.fetchAllOffsets("group", 3L));
// Fetching with 4 should return data up to offset 4.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 2, "metadata")
))
), context.fetchAllOffsets("group", 4L));
// Fetching with Long.MAX_VALUE should return all offsets.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkOffsetPartitionResponse(1, 210L, 2, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 2, "metadata")
))
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,
int leaderEpoch,
String metadata
) {
return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(offset)
.setCommittedLeaderEpoch(leaderEpoch)
.setMetadata(metadata);
}
static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkInvalidOffsetPartitionResponse(int partition) {
return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata("");
}
@Test
public void testReplay() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
@@ -1098,7 +1483,7 @@ public class OffsetMetadataManagerTest {
));
// Verify that the offset is gone.
assertNull(context.offsetMetadataManager.offset("foo", new TopicPartition("bar", 0)));
assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
}
private void verifyReplay(
@@ -1118,7 +1503,8 @@ public class OffsetMetadataManagerTest {
assertEquals(offsetAndMetadata, context.offsetMetadataManager.offset(
groupId,
new TopicPartition(topic, partition)
topic,
partition
));
}
