From 1ed6da7cc8eb2231f73509c907c2e67af2f249d2 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Wed, 31 Jan 2018 13:23:12 -0800 Subject: [PATCH] KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479) Reviewers: Manikumar Reddy O , Jason Gustafson --- .../errors/GroupIdNotFoundException.java | 31 +++ .../common/errors/GroupNotEmptyException.java | 31 +++ .../apache/kafka/common/protocol/ApiKeys.java | 5 +- .../apache/kafka/common/protocol/Errors.java | 24 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../common/requests/DeleteGroupsRequest.java | 117 ++++++++ .../common/requests/DeleteGroupsResponse.java | 129 +++++++++ .../common/requests/RequestResponseTest.java | 13 + .../main/scala/kafka/admin/AclCommand.scala | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 43 +++ .../main/scala/kafka/admin/AdminUtils.scala | 3 +- .../kafka/admin/ConsumerGroupCommand.scala | 84 ++++-- .../src/main/scala/kafka/api/ApiVersion.scala | 10 +- .../coordinator/group/GroupCoordinator.scala | 48 +++- .../group/GroupMetadataManager.scala | 17 +- .../main/scala/kafka/server/KafkaApis.scala | 20 +- .../kafka/api/AuthorizerIntegrationTest.scala | 40 ++- .../admin/DeleteConsumerGroupsTest.scala | 251 ++++++++++++++++++ .../group/GroupCoordinatorTest.scala | 88 ++++++ .../group/GroupMetadataManagerTest.scala | 20 ++ .../unit/kafka/server/RequestQuotaTest.scala | 8 +- 22 files changed, 941 insertions(+), 47 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java create mode 100644 core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java new file mode 100644 index 00000000000..1ff30f14fc0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class GroupIdNotFoundException extends ApiException { + private final String groupId; + + public GroupIdNotFoundException(String groupId) { + super("The group id " + groupId + " was not found"); + this.groupId = groupId; + } + + public String groupId() { + return groupId; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java new file mode 100644 index 00000000000..264e613719c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class GroupNotEmptyException extends ApiException { + private final String groupId; + + public GroupNotEmptyException(String groupId) { + super("The group " + groupId + " is not empty"); + this.groupId = groupId; + } + + public String groupId() { + return groupId; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index b408e80b176..e0cdfd9c573 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -43,6 +43,8 @@ import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsRequest; import org.apache.kafka.common.requests.DeleteAclsResponse; +import org.apache.kafka.common.requests.DeleteGroupsRequest; +import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsRequest; import org.apache.kafka.common.requests.DeleteRecordsResponse; import org.apache.kafka.common.requests.DeleteTopicsRequest; @@ -183,7 +185,8 @@ public enum ApiKeys { CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()), RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()), EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()), - DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()); + DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), + DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index bd5b8001171..e2b8aeaf709 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -17,8 +17,6 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.InvalidPrincipalTypeException; -import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; @@ -26,12 +24,15 @@ import org.apache.kafka.common.errors.ControllerMovedException; import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.CorruptRecordException; -import org.apache.kafka.common.errors.LogDirNotFoundException; import org.apache.kafka.common.errors.DuplicateSequenceException; +import org.apache.kafka.common.errors.DelegationTokenAuthorizationException; import org.apache.kafka.common.errors.DelegationTokenDisabledException; +import org.apache.kafka.common.errors.DelegationTokenExpiredException; import org.apache.kafka.common.errors.DelegationTokenNotFoundException; import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException; import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; @@ -41,6 +42,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidPartitionsException; import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.InvalidPrincipalTypeException; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidReplicationFactorException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -52,6 +54,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.errors.InvalidTxnTimeoutException; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.LogDirNotFoundException; import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.NotCoordinatorException; @@ -70,10 +73,9 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.DelegationTokenAuthorizationException; -import org.apache.kafka.common.errors.DelegationTokenExpiredException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; @@ -594,6 +596,18 @@ public enum Errors { public ApiException build(String message) { return new InvalidPrincipalTypeException(message); } + }), + NON_EMPTY_GROUP(68, "The group is not empty", new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new GroupNotEmptyException(message); + } + }), + GROUP_ID_NOT_FOUND(69, "The group id does not exist", new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new GroupIdNotFoundException(message); + } }); private interface ApiExceptionBuilder { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index cd213d9c1e7..d2b93c4590e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -222,6 +222,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return new ExpireDelegationTokenRequest(struct, apiVersion); case DESCRIBE_DELEGATION_TOKEN: return new DescribeDelegationTokenRequest(struct, apiVersion); + case DELETE_GROUPS: + return new DeleteGroupsRequest(struct, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index fb012989536..608f6c33746 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -154,6 +154,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new ExpireDelegationTokenResponse(struct); case DESCRIBE_DELEGATION_TOKEN: return new DescribeDelegationTokenResponse(struct); + case DELETE_GROUPS: + return new DeleteGroupsResponse(struct); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java new file mode 100644 index 00000000000..bda6b33a18b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.common.protocol.types.Type.STRING; + +public class DeleteGroupsRequest extends AbstractRequest { + private static final String GROUPS_KEY_NAME = "groups"; + + /* DeleteGroups api */ + private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema( + new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_GROUPS_REQUEST_V0}; + } + + private final Set groups; + + public static class Builder extends AbstractRequest.Builder { + private final Set groups; + + public Builder(Set groups) { + super(ApiKeys.DELETE_GROUPS); + this.groups = groups; + } + + @Override + public DeleteGroupsRequest build(short version) { + return new DeleteGroupsRequest(groups, version); + } + + @Override + public String toString() { + StringBuilder bld = new StringBuilder(); + bld.append("(type=DeleteGroupsRequest"). + append(", groups=(").append(Utils.join(groups, ", ")).append(")"). + append(")"); + return bld.toString(); + } + } + + private DeleteGroupsRequest(Set groups, short version) { + super(version); + this.groups = groups; + } + + public DeleteGroupsRequest(Struct struct, short version) { + super(version); + Object[] groupsArray = struct.getArray(GROUPS_KEY_NAME); + Set groups = new HashSet<>(groupsArray.length); + for (Object group : groupsArray) + groups.add((String) group); + + this.groups = groups; + } + + @Override + protected Struct toStruct() { + Struct struct = new Struct(ApiKeys.DELETE_GROUPS.requestSchema(version())); + struct.set(GROUPS_KEY_NAME, groups.toArray()); + return struct; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + Errors error = Errors.forException(e); + Map groupErrors = new HashMap<>(groups.size()); + for (String group : groups) + groupErrors.put(group, error); + + switch (version()) { + case 0: + return new DeleteGroupsResponse(throttleTimeMs, groupErrors); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + version(), ApiKeys.DELETE_GROUPS.name, ApiKeys.DELETE_GROUPS.latestVersion())); + } + } + + public Set groups() { + return groups; + } + + public static DeleteGroupsRequest parse(ByteBuffer buffer, short version) { + return new DeleteGroupsRequest(ApiKeys.DELETE_GROUPS.parseRequest(version, buffer), version); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java new file mode 100644 index 00000000000..d97bb0d1156 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; + +public class DeleteGroupsResponse extends AbstractResponse { + private static final String GROUP_ERROR_CODES_KEY_NAME = "group_error_codes"; + + private static final Schema GROUP_ERROR_CODE = new Schema( + GROUP_ID, + ERROR_CODE); + + private static final Schema DELETE_GROUPS_RESPONSE_V0 = new Schema( + THROTTLE_TIME_MS, + new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array of per group error codes.")); + + public static Schema[] schemaVersions() { + return new Schema[]{DELETE_GROUPS_RESPONSE_V0}; + } + + + /** + * Possible error codes: + * + * COORDINATOR_LOAD_IN_PROGRESS (14) + * COORDINATOR_NOT_AVAILABLE(15) + * NOT_COORDINATOR (16) + * INVALID_GROUP_ID(24) + * GROUP_AUTHORIZATION_FAILED(30) + * NON_EMPTY_GROUP(68) + * GROUP_ID_NOT_FOUND(69) + */ + + private final Map errors; + private final int throttleTimeMs; + + public DeleteGroupsResponse(Map errors) { + this(DEFAULT_THROTTLE_TIME, errors); + } + + public DeleteGroupsResponse(int throttleTimeMs, Map errors) { + this.throttleTimeMs = throttleTimeMs; + this.errors = errors; + } + + public DeleteGroupsResponse(Struct struct) { + this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + Object[] groupErrorCodesStructs = struct.getArray(GROUP_ERROR_CODES_KEY_NAME); + Map errors = new HashMap<>(); + for (Object groupErrorCodeStructObj : groupErrorCodesStructs) { + Struct groupErrorCodeStruct = (Struct) groupErrorCodeStructObj; + String group = groupErrorCodeStruct.get(GROUP_ID); + Errors error = Errors.forCode(groupErrorCodeStruct.get(ERROR_CODE)); + errors.put(group, error); + } + + this.errors = errors; + } + + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ApiKeys.DELETE_GROUPS.responseSchema(version)); + struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); + List groupErrorCodeStructs = new ArrayList<>(errors.size()); + for (Map.Entry groupError : errors.entrySet()) { + Struct groupErrorCodeStruct = struct.instance(GROUP_ERROR_CODES_KEY_NAME); + groupErrorCodeStruct.set(GROUP_ID, groupError.getKey()); + groupErrorCodeStruct.set(ERROR_CODE, groupError.getValue().code()); + groupErrorCodeStructs.add(groupErrorCodeStruct); + } + struct.set(GROUP_ERROR_CODES_KEY_NAME, groupErrorCodeStructs.toArray()); + return struct; + } + + public int throttleTimeMs() { + return throttleTimeMs; + } + + public Map errors() { + return errors; + } + + public boolean hasError(String group) { + return errors.containsKey(group) && errors.get(group) != Errors.NONE; + } + + public Errors get(String group) { + return errors.get(group); + } + + @Override + public Map errorCounts() { + return errorCounts(errors); + } + + public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) { + return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 27406167361..b5420b5195d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -112,6 +112,9 @@ public class RequestResponseTest { checkRequest(createDescribeGroupRequest()); checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException()); checkResponse(createDescribeGroupResponse(), 0); + checkRequest(createDeleteGroupsRequest()); + checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException()); + checkResponse(createDeleteGroupsResponse(), 0); checkRequest(createListOffsetRequest(1)); checkErrorResponse(createListOffsetRequest(1), new UnknownServerException()); checkResponse(createListOffsetResponse(1), 1); @@ -641,6 +644,16 @@ public class RequestResponseTest { return new LeaveGroupResponse(Errors.NONE); } + private DeleteGroupsRequest createDeleteGroupsRequest() { + return new DeleteGroupsRequest.Builder(Collections.singleton("test-group")).build(); + } + + private DeleteGroupsResponse createDeleteGroupsResponse() { + Map result = new HashMap<>(); + result.put("test-group", Errors.NONE); + return new DeleteGroupsResponse(result); + } + @SuppressWarnings("deprecation") private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index fa6333c9533..6dd227223e0 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -32,7 +32,7 @@ object AclCommand extends Logging { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), - Group -> Set(Read, Describe, All), + Group -> Set(Read, Describe, Delete, All), Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All), TransactionalId -> Set(Describe, Write, All), DelegationToken -> Set(Describe, All) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index c0921694efb..772277f1021 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -369,6 +369,49 @@ class AdminClient(val time: Time, (response.error, response.tokens().asScala.toList) } + def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = { + + def coordinatorLookup(group: String): Either[Node, Errors] = { + try { + Left(findCoordinator(group)) + } catch { + case e: Throwable => + if (e.isInstanceOf[TimeoutException]) + Right(Errors.COORDINATOR_NOT_AVAILABLE) + else + Right(Errors.forException(e)) + } + } + + var errors: Map[String, Errors] = Map() + var groupsPerCoordinator: Map[Node, List[String]] = Map() + + groups.foreach { group => + coordinatorLookup(group) match { + case Right(error) => + errors += group -> error + case Left(coordinator) => + groupsPerCoordinator.get(coordinator) match { + case Some(gList) => + val gListNew = group :: gList + groupsPerCoordinator += coordinator -> gListNew + case None => + groupsPerCoordinator += coordinator -> List(group) + } + } + } + + groupsPerCoordinator.foreach { case (coordinator, groups) => + val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(groups.toSet.asJava)) + val response = responseBody.asInstanceOf[DeleteGroupsResponse] + groups.foreach { + case group if response.hasError(group) => errors += group -> response.errors.get(group) + case group => errors += group -> Errors.NONE + } + } + + errors + } def close() { running = false diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index f21b94281c3..2ae03aa65f2 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -429,9 +429,10 @@ object AdminUtils extends Logging with AdminUtilities { * @param topic Topic of the consumer group information we wish to delete */ @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0") - def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) { + def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = { val groups = zkUtils.getAllConsumerGroupsForTopic(topic) groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) + groups } def topicExists(zkUtils: ZkUtils, topic: String): Boolean = diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 3aa821cf89c..77c5b4d5d50 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -75,12 +75,8 @@ object ConsumerGroupCommand extends Logging { consumerGroupService.listGroups().foreach(println(_)) else if (opts.options.has(opts.describeOpt)) consumerGroupService.describeGroup() - else if (opts.options.has(opts.deleteOpt)) { - consumerGroupService match { - case service: ZkConsumerGroupService => service.deleteGroups() - case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.") - } - } + else if (opts.options.has(opts.deleteOpt)) + consumerGroupService.deleteGroups() else if (opts.options.has(opts.resetOffsetsOpt)) { val offsetsToReset = consumerGroupService.resetOffsets() if (opts.options.has(opts.exportOpt)) { @@ -344,6 +340,8 @@ object ConsumerGroupCommand extends Logging { def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException + + def deleteGroups(): Map[String, Errors] } @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0") @@ -362,13 +360,15 @@ object ConsumerGroupCommand extends Logging { zkUtils.getConsumerGroups().toList } - def deleteGroups() { + def deleteGroups(): Map[String, Errors] = { if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) - deleteForTopic() + deleteGroupsInfoForTopic() else if (opts.options.has(opts.groupOpt)) - deleteForGroup() + deleteGroupsInfo() else if (opts.options.has(opts.topicOpt)) - deleteAllForTopic() + deleteAllGroupsInfoForTopic() + + Map() } def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = { @@ -476,45 +476,57 @@ object ConsumerGroupCommand extends Logging { }.toMap } - private def deleteForGroup() { + private def deleteGroupsInfo(): Map[String, Errors] = { val groups = opts.options.valuesOf(opts.groupOpt) - groups.asScala.foreach { group => + groups.asScala.map { group => try { - if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) + if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) { println(s"Deleted all consumer group information for group '$group' in zookeeper.") - else + group -> Errors.NONE + } + else { printError(s"Delete for group '$group' failed because its consumers are still active.") + group -> Errors.NON_EMPTY_GROUP + } } catch { case e: ZkNoNodeException => printError(s"Delete for group '$group' failed because group does not exist.", Some(e)) + group -> Errors.forException(e) } - } + }.toMap } - private def deleteForTopic() { + private def deleteGroupsInfoForTopic(): Map[String, Errors] = { val groups = opts.options.valuesOf(opts.groupOpt) val topic = opts.options.valueOf(opts.topicOpt) Topic.validate(topic) - groups.asScala.foreach { group => + groups.asScala.map { group => try { - if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) + if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) { println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.") - else + group -> Errors.NONE + } + else { printError(s"Delete for group '$group' topic '$topic' failed because its consumers are still active.") + group -> Errors.NON_EMPTY_GROUP + } } catch { case e: ZkNoNodeException => printError(s"Delete for group '$group' topic '$topic' failed because group does not exist.", Some(e)) + group -> Errors.forException(e) } - } + }.toMap } - private def deleteAllForTopic() { + private def deleteAllGroupsInfoForTopic(): Map[String, Errors] = { val topic = opts.options.valueOf(opts.topicOpt) Topic.validate(topic) - AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) + val deletedGroups = AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic) println(s"Deleted consumer group information for all inactive consumer groups for topic '$topic' in zookeeper.") + deletedGroups.map(_ -> Errors.NONE).toMap + } private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = { @@ -830,6 +842,27 @@ object ConsumerGroupCommand extends Logging { rows.foldRight("")(_ + "\n" + _) } + override def deleteGroups(): Map[String, Errors] = { + val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList + val result = adminClient.deleteConsumerGroups(groupsToDelete) + val successfullyDeleted = result.filter { + case (_, error) => error == Errors.NONE + }.keySet + + if (successfullyDeleted.size == result.size) + println(s"Deletion of requested consumer groups (${successfullyDeleted.mkString("'", ", ", "'")}) was successful.") + else { + printError("Deletion of some consumer groups failed:") + result.foreach { + case (group, error) if error != Errors.NONE => println(s"* Group '$group' could not be deleted due to: ${error.toString}") + case _ => // no need to print successful deletions individually + } + if (successfullyDeleted.nonEmpty) + println(s"\nThese consumer groups were deleted successfully: ${successfullyDeleted.mkString("'", ", ", "'")}") + } + + result + } } sealed trait LogOffsetResult @@ -987,10 +1020,9 @@ object ConsumerGroupCommand extends Logging { s"The new consumer is used by default if the $bootstrapServerOpt option is provided.") } - if (options.has(deleteOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " + - "there is no need to delete group metadata for the new consumer as the group is deleted when the last " + - "committed offset for that group expires.") + if (options.has(deleteOpt) && options.has(topicOpt)) + CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " + + s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.") } if (describeOptPresent) diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index e509fc516ab..f95fb892799 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -72,7 +72,9 @@ object ApiVersion { "0.11.0" -> KAFKA_0_11_0_IV2, // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 "1.0-IV0" -> KAFKA_1_0_IV0, - "1.0" -> KAFKA_1_0_IV0 + "1.0" -> KAFKA_1_0_IV0, + // Introduced DeleteGroupsRequest V0 via KIP-229 + "1.1-IV0" -> KAFKA_1_1_IV0 ) private val versionPattern = "\\.".r @@ -184,3 +186,9 @@ case object KAFKA_1_0_IV0 extends ApiVersion { val id: Int = 13 } +case object KAFKA_1_1_IV0 extends ApiVersion { + val version: String = "1.1-IV0" + val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2 + val id: Int = 14 +} + diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index ee4fc4b78e9..5ae855232eb 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -338,6 +338,52 @@ class GroupCoordinator(val brokerId: Int, } } + def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = { + if (!isActive.get) { + groupIds.map(_ -> Errors.COORDINATOR_NOT_AVAILABLE).toMap + } else { + var groupErrors: Map[String, Errors] = Map() + var eligibleGroups: Seq[GroupMetadata] = Seq() + + groupIds.foreach { groupId => + if (!validGroupId(groupId)) + groupErrors += groupId -> Errors.INVALID_GROUP_ID + else if (!isCoordinatorForGroup(groupId)) + groupErrors += groupId -> Errors.NOT_COORDINATOR + else if (isCoordinatorLoadInProgress(groupId)) + groupErrors += groupId -> Errors.COORDINATOR_LOAD_IN_PROGRESS + else { + groupManager.getGroup(groupId) match { + case None => + groupErrors += groupId -> + (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR) + case Some(group) => + group.inLock { + group.currentState match { + case Dead => + groupErrors += groupId -> + (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR) + case Empty => + group.transitionTo(Dead) + eligibleGroups :+= group + case _ => + groupErrors += groupId -> Errors.NON_EMPTY_GROUP + } + } + } + } + } + + if (eligibleGroups.nonEmpty) { + groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue) + groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap + info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}") + } + + groupErrors + } + } + def handleHeartbeat(groupId: String, memberId: String, generationId: Int, @@ -522,7 +568,7 @@ class GroupCoordinator(val brokerId: Int, } def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) { - groupManager.cleanupGroupMetadata(Some(topicPartitions)) + groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds()) } private def validateGroup(groupId: String): Option[Errors] = { diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 65996985f46..3391fc3a221 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -158,6 +158,13 @@ class GroupMetadataManager(brokerId: Int, def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty } + // return true iff group is owned and the group doesn't exist + def groupNotExists(groupId: String) = inLock(partitionLock) { + isGroupLocal(groupId) && getGroup(groupId).forall { group => + group.inLock(group.is(Dead)) + } + } + // visible for testing private[group] def isGroupOpenForProducer(producerId: Long, groupId: String) = openGroupsForProducer.get(producerId) match { case Some(groups) => @@ -706,14 +713,16 @@ class GroupMetadataManager(brokerId: Int, // visible for testing private[group] def cleanupGroupMetadata(): Unit = { - cleanupGroupMetadata(None) + cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds()) } - def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) { - val startMs = time.milliseconds() + def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]], + groups: Iterable[GroupMetadata], + startMs: Long) { var offsetsRemoved = 0 - groupMetadataCache.foreach { case (groupId, group) => + groups.foreach { group => + val groupId = group.groupId val (removedOffsets, groupIsDead, generation) = group.inLock { val removedOffsets = deletedTopicPartitions match { case Some(topicPartitions) => group.removeOffsets(topicPartitions) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 13f51646667..2dd6951f46d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -141,7 +141,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request) case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request) case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request) - case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request) + case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) + case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request) } } catch { case e: FatalExitError => throw e @@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]() val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() - if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource)) + if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource)) for (topicPartition <- fetchRequest.fetchData.asScala.keys) unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, @@ -1221,6 +1222,21 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = { + val deleteGroupsRequest = request.body[DeleteGroupsRequest] + var groups = deleteGroupsRequest.groups.asScala.toSet + + val (authorizedGroups, unauthorizedGroups) = groups.partition { group => + authorize(request.session, Delete, new Resource(Group, group)) + } + + val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++ + unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED) + + sendResponseMaybeThrottle(request, requestThrottleMs => + new DeleteGroupsResponse(requestThrottleMs, groupDeletionResult.asJava)) + } + def handleHeartbeatRequest(request: RequestChannel.Request) { val heartbeatRequest = request.body[HeartbeatRequest] diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 39c1ea35328..248d219cf31 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -78,6 +78,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter))) @@ -125,6 +126,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse], ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse], ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse], + ApiKeys.DELETE_GROUPS -> classOf[DeleteGroupsResponse], ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse], ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse], ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse], @@ -162,6 +164,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error), ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), + ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error), @@ -202,6 +205,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl, ApiKeys.HEARTBEAT -> groupReadAcl, ApiKeys.LEAVE_GROUP -> groupReadAcl, + ApiKeys.DELETE_GROUPS -> groupDeleteAcl, ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl, ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, @@ -343,6 +347,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build() + private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(Set(group).asJava).build() + private def leaderAndIsrRequest = { new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue, Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava, @@ -468,7 +474,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_RECORDS -> deleteRecordsRequest, ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, - ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest + ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, + ApiKeys.DELETE_GROUPS -> deleteGroupsRequest ) for ((key, request) <- requestKeyToRequest) { @@ -495,7 +502,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testFetchFollowerRequest() { val key = ApiKeys.FETCH val request = createFetchFollowerRequest - + removeAllAcls() val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) @@ -503,7 +510,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val readAcls = topicReadAcl.get(topicResource).get addAndVerifyAcls(readAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) - + val clusterAcls = clusterAcl.get(Resource.ClusterResource).get addAndVerifyAcls(clusterAcls, Resource.ClusterResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) @@ -960,6 +967,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumerGroupService.close() } + @Test + def testDeleteGroupApiWithDeleteGroupAcl() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) + val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) + assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE)) + } + + @Test + def testDeleteGroupApiWithNoDeleteGroupAcl() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava) + val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) + assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED)) + } + + @Test + def testDeleteGroupApiWithNoDeleteGroupAcl2() { + val result = AdminClient.createSimplePlaintext(brokerList).deleteConsumerGroups(List(group)) + assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED)) + } + @Test def testUnauthorizedDeleteTopicsWithoutDescribe() { val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala new file mode 100644 index 00000000000..cc236d5e8ba --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.admin + +import kafka.admin.ConsumerGroupCommandTest +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.Errors +import org.junit.Assert._ +import org.junit.Test + +class DeleteConsumerGroupTest extends ConsumerGroupCommandTest { + + @Test(expected = classOf[joptsimple.OptionException]) + def testDeleteWithTopicOption() { + TestUtils.createOffsetsTopic(zkClient, servers) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic") + getConsumerGroupService(cgcArgs) + fail("Expected an error due to presence of mutually exclusive options") + } + + @Test + def testDeleteCmdNonExistingGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + val missingGroup = "missing.group" + + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup) + val service = getConsumerGroupService(cgcArgs) + + val output = TestUtils.grabConsoleOutput(service.deleteGroups()) + assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group", + output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND.toString}")) + } + + @Test + def testDeleteNonExistingGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + val missingGroup = "missing.group" + + // note the group to be deleted is a different (non-existing) group + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup) + val service = getConsumerGroupService(cgcArgs) + + val result = service.deleteGroups() + assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group", + result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND)) + } + + @Test + def testDeleteCmdInvalidGroupId() { + TestUtils.createOffsetsTopic(zkClient, servers) + val invalidGroupId = "" + + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId) + val service = getConsumerGroupService(cgcArgs) + + val output = TestUtils.grabConsoleOutput(service.deleteGroups()) + assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group", + output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}")) + } + + @Test + def testDeleteInvalidGroupId() { + TestUtils.createOffsetsTopic(zkClient, servers) + val invalidGroupId = "" + + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId) + val service = getConsumerGroupService(cgcArgs) + + val result = service.deleteGroups() + assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group", + result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID)) + } + + @Test + def testDeleteCmdNonEmptyGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + // run one consumer in the group + addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + val output = TestUtils.grabConsoleOutput(service.deleteGroups()) + assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group", + output.contains(s"Group '$group' could not be deleted due to: ${Errors.NON_EMPTY_GROUP}")) + } + + @Test + def testDeleteNonEmptyGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + // run one consumer in the group + addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + val result = service.deleteGroups() + assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group", + result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NON_EMPTY_GROUP)) + } + + @Test + def testDeleteCmdEmptyGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + executor.shutdown() + + TestUtils.waitUntilTrue(() => { + service.collectGroupState().state == "Empty" + }, "The group did become empty as expected.") + + val output = TestUtils.grabConsoleOutput(service.deleteGroups()) + assertTrue(s"The consumer group could not be deleted as expected", + output.contains(s"Deletion of requested consumer groups ('$group') was successful.")) + } + + @Test + def testDeleteEmptyGroup() { + TestUtils.createOffsetsTopic(zkClient, servers) + + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + executor.shutdown() + + TestUtils.waitUntilTrue(() => { + service.collectGroupState().state == "Empty" + }, "The group did become empty as expected.") + + val result = service.deleteGroups() + assertTrue(s"The consumer group could not be deleted as expected", + result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE)) + } + + @Test + def testDeleteCmdWithMixOfSuccessAndError() { + TestUtils.createOffsetsTopic(zkClient, servers) + val missingGroup = "missing.group" + + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + executor.shutdown() + + TestUtils.waitUntilTrue(() => { + service.collectGroupState().state == "Empty" + }, "The group did become empty as expected.") + + val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup)) + val output = TestUtils.grabConsoleOutput(service2.deleteGroups()) + assertTrue(s"The consumer group deletion did not work as expected", + output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND}") && + output.contains(s"These consumer groups were deleted successfully: '$group'")) + } + + @Test + def testDeleteWithMixOfSuccessAndError() { + TestUtils.createOffsetsTopic(zkClient, servers) + val missingGroup = "missing.group" + + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + service.listGroups().contains(group) + }, "The group did not initialize as expected.") + + executor.shutdown() + + TestUtils.waitUntilTrue(() => { + service.collectGroupState().state == "Empty" + }, "The group did become empty as expected.") + + val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup)) + val result = service2.deleteGroups() + assertTrue(s"The consumer group deletion did not work as expected", + result.size == 2 && + result.keySet.contains(group) && result.get(group).contains(Errors.NONE) && + result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND)) + } + + @Test + def testDeleteCmdWithShortInitialization() { + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + val output = TestUtils.grabConsoleOutput(service.deleteGroups()) + assertTrue(s"The consumer group deletion did not work as expected", + output.contains(s"Group '$group' could not be deleted due to: ${Errors.COORDINATOR_NOT_AVAILABLE}")) + } + + @Test + def testDeleteWithShortInitialization() { + // run one consumer in the group + val executor = addConsumerGroupExecutor(numConsumers = 1) + val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + val result = service.deleteGroups() + assertTrue(s"The consumer group deletion did not work as expected", + result.size == 1 && + result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE)) + } +} diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index a62e7aa932b..2c9e81d723a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock +import kafka.cluster.Partition import kafka.zk.KafkaZkClient import org.apache.kafka.common.internals.Topic import org.junit.Assert._ @@ -1270,6 +1271,93 @@ class GroupCoordinatorTest extends JUnitSuite { assertTrue(summary.members.forall(_.assignment.isEmpty)) } + @Test + def testDeleteNonEmptyGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + + val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP)) + } + + @Test + def testDeleteGroupWithInvalidGroupId() { + val invalidGroupId = "" + val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet) + assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID)) + } + + @Test + def testDeleteGroupWithWrongCoordinator() { + val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet) + assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR)) + } + + @Test + def testDeleteEmptyGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + + EasyMock.reset(replicaManager) + val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId) + assertEquals(Errors.NONE, leaveGroupResult) + + val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + val partition = EasyMock.niceMock(classOf[Partition]) + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.replay(replicaManager, partition) + + val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) + } + + @Test + def testDeleteEmptyGroupWithStoredOffsets() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Stable.toString, describeGroupResult._2.state) + assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId) + + EasyMock.reset(replicaManager) + val leaveGroupResult = leaveGroup(groupId, assignedMemberId) + assertEquals(Errors.NONE, leaveGroupResult) + + val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + val partition = EasyMock.niceMock(classOf[Partition]) + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.replay(replicaManager, partition) + + val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) + + assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state) + } + @Test def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() { val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 62ebf297a6c..b358c4e8279 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -497,6 +497,26 @@ class GroupMetadataManagerTest { } } + @Test + def testGroupNotExists() { + // group is not owned + assertFalse(groupMetadataManager.groupNotExists(groupId)) + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + // group is owned but does not exist yet + assertTrue(groupMetadataManager.groupNotExists(groupId)) + + val group = new GroupMetadata(groupId, initialState = Empty) + groupMetadataManager.addGroup(group) + + // group is owned but not Dead + assertFalse(groupMetadataManager.groupNotExists(groupId)) + + group.transitionTo(Dead) + // group is owned and Dead + assertTrue(groupMetadataManager.groupNotExists(groupId)) + } + private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = { val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset) val commitRecords = createCommittedOffsetRecords(offsets) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index a90fa64914d..bfbae2bde03 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -312,12 +312,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000) - case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> + case ApiKeys.DESCRIBE_DELEGATION_TOKEN => new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test"))) - case ApiKeys.RENEW_DELEGATION_TOKEN=> + case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000) + case ApiKeys.DELETE_GROUPS => + new DeleteGroupsRequest.Builder(Collections.singleton("test-group")) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -416,6 +419,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs + case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } }