Browse Source

KAFKA-14506: Implement DeleteGroups API and OffsetDelete API (#14408)

This patch implements DeleteGroups and OffsetDelete API in the new group coordinator.

Reviewers: yangy0000, Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
pull/14485/head
Dongnuo Lyu 1 year ago committed by GitHub
parent
commit
a12f9f97c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      checkstyle/suppressions.xml
  2. 32
      clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
  3. 49
      clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java
  4. 27
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
  5. 125
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  6. 78
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
  7. 27
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
  8. 104
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
  9. 43
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
  10. 53
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
  11. 246
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
  12. 147
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
  13. 30
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
  14. 206
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
  15. 30
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
  16. 61
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java

4
checkstyle/suppressions.xml

@ -326,11 +326,11 @@ @@ -326,11 +326,11 @@
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

32
clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java

@ -18,13 +18,12 @@ package org.apache.kafka.common.requests; @@ -18,13 +18,12 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.List;
public class DeleteGroupsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DeleteGroupsRequest> {
@ -55,18 +54,9 @@ public class DeleteGroupsRequest extends AbstractRequest { @@ -55,18 +54,9 @@ public class DeleteGroupsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
DeletableGroupResultCollection groupResults = new DeletableGroupResultCollection();
for (String groupId : data.groupsNames()) {
groupResults.add(new DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(error.code()));
}
return new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(groupResults)
.setThrottleTimeMs(throttleTimeMs)
return new DeleteGroupsResponse(new DeleteGroupsResponseData()
.setResults(getErrorResultCollection(data.groupsNames(), Errors.forException(e)))
.setThrottleTimeMs(throttleTimeMs)
);
}
@ -78,4 +68,18 @@ public class DeleteGroupsRequest extends AbstractRequest { @@ -78,4 +68,18 @@ public class DeleteGroupsRequest extends AbstractRequest {
public DeleteGroupsRequestData data() {
return data;
}
public static DeleteGroupsResponseData.DeletableGroupResultCollection getErrorResultCollection(
List<String> groupIds,
Errors error
) {
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
new DeleteGroupsResponseData.DeletableGroupResultCollection();
groupIds.forEach(groupId -> resultCollection.add(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(error.code())
));
return resultCollection;
}
}

49
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class DeleteGroupsRequestTest {
@Test
public void testGetErrorResultCollection() {
String groupId1 = "group-id-1";
String groupId2 = "group-id-2";
DeleteGroupsRequestData data = new DeleteGroupsRequestData()
.setGroupsNames(Arrays.asList(groupId1, groupId2));
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId1)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId2)
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
).iterator());
assertEquals(expectedResultCollection, getErrorResultCollection(data.groupsNames(), Errors.COORDINATOR_LOAD_IN_PROGRESS));
}
}

27
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group; @@ -19,6 +19,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import java.util.List;
/**
* Interface common for all groups.
*/
@ -90,4 +92,29 @@ public interface Group { @@ -90,4 +92,29 @@ public interface Group {
int memberEpoch,
long lastCommittedOffset
) throws KafkaException;
/**
* Validates the OffsetDelete request.
*/
void validateOffsetDelete() throws KafkaException;
/**
* Validates the DeleteGroups request.
*/
void validateDeleteGroup() throws KafkaException;
/**
* Returns true if the group is actively subscribed to the topic.
*
* @param topic The topic name.
* @return Whether the group is subscribed to the topic.
*/
boolean isSubscribedToTopic(String topic);
/**
* Populates the list of records with tombstone(s) for deleting the group.
*
* @param records The list of records.
*/
void createGroupTombstoneRecords(List<Record> records);
}

125
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData; @@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
@ -77,6 +78,8 @@ import org.slf4j.Logger; @@ -77,6 +78,8 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
@ -523,9 +526,50 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -523,9 +526,50 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
final List<CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>> futures =
new ArrayList<>(groupIds.size());
final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
groupIds.forEach(groupId -> {
// For backwards compatibility, we support DeleteGroups for the empty group id.
if (groupId == null) {
futures.add(CompletableFuture.completedFuture(DeleteGroupsRequest.getErrorResultCollection(
Collections.singletonList(null),
Errors.INVALID_GROUP_ID
)));
} else {
final TopicPartition topicPartition = topicPartitionFor(groupId);
groupsByTopicPartition
.computeIfAbsent(topicPartition, __ -> new ArrayList<>())
.add(groupId);
}
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
runtime.scheduleWriteOperation(
"delete-groups",
topicPartition,
coordinator -> coordinator.deleteGroups(context, groupList)
).exceptionally(exception ->
DeleteGroupsRequest.getErrorResultCollection(groupList, normalizeException(exception))
);
futures.add(future);
});
final CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return allFutures.thenApply(__ -> {
final DeleteGroupsResponseData.DeletableGroupResultCollection res = new DeleteGroupsResponseData.DeletableGroupResultCollection();
futures.forEach(future ->
// We don't use res.addAll(future.join()) because DeletableGroupResultCollection is an ImplicitLinkedHashMultiCollection,
// which has requirements for adding elements (see ImplicitLinkedHashCollection.java#add).
future.join().forEach(result ->
res.add(result.duplicate())
)
);
return res;
});
}
/**
@ -641,37 +685,9 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -641,37 +685,9 @@ public class GroupCoordinatorService implements GroupCoordinator {
"commit-offset",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.commitOffset(context, request)
).exceptionally(exception -> {
if (exception instanceof UnknownTopicOrPartitionException ||
exception instanceof NotEnoughReplicasException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.COORDINATOR_NOT_AVAILABLE
);
}
if (exception instanceof NotLeaderOrFollowerException ||
exception instanceof KafkaStorageException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.NOT_COORDINATOR
);
}
if (exception instanceof RecordTooLargeException ||
exception instanceof RecordBatchTooLargeException ||
exception instanceof InvalidFetchSizeException) {
return OffsetCommitRequest.getErrorResponse(
request,
Errors.INVALID_COMMIT_OFFSET_SIZE
);
}
return OffsetCommitRequest.getErrorResponse(
request,
Errors.forException(exception)
);
});
).exceptionally(exception ->
OffsetCommitRequest.getErrorResponse(request, normalizeException(exception))
);
}
/**
@ -705,9 +721,20 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -705,9 +721,20 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new OffsetDeleteResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code())
);
}
return runtime.scheduleWriteOperation(
"delete-offsets",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.deleteOffsets(context, request)
).exceptionally(exception ->
new OffsetDeleteResponseData()
.setErrorCode(normalizeException(exception).code())
);
}
/**
@ -827,4 +854,28 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -827,4 +854,28 @@ public class GroupCoordinatorService implements GroupCoordinator {
private static boolean isGroupIdNotEmpty(String groupId) {
return groupId != null && !groupId.isEmpty();
}
/**
* Handles the exception in the scheduleWriteOperation.
* @return The Errors instance associated with the given exception.
*/
private static Errors normalizeException(Throwable exception) {
if (exception instanceof UnknownTopicOrPartitionException ||
exception instanceof NotEnoughReplicasException) {
return Errors.COORDINATOR_NOT_AVAILABLE;
}
if (exception instanceof NotLeaderOrFollowerException ||
exception instanceof KafkaStorageException) {
return Errors.NOT_COORDINATOR;
}
if (exception instanceof RecordTooLargeException ||
exception instanceof RecordBatchTooLargeException ||
exception instanceof InvalidFetchSizeException) {
return Errors.UNKNOWN_SERVER_ERROR;
}
return Errors.forException(exception);
}
}

78
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group; @@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@ -27,12 +28,15 @@ import org.apache.kafka.common.message.LeaveGroupResponseData; @@ -27,12 +28,15 @@ import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -60,7 +64,9 @@ import org.apache.kafka.image.MetadataDelta; @@ -60,7 +64,9 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@ -156,12 +162,18 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { @@ -156,12 +162,18 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
.build();
return new GroupCoordinatorShard(
logContext,
groupMetadataManager,
offsetMetadataManager
);
}
}
/**
* The logger.
*/
private final Logger log;
/**
* The group metadata manager.
*/
@ -175,13 +187,16 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { @@ -175,13 +187,16 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
/**
* Constructor.
*
* @param logContext The log context.
* @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager.
*/
GroupCoordinatorShard(
LogContext logContext,
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager
) {
this.log = logContext.logger(GroupCoordinatorShard.class);
this.groupMetadataManager = groupMetadataManager;
this.offsetMetadataManager = offsetMetadataManager;
}
@ -262,6 +277,51 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { @@ -262,6 +277,51 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
);
}
/**
* Handles a DeleteGroups request.
*
* @param context The request context.
* @param groupIds The groupIds of the groups to be deleted
* @return A Result containing the DeleteGroupsResponseData.DeletableGroupResultCollection response and
* a list of records to update the state machine.
*/
public CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> deleteGroups(
RequestContext context,
List<String> groupIds
) throws ApiException {
final DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection =
new DeleteGroupsResponseData.DeletableGroupResultCollection(groupIds.size());
final List<Record> records = new ArrayList<>();
int numDeletedOffsets = 0;
final List<String> deletedGroups = new ArrayList<>();
for (String groupId : groupIds) {
try {
groupMetadataManager.validateDeleteGroup(groupId);
numDeletedOffsets += offsetMetadataManager.deleteAllOffsets(groupId, records);
groupMetadataManager.deleteGroup(groupId, records);
deletedGroups.add(groupId);
resultCollection.add(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId)
);
} catch (ApiException exception) {
resultCollection.add(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId(groupId)
.setErrorCode(Errors.forException(exception).code())
);
}
}
log.info("The following groups were deleted: {}. A total of {} offsets were removed.",
String.join(", ", deletedGroups),
numDeletedOffsets
);
return new CoordinatorResult<>(records, resultCollection);
}
/**
* Fetch offsets for a given set of partitions and a given group.
*
@ -295,7 +355,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { @@ -295,7 +355,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
}
/**
* Handles a OffsetCommit request.
* Handles an OffsetCommit request.
*
* @param context The request context.
* @param request The actual OffsetCommit request.
@ -341,6 +401,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { @@ -341,6 +401,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
return groupMetadataManager.genericGroupLeave(context, request);
}
/**
* Handles a OffsetDelete request.
*
* @param context The request context.
* @param request The actual OffsetDelete request.
*
* @return A Result containing the OffsetDeleteResponse response and
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
RequestContext context,
OffsetDeleteRequestData request
) throws ApiException {
return offsetMetadataManager.deleteOffsets(request);
}
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).

27
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

@ -3060,6 +3060,33 @@ public class GroupMetadataManager { @@ -3060,6 +3060,33 @@ public class GroupMetadataManager {
group.remove(member.memberId());
}
/**
* Handles a DeleteGroups request.
* Populates the record list passed in with record to update the state machine.
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
* calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
*
* @param groupId The id of the group to be deleted. It has been checked in {@link GroupMetadataManager#validateDeleteGroup}.
* @param records The record list to populate.
*/
public void deleteGroup(
String groupId,
List<Record> records
) {
// At this point, we have already validated the group id, so we know that the group exists and that no exception will be thrown.
group(groupId).createGroupTombstoneRecords(records);
}
/**
* Validates the DeleteGroups request.
*
* @param groupId The id of the group to be deleted.
*/
void validateDeleteGroup(String groupId) throws ApiException {
Group group = group(groupId);
group.validateDeleteGroup();
}
/**
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
*

104
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

@ -23,8 +23,10 @@ import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -23,8 +23,10 @@ import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
@ -45,6 +47,7 @@ import java.util.ArrayList; @@ -45,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
@ -242,6 +245,19 @@ public class OffsetMetadataManager { @@ -242,6 +245,19 @@ public class OffsetMetadataManager {
);
}
/**
* Validates an OffsetDelete request.
*
* @param request The actual request.
*/
private Group validateOffsetDelete(
OffsetDeleteRequestData request
) throws GroupIdNotFoundException {
Group group = groupMetadataManager.group(request.groupId());
group.validateOffsetDelete();
return group;
}
/**
* Computes the expiration timestamp based on the retention time provided in the OffsetCommit
* request.
@ -333,6 +349,94 @@ public class OffsetMetadataManager { @@ -333,6 +349,94 @@ public class OffsetMetadataManager {
return new CoordinatorResult<>(records, response);
}
/**
* Handles an OffsetDelete request.
*
* @param request The OffsetDelete request.
*
* @return A Result containing the OffsetDeleteResponseData response and
* a list of records to update the state machine.
*/
public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
OffsetDeleteRequestData request
) throws ApiException {
final Group group = validateOffsetDelete(request);
final List<Record> records = new ArrayList<>();
final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
final TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
offsetsByGroup.get(request.groupId());
request.topics().forEach(topic -> {
final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
if (group.isSubscribedToTopic(topic.name())) {
topic.partitions().forEach(partition ->
responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
)
);
} else {
final TimelineHashMap<Integer, OffsetAndMetadata> offsetsByPartition = offsetsByTopic == null ?
null : offsetsByTopic.get(topic.name());
if (offsetsByPartition != null) {
topic.partitions().forEach(partition -> {
if (offsetsByPartition.containsKey(partition.partitionIndex())) {
responsePartitionCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(partition.partitionIndex())
);
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
request.groupId(),
topic.name(),
partition.partitionIndex()
));
}
});
}
}
responseTopicCollection.add(new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
.setName(topic.name())
.setPartitions(responsePartitionCollection)
);
});
return new CoordinatorResult<>(
records,
new OffsetDeleteResponseData().setTopics(responseTopicCollection)
);
}
/**
* Deletes offsets as part of a DeleteGroups request.
* Populates the record list passed in with records to update the state machine.
* Validations are done in {@link GroupCoordinatorShard#deleteGroups(RequestContext, List)}
*
* @param groupId The id of the given group.
* @param records The record list to populate.
*
* @return The number of offsets to be deleted.
*/
public int deleteAllOffsets(
String groupId,
List<Record> records
) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
AtomicInteger numDeletedOffsets = new AtomicInteger();
if (offsetsByTopic != null) {
offsetsByTopic.forEach((topic, offsetsByPartition) ->
offsetsByPartition.keySet().forEach(partition -> {
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
numDeletedOffsets.getAndIncrement();
})
);
}
return numDeletedOffsets.get();
}
/**
* Fetch offsets for a given Group.
*

43
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java

@ -18,10 +18,14 @@ package org.apache.kafka.coordinator.group.consumer; @@ -18,10 +18,14 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
@ -33,6 +37,7 @@ import org.apache.kafka.timeline.TimelineObject; @@ -33,6 +37,7 @@ import org.apache.kafka.timeline.TimelineObject;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -341,6 +346,16 @@ public class ConsumerGroup implements Group { @@ -341,6 +346,16 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableSet(subscribedTopicNames.keySet());
}
/**
* Returns true if the consumer group is actively subscribed to the topic.
*
* @param topic The topic name.
* @return whether the group is subscribed to the topic.
*/
public boolean isSubscribedToTopic(String topic) {
return subscribedTopicNames.containsKey(topic);
}
/**
* Returns the target assignment of the member.
*
@ -592,6 +607,34 @@ public class ConsumerGroup implements Group { @@ -592,6 +607,34 @@ public class ConsumerGroup implements Group {
validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Validates the OffsetDelete request.
*/
@Override
public void validateOffsetDelete() {}
/**
* Validates the DeleteGroups request.
*/
@Override
public void validateDeleteGroup() throws ApiException {
if (state() != ConsumerGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
}
}
/**
* Populates the list of records with tombstone(s) for deleting the group.
*
* @param records The list of records.
*/
@Override
public void createGroupTombstoneRecords(List<Record> records) {
records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
}
/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.

53
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.generic;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@ -32,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException; @@ -32,6 +33,8 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@ -54,6 +57,7 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPL @@ -54,6 +57,7 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPL
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
/**
* This class holds metadata for a generic group where the
@ -849,6 +853,51 @@ public class GenericGroup implements Group { @@ -849,6 +853,51 @@ public class GenericGroup implements Group {
}
}
/**
* Validates the OffsetDelete request.
*/
@Override
public void validateOffsetDelete() throws ApiException {
switch (currentState()) {
case DEAD:
throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
case STABLE:
case PREPARING_REBALANCE:
case COMPLETING_REBALANCE:
if (!usesConsumerGroupProtocol()) {
throw Errors.NON_EMPTY_GROUP.exception();
}
break;
default:
}
}
/**
* Validates the DeleteGroups request.
*/
@Override
public void validateDeleteGroup() throws ApiException {
switch (currentState()) {
case DEAD:
throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId));
case STABLE:
case PREPARING_REBALANCE:
case COMPLETING_REBALANCE:
throw Errors.NON_EMPTY_GROUP.exception();
default:
}
}
/**
* Populates the list of records with tombstone(s) for deleting the group.
*
* @param records The list of records.
*/
@Override
public void createGroupTombstoneRecords(List<Record> records) {
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
}
/**
* Verify the member id is up to date for static members. Return true if both conditions met:
* 1. given member is a known static member to group
@ -1015,10 +1064,10 @@ public class GenericGroup implements Group { @@ -1015,10 +1064,10 @@ public class GenericGroup implements Group {
/**
* Returns true if the consumer group is actively subscribed to the topic. When the consumer
* group does not know, because the information is not available yet or because the it has
* group does not know, because the information is not available yet or because it has
* failed to parse the Consumer Protocol, it returns true to be safe.
*
* @param topic the topic name.
* @param topic The topic name.
* @return whether the group is subscribed to the topic.
*/
public boolean isSubscribedToTopic(String topic) {

246
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

@ -34,12 +34,15 @@ import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -34,12 +34,15 @@ import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@ -499,7 +502,6 @@ public class GroupCoordinatorServiceTest { @@ -499,7 +502,6 @@ public class GroupCoordinatorServiceTest {
.setGroupId(null)
.setMemberId(UNKNOWN_MEMBER_ID);
CompletableFuture<SyncGroupResponseData> response = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
@ -936,4 +938,246 @@ public class GroupCoordinatorServiceTest { @@ -936,4 +938,246 @@ public class GroupCoordinatorServiceTest {
assertEquals(expectedResponse, future.get());
}
@Test
public void testDeleteOffsets() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("topic")
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
).iterator());
OffsetDeleteRequestData request = new OffsetDeleteRequestData()
.setGroupId("group")
.setTopics(requestTopicCollection);
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection =
new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
).iterator());
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection =
new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection)
).iterator());
OffsetDeleteResponseData response = new OffsetDeleteResponseData()
.setTopics(responseTopicCollection);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
requestContext(ApiKeys.OFFSET_DELETE),
request,
BufferSupplier.NO_CACHING
);
assertTrue(future.isDone());
assertEquals(response, future.get());
}
@Test
public void testDeleteOffsetsInvalidGroupId() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("topic")
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
).iterator());
OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("")
.setTopics(requestTopicCollection);
OffsetDeleteResponseData response = new OffsetDeleteResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response));
CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
requestContext(ApiKeys.OFFSET_DELETE),
request,
BufferSupplier.NO_CACHING
);
assertTrue(future.isDone());
assertEquals(response, future.get());
}
@ParameterizedTest
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
public void testDeleteOffsetsWithException(
Throwable exception,
short expectedErrorCode
) throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName("topic")
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
))
).iterator());
OffsetDeleteRequestData request = new OffsetDeleteRequestData()
.setGroupId("group")
.setTopics(requestTopicCollection);
OffsetDeleteResponseData response = new OffsetDeleteResponseData()
.setErrorCode(expectedErrorCode);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-offsets"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(exception));
CompletableFuture<OffsetDeleteResponseData> future = service.deleteOffsets(
requestContext(ApiKeys.OFFSET_DELETE),
request,
BufferSupplier.NO_CACHING
);
assertTrue(future.isDone());
assertEquals(response, future.get());
}
@Test
public void testDeleteGroups() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 3);
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection1 =
new DeleteGroupsResponseData.DeletableGroupResultCollection();
DeleteGroupsResponseData.DeletableGroupResult result1 = new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-1");
resultCollection1.add(result1);
DeleteGroupsResponseData.DeletableGroupResultCollection resultCollection2 =
new DeleteGroupsResponseData.DeletableGroupResultCollection();
DeleteGroupsResponseData.DeletableGroupResult result2 = new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-2");
resultCollection2.add(result2);
DeleteGroupsResponseData.DeletableGroupResult result3 = new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-3")
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
new DeleteGroupsResponseData.DeletableGroupResultCollection();
expectedResultCollection.addAll(Arrays.asList(
new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code()),
result2.duplicate(),
result3.duplicate(),
result1.duplicate()
));
when(runtime.partitions()).thenReturn(Sets.newSet(
new TopicPartition("__consumer_offsets", 0),
new TopicPartition("__consumer_offsets", 1),
new TopicPartition("__consumer_offsets", 2)
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(resultCollection1));
CompletableFuture<Object> resultCollectionFuture = new CompletableFuture<>();
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(resultCollectionFuture);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));
List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3", null);
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), groupIds, BufferSupplier.NO_CACHING);
assertFalse(future.isDone());
resultCollectionFuture.complete(resultCollection2);
assertTrue(future.isDone());
assertEquals(expectedResultCollection, future.get());
}
@ParameterizedTest
@MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
public void testDeleteGroupsWithException(
Throwable exception,
short expectedErrorCode
) throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
service.startup(() -> 1);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(exception));
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future =
service.deleteGroups(
requestContext(ApiKeys.DELETE_GROUPS),
Collections.singletonList("group-id"),
BufferSupplier.NO_CACHING
);
assertEquals(
new DeleteGroupsResponseData.DeletableGroupResultCollection(Collections.singletonList(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id")
.setErrorCode(expectedErrorCode)
).iterator()),
future.get()
);
}
}

147
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java

@ -18,10 +18,13 @@ package org.apache.kafka.coordinator.group; @@ -18,10 +18,13 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
@ -42,14 +45,22 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; @@ -42,14 +45,22 @@ import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.coordinator.group.TestUtil.requestContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -62,6 +73,7 @@ public class GroupCoordinatorShardTest { @@ -62,6 +73,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -86,6 +98,7 @@ public class GroupCoordinatorShardTest { @@ -86,6 +98,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -105,11 +118,127 @@ public class GroupCoordinatorShardTest { @@ -105,11 +118,127 @@ public class GroupCoordinatorShardTest {
assertEquals(result, coordinator.commitOffset(context, request));
}
@Test
public void testDeleteGroups() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
List<Record> expectedRecords = new ArrayList<>();
for (String groupId : groupIds) {
expectedResultCollection.add(new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(groupId));
expectedRecords.addAll(Arrays.asList(
RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0),
RecordHelpers.newGroupMetadataTombstoneRecord(groupId)
));
}
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>(
expectedRecords,
expectedResultCollection
);
when(offsetMetadataManager.deleteAllOffsets(anyString(), anyList())).thenAnswer(invocation -> {
String groupId = invocation.getArgument(0);
List<Record> records = invocation.getArgument(1);
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0));
return 1;
});
// Mockito#when only stubs method returning non-void value, so we use Mockito#doAnswer instead.
doAnswer(invocation -> {
String groupId = invocation.getArgument(0);
List<Record> records = invocation.getArgument(1);
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
}).when(groupMetadataManager).deleteGroup(anyString(), anyList());
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult =
coordinator.deleteGroups(context, groupIds);
for (String groupId : groupIds) {
verify(groupMetadataManager, times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
verify(groupMetadataManager, times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
verify(offsetMetadataManager, times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
}
assertEquals(expectedResult, coordinatorResult);
}
@Test
public void testDeleteGroupsInvalidGroupId() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
RequestContext context = requestContext(ApiKeys.DELETE_GROUPS);
List<String> groupIds = Arrays.asList("group-id-1", "group-id-2", "group-id-3");
DeleteGroupsResponseData.DeletableGroupResultCollection expectedResultCollection =
new DeleteGroupsResponseData.DeletableGroupResultCollection(Arrays.asList(
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-1"),
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-2")
.setErrorCode(Errors.INVALID_GROUP_ID.code()),
new DeleteGroupsResponseData.DeletableGroupResult()
.setGroupId("group-id-3")
).iterator());
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newOffsetCommitTombstoneRecord("group-id-1", "topic-name", 0),
RecordHelpers.newGroupMetadataTombstoneRecord("group-id-1"),
RecordHelpers.newOffsetCommitTombstoneRecord("group-id-3", "topic-name", 0),
RecordHelpers.newGroupMetadataTombstoneRecord("group-id-3")
);
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> expectedResult = new CoordinatorResult<>(
expectedRecords,
expectedResultCollection
);
// Mockito#when only stubs method returning non-void value, so we use Mockito#doAnswer and Mockito#doThrow instead.
doThrow(Errors.INVALID_GROUP_ID.exception())
.when(groupMetadataManager).validateDeleteGroup(ArgumentMatchers.eq("group-id-2"));
doAnswer(invocation -> {
String groupId = invocation.getArgument(0);
List<Record> records = invocation.getArgument(1);
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, "topic-name", 0));
return null;
}).when(offsetMetadataManager).deleteAllOffsets(anyString(), anyList());
doAnswer(invocation -> {
String groupId = invocation.getArgument(0);
List<Record> records = invocation.getArgument(1);
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
return null;
}).when(groupMetadataManager).deleteGroup(anyString(), anyList());
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, Record> coordinatorResult =
coordinator.deleteGroups(context, groupIds);
for (String groupId : groupIds) {
verify(groupMetadataManager, times(1)).validateDeleteGroup(eq(groupId));
if (!groupId.equals("group-id-2")) {
verify(groupMetadataManager, times(1)).deleteGroup(eq(groupId), anyList());
verify(offsetMetadataManager, times(1)).deleteAllOffsets(eq(groupId), anyList());
}
}
assertEquals(expectedResult, coordinatorResult);
}
@Test
public void testReplayOffsetCommit() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -135,6 +264,7 @@ public class GroupCoordinatorShardTest { @@ -135,6 +264,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -159,6 +289,7 @@ public class GroupCoordinatorShardTest { @@ -159,6 +289,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -179,6 +310,7 @@ public class GroupCoordinatorShardTest { @@ -179,6 +310,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -198,6 +330,7 @@ public class GroupCoordinatorShardTest { @@ -198,6 +330,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -218,6 +351,7 @@ public class GroupCoordinatorShardTest { @@ -218,6 +351,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -237,6 +371,7 @@ public class GroupCoordinatorShardTest { @@ -237,6 +371,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -257,6 +392,7 @@ public class GroupCoordinatorShardTest { @@ -257,6 +392,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -276,6 +412,7 @@ public class GroupCoordinatorShardTest { @@ -276,6 +412,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -296,6 +433,7 @@ public class GroupCoordinatorShardTest { @@ -296,6 +433,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -315,6 +453,7 @@ public class GroupCoordinatorShardTest { @@ -315,6 +453,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -335,6 +474,7 @@ public class GroupCoordinatorShardTest { @@ -335,6 +474,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -354,6 +494,7 @@ public class GroupCoordinatorShardTest { @@ -354,6 +494,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -374,6 +515,7 @@ public class GroupCoordinatorShardTest { @@ -374,6 +515,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -393,6 +535,7 @@ public class GroupCoordinatorShardTest { @@ -393,6 +535,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -405,6 +548,7 @@ public class GroupCoordinatorShardTest { @@ -405,6 +548,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -424,6 +568,7 @@ public class GroupCoordinatorShardTest { @@ -424,6 +568,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -443,6 +588,7 @@ public class GroupCoordinatorShardTest { @@ -443,6 +588,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);
@ -463,6 +609,7 @@ public class GroupCoordinatorShardTest { @@ -463,6 +609,7 @@ public class GroupCoordinatorShardTest {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
new LogContext(),
groupMetadataManager,
offsetMetadataManager
);

30
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

@ -738,7 +738,7 @@ public class GroupMetadataManagerTest { @@ -738,7 +738,7 @@ public class GroupMetadataManagerTest {
request,
responseFuture
);
return new SyncResult(responseFuture, coordinatorResult);
}
@ -9355,6 +9355,34 @@ public class GroupMetadataManagerTest { @@ -9355,6 +9355,34 @@ public class GroupMetadataManagerTest {
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testGenericGroupDelete() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
List<Record> records = new ArrayList<>();
context.groupMetadataManager.deleteGroup("group-id", records);
assertEquals(expectedRecords, records);
}
@Test
public void testConsumerGroupDelete() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group-id", true);
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newTargetAssignmentEpochTombstoneRecord("group-id"),
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord("group-id"),
RecordHelpers.newGroupEpochTombstoneRecord("group-id")
);
List<Record> records = new ArrayList<>();
context.groupMetadataManager.deleteGroup("group-id", records);
assertEquals(expectedRecords, records);
}
private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, Record>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));

206
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
@ -26,6 +27,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -26,6 +27,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.network.ClientInformation;
@ -55,8 +58,10 @@ import org.apache.kafka.server.common.MetadataVersion; @@ -55,8 +58,10 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -70,6 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; @@ -70,6 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class OffsetMetadataManagerTest {
static class OffsetMetadataManagerTestContext {
@ -177,6 +183,26 @@ public class OffsetMetadataManagerTest { @@ -177,6 +183,26 @@ public class OffsetMetadataManagerTest {
return result;
}
public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
OffsetDeleteRequestData request
) {
CoordinatorResult<OffsetDeleteResponseData, Record> result = offsetMetadataManager.deleteOffsets(request);
result.records().forEach(this::replay);
return result;
}
public int deleteAllOffsets(
String groupId,
List<Record> records
) {
List<Record> addedRecords = new ArrayList<>();
int numDeletedOffsets = offsetMetadataManager.deleteAllOffsets(groupId, addedRecords);
addedRecords.forEach(this::replay);
records.addAll(addedRecords);
return numDeletedOffsets;
}
public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
@ -257,8 +283,6 @@ public class OffsetMetadataManagerTest { @@ -257,8 +283,6 @@ public class OffsetMetadataManagerTest {
long offset,
int leaderEpoch
) {
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
replay(RecordHelpers.newOffsetCommitRecord(
groupId,
topic,
@ -274,6 +298,18 @@ public class OffsetMetadataManagerTest { @@ -274,6 +298,18 @@ public class OffsetMetadataManagerTest {
));
}
public void deleteOffset(
String groupId,
String topic,
int partition
) {
replay(RecordHelpers.newOffsetCommitTombstoneRecord(
groupId,
topic,
partition
));
}
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@ -285,6 +321,8 @@ public class OffsetMetadataManagerTest { @@ -285,6 +321,8 @@ public class OffsetMetadataManagerTest {
private void replay(
Record record
) {
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
ApiMessageAndVersion key = record.key();
ApiMessageAndVersion value = record.value();
@ -307,6 +345,63 @@ public class OffsetMetadataManagerTest { @@ -307,6 +345,63 @@ public class OffsetMetadataManagerTest {
lastWrittenOffset++;
}
public void testOffsetDeleteWith(
String groupId,
String topic,
int partition,
Errors expectedError
) {
final OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
.setName(topic)
.setPartitions(Collections.singletonList(
new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition)
))
).iterator());
final OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection =
new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
if (hasOffset(groupId, topic, partition)) {
expectedResponsePartitionCollection.add(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(partition)
.setErrorCode(expectedError.code())
);
}
final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection expectedResponseTopicCollection =
new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(
new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
.setName(topic)
.setPartitions(expectedResponsePartitionCollection)
).iterator());
List<Record> expectedRecords = Collections.emptyList();
if (hasOffset(groupId, topic, partition) && expectedError == Errors.NONE) {
expectedRecords = Collections.singletonList(
RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)
);
}
final CoordinatorResult<OffsetDeleteResponseData, Record> coordinatorResult = deleteOffsets(
new OffsetDeleteRequestData()
.setGroupId(groupId)
.setTopics(requestTopicCollection)
);
assertEquals(new OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), coordinatorResult.response());
assertEquals(expectedRecords, coordinatorResult.records());
}
public boolean hasOffset(
String groupId,
String topic,
int partition
) {
return offsetMetadataManager.offset(groupId, topic, partition) != null;
}
}
@ParameterizedTest
@ -1561,6 +1656,113 @@ public class OffsetMetadataManagerTest { @@ -1561,6 +1656,113 @@ public class OffsetMetadataManagerTest {
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}
@Test
public void testGenericGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"foo",
true
);
context.commitOffset("foo", "bar", 0, 100L, 0);
group.setSubscribedTopics(Optional.of(Collections.emptySet()));
context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
}
@Test
public void testGenericGroupOffsetDeleteWithErrors() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"foo",
true
);
group.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
context.commitOffset("foo", "bar", 0, 100L, 0);
// Delete the offset whose topic partition doesn't exist.
context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
// Delete the offset from the topic that the group is subscribed to.
context.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
}
@Test
public void testConsumerGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
"foo",
true
);
context.commitOffset("foo", "bar", 0, 100L, 0);
assertFalse(group.isSubscribedToTopic("bar"));
context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);
}
@Test
public void testConsumerGroupOffsetDeleteWithErrors() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
"foo",
true
);
MetadataImage image = new GroupMetadataManagerTest.MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 1)
.addRacks()
.build();
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("bar"))
.build();
group.computeSubscriptionMetadata(
null,
member1,
image.topics(),
image.cluster()
);
group.updateMember(member1);
context.commitOffset("foo", "bar", 0, 100L, 0);
assertTrue(group.isSubscribedToTopic("bar"));
// Delete the offset whose topic partition doesn't exist.
context.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
// Delete the offset from the topic that the group is subscribed to.
context.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
}
@ParameterizedTest
@EnumSource(Group.GroupType.class)
public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
switch (groupType) {
case GENERIC:
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
"foo",
true
);
break;
case CONSUMER:
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
"foo",
true
);
break;
default:
throw new IllegalArgumentException("Invalid group type: " + groupType);
}
context.commitOffset("foo", "bar-0", 0, 100L, 0);
context.commitOffset("foo", "bar-0", 1, 100L, 0);
context.commitOffset("foo", "bar-1", 0, 100L, 0);
List<Record> expectedRecords = Arrays.asList(
RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0),
RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1)
);
List<Record> records = new ArrayList<>();
int numDeleteOffsets = context.deleteAllOffsets("foo", records);
assertEquals(expectedRecords, records);
assertEquals(3, numDeleteOffsets);
}
static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,

30
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
@ -35,6 +36,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; @@ -35,6 +36,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -673,4 +675,32 @@ public class ConsumerGroupTest { @@ -673,4 +675,32 @@ public class ConsumerGroupTest {
// This should succeed.
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
}
@Test
public void testValidateDeleteGroup() {
ConsumerGroup consumerGroup = createConsumerGroup("foo");
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY, consumerGroup.state());
assertDoesNotThrow(consumerGroup::validateDeleteGroup);
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member1")
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setTargetMemberEpoch(1)
.build();
consumerGroup.updateMember(member1);
assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, consumerGroup.state());
assertThrows(GroupNotEmptyException.class, consumerGroup::validateDeleteGroup);
consumerGroup.setGroupEpoch(1);
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, consumerGroup.state());
assertThrows(GroupNotEmptyException.class, consumerGroup::validateDeleteGroup);
consumerGroup.setTargetAssignmentEpoch(1);
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, consumerGroup.state());
assertThrows(GroupNotEmptyException.class, consumerGroup::validateDeleteGroup);
}
}

61
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java

@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; @@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@ -45,6 +47,7 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; @@ -45,6 +47,7 @@ import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
import static org.apache.kafka.coordinator.group.generic.GenericGroupState.STABLE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -776,7 +779,7 @@ public class GenericGroupTest { @@ -776,7 +779,7 @@ public class GenericGroupTest {
protocolType,
protocols
);
group.add(member);
assertTrue(group.hasMemberId(memberId));
assertTrue(group.hasStaticMember(groupInstanceId));
@ -1026,6 +1029,62 @@ public class GenericGroupTest { @@ -1026,6 +1029,62 @@ public class GenericGroupTest {
() -> group.validateOffsetCommit("member-id", "new-instance-id", 1));
}
@Test
public void testValidateOffsetDelete() {
assertFalse(group.usesConsumerGroupProtocol());
group.transitionTo(PREPARING_REBALANCE);
assertThrows(GroupNotEmptyException.class, group::validateOffsetDelete);
group.transitionTo(COMPLETING_REBALANCE);
assertThrows(GroupNotEmptyException.class, group::validateOffsetDelete);
group.transitionTo(STABLE);
assertThrows(GroupNotEmptyException.class, group::validateOffsetDelete);
JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection();
protocols.add(new JoinGroupRequestProtocol()
.setName("roundrobin")
.setMetadata(new byte[0]));
GenericGroupMember member = new GenericGroupMember(
memberId,
Optional.of(groupInstanceId),
clientId,
clientHost,
rebalanceTimeoutMs,
sessionTimeoutMs,
protocolType,
protocols
);
group.add(member);
assertTrue(group.usesConsumerGroupProtocol());
group.transitionTo(PREPARING_REBALANCE);
assertDoesNotThrow(group::validateOffsetDelete);
group.transitionTo(COMPLETING_REBALANCE);
assertDoesNotThrow(group::validateOffsetDelete);
group.transitionTo(STABLE);
assertDoesNotThrow(group::validateOffsetDelete);
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(EMPTY);
assertDoesNotThrow(group::validateOffsetDelete);
group.transitionTo(DEAD);
assertThrows(GroupIdNotFoundException.class, group::validateOffsetDelete);
}
@Test
public void testValidateDeleteGroup() {
group.transitionTo(PREPARING_REBALANCE);
assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
group.transitionTo(COMPLETING_REBALANCE);
assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
group.transitionTo(STABLE);
assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(EMPTY);
assertDoesNotThrow(group::validateDeleteGroup);
group.transitionTo(DEAD);
assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup);
}
private void assertState(GenericGroup group, GenericGroupState targetState) {
Set<GenericGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);

Loading…
Cancel
Save