Browse Source

KAFKA-6275: Add DeleteGroups API (KIP-229) (#4479)

Reviewers: Manikumar Reddy O <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/4499/head
Vahid Hashemian 7 years ago committed by Jason Gustafson
parent
commit
1ed6da7cc8
  1. 31
      clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
  2. 31
      clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
  3. 5
      clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  4. 24
      clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
  5. 2
      clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
  6. 2
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  7. 117
      clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java
  8. 129
      clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java
  9. 13
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  10. 2
      core/src/main/scala/kafka/admin/AclCommand.scala
  11. 43
      core/src/main/scala/kafka/admin/AdminClient.scala
  12. 3
      core/src/main/scala/kafka/admin/AdminUtils.scala
  13. 84
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  14. 10
      core/src/main/scala/kafka/api/ApiVersion.scala
  15. 48
      core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
  16. 17
      core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
  17. 20
      core/src/main/scala/kafka/server/KafkaApis.scala
  18. 40
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  19. 251
      core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
  20. 88
      core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
  21. 20
      core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
  22. 8
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

31
clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
 * Thrown when a request names a consumer group id that the broker does not know about.
 * Maps to error code {@code GROUP_ID_NOT_FOUND} (69).
 *
 * NOTE(review): Errors.GROUP_ID_NOT_FOUND builds this exception by passing the generic
 * error *message* as the {@code groupId} argument — confirm whether the resulting
 * "The group id The group id does not exist was not found" text is intended.
 */
public class GroupIdNotFoundException extends ApiException {
    private final String groupId;

    public GroupIdNotFoundException(String groupId) {
        super(String.format("The group id %s was not found", groupId));
        this.groupId = groupId;
    }

    /**
     * @return the group id that could not be found
     */
    public String groupId() {
        return groupId;
    }
}

31
clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
 * Thrown when an operation (e.g. group deletion) requires the consumer group to be empty
 * but it still has members. Maps to error code {@code NON_EMPTY_GROUP} (68).
 *
 * NOTE(review): Errors.NON_EMPTY_GROUP builds this exception by passing the generic
 * error *message* as the {@code groupId} argument — confirm the resulting text is intended.
 */
public class GroupNotEmptyException extends ApiException {
    private final String groupId;

    public GroupNotEmptyException(String groupId) {
        super(String.format("The group %s is not empty", groupId));
        this.groupId = groupId;
    }

    /**
     * @return the group id that was not empty
     */
    public String groupId() {
        return groupId;
    }
}

5
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@ -43,6 +43,8 @@ import org.apache.kafka.common.requests.CreateTopicsRequest; @@ -43,6 +43,8 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
@ -183,7 +185,8 @@ public enum ApiKeys { @@ -183,7 +185,8 @@ public enum ApiKeys {
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions());
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions());
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;

24
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

@ -17,8 +17,6 @@ @@ -17,8 +17,6 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.ConcurrentTransactionsException;
@ -26,12 +24,15 @@ import org.apache.kafka.common.errors.ControllerMovedException; @@ -26,12 +24,15 @@ import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
import org.apache.kafka.common.errors.DelegationTokenDisabledException;
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@ -41,6 +42,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException; @@ -41,6 +42,7 @@ import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@ -52,6 +54,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException; @@ -52,6 +54,7 @@ import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
@ -70,10 +73,9 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; @@ -70,10 +73,9 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.DelegationTokenAuthorizationException;
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@ -594,6 +596,18 @@ public enum Errors { @@ -594,6 +596,18 @@ public enum Errors {
public ApiException build(String message) {
return new InvalidPrincipalTypeException(message);
}
}),
NON_EMPTY_GROUP(68, "The group is not empty", new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new GroupNotEmptyException(message);
}
}),
GROUP_ID_NOT_FOUND(69, "The group id does not exist", new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new GroupIdNotFoundException(message);
}
});
private interface ApiExceptionBuilder {

2
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@ -222,6 +222,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { @@ -222,6 +222,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
return new ExpireDelegationTokenRequest(struct, apiVersion);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenRequest(struct, apiVersion);
case DELETE_GROUPS:
return new DeleteGroupsRequest(struct, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

2
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -154,6 +154,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { @@ -154,6 +154,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new ExpireDelegationTokenResponse(struct);
case DESCRIBE_DELEGATION_TOKEN:
return new DescribeDelegationTokenResponse(struct);
case DELETE_GROUPS:
return new DeleteGroupsResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

117
clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsRequest.java

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.protocol.types.Type.STRING;
/**
 * Request to delete one or more consumer groups (KIP-229).
 * Wire format v0: a single array of group id strings.
 */
public class DeleteGroupsRequest extends AbstractRequest {
    private static final String GROUPS_KEY_NAME = "groups";

    /* DeleteGroups api */
    private static final Schema DELETE_GROUPS_REQUEST_V0 = new Schema(
            new Field(GROUPS_KEY_NAME, new ArrayOf(STRING), "An array of groups to be deleted."));

    // All known schema versions for this request, indexed by version number.
    public static Schema[] schemaVersions() {
        return new Schema[]{DELETE_GROUPS_REQUEST_V0};
    }

    // Group ids to delete; duplicates are collapsed since this is a Set.
    private final Set<String> groups;

    /** Builder used by clients to construct the request at the negotiated version. */
    public static class Builder extends AbstractRequest.Builder<DeleteGroupsRequest> {
        private final Set<String> groups;

        public Builder(Set<String> groups) {
            super(ApiKeys.DELETE_GROUPS);
            this.groups = groups;
        }

        @Override
        public DeleteGroupsRequest build(short version) {
            return new DeleteGroupsRequest(groups, version);
        }

        @Override
        public String toString() {
            StringBuilder bld = new StringBuilder();
            bld.append("(type=DeleteGroupsRequest").
                append(", groups=(").append(Utils.join(groups, ", ")).append(")").
                append(")");
            return bld.toString();
        }
    }

    private DeleteGroupsRequest(Set<String> groups, short version) {
        super(version);
        this.groups = groups;
    }

    // Deserialization constructor: reads the group array out of the parsed Struct.
    public DeleteGroupsRequest(Struct struct, short version) {
        super(version);
        Object[] groupsArray = struct.getArray(GROUPS_KEY_NAME);
        Set<String> groups = new HashSet<>(groupsArray.length);
        for (Object group : groupsArray)
            groups.add((String) group);
        this.groups = groups;
    }

    @Override
    protected Struct toStruct() {
        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.requestSchema(version()));
        struct.set(GROUPS_KEY_NAME, groups.toArray());
        return struct;
    }

    /**
     * Builds an error response that reports the same error for every requested group,
     * used when the whole request fails (e.g. disconnect or authorization failure).
     */
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        Errors error = Errors.forException(e);
        Map<String, Errors> groupErrors = new HashMap<>(groups.size());
        for (String group : groups)
            groupErrors.put(group, error);

        switch (version()) {
            case 0:
                return new DeleteGroupsResponse(throttleTimeMs, groupErrors);
            default:
                // Guard against a version being added to the schema list without a case here.
                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                    version(), ApiKeys.DELETE_GROUPS.name, ApiKeys.DELETE_GROUPS.latestVersion()));
        }
    }

    /** @return the set of group ids this request asks to delete */
    public Set<String> groups() {
        return groups;
    }

    public static DeleteGroupsRequest parse(ByteBuffer buffer, short version) {
        return new DeleteGroupsRequest(ApiKeys.DELETE_GROUPS.parseRequest(version, buffer), version);
    }
}

129
clients/src/main/java/org/apache/kafka/common/requests/DeleteGroupsResponse.java

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
/**
 * Response to a {@link DeleteGroupsRequest} (KIP-229).
 * Wire format v0: throttle time plus an array of (group_id, error_code) pairs,
 * one entry per requested group.
 */
public class DeleteGroupsResponse extends AbstractResponse {
    private static final String GROUP_ERROR_CODES_KEY_NAME = "group_error_codes";

    // Per-group result entry: group id paired with its error code.
    private static final Schema GROUP_ERROR_CODE = new Schema(
            GROUP_ID,
            ERROR_CODE);

    private static final Schema DELETE_GROUPS_RESPONSE_V0 = new Schema(
            THROTTLE_TIME_MS,
            new Field(GROUP_ERROR_CODES_KEY_NAME, new ArrayOf(GROUP_ERROR_CODE), "An array of per group error codes."));

    // All known schema versions for this response, indexed by version number.
    public static Schema[] schemaVersions() {
        return new Schema[]{DELETE_GROUPS_RESPONSE_V0};
    }

    /**
     * Possible error codes:
     *
     * COORDINATOR_LOAD_IN_PROGRESS (14)
     * COORDINATOR_NOT_AVAILABLE(15)
     * NOT_COORDINATOR (16)
     * INVALID_GROUP_ID(24)
     * GROUP_AUTHORIZATION_FAILED(30)
     * NON_EMPTY_GROUP(68)
     * GROUP_ID_NOT_FOUND(69)
     */
    private final Map<String, Errors> errors;
    private final int throttleTimeMs;

    public DeleteGroupsResponse(Map<String, Errors> errors) {
        this(DEFAULT_THROTTLE_TIME, errors);
    }

    public DeleteGroupsResponse(int throttleTimeMs, Map<String, Errors> errors) {
        this.throttleTimeMs = throttleTimeMs;
        this.errors = errors;
    }

    // Deserialization constructor: rebuilds the per-group error map from the parsed Struct.
    public DeleteGroupsResponse(Struct struct) {
        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
        Object[] groupErrorCodesStructs = struct.getArray(GROUP_ERROR_CODES_KEY_NAME);
        Map<String, Errors> errors = new HashMap<>();
        for (Object groupErrorCodeStructObj : groupErrorCodesStructs) {
            Struct groupErrorCodeStruct = (Struct) groupErrorCodeStructObj;
            String group = groupErrorCodeStruct.get(GROUP_ID);
            Errors error = Errors.forCode(groupErrorCodeStruct.get(ERROR_CODE));
            errors.put(group, error);
        }
        this.errors = errors;
    }

    @Override
    protected Struct toStruct(short version) {
        Struct struct = new Struct(ApiKeys.DELETE_GROUPS.responseSchema(version));
        // setIfExists: throttle time field may be absent in future schema variants.
        struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
        List<Struct> groupErrorCodeStructs = new ArrayList<>(errors.size());
        for (Map.Entry<String, Errors> groupError : errors.entrySet()) {
            Struct groupErrorCodeStruct = struct.instance(GROUP_ERROR_CODES_KEY_NAME);
            groupErrorCodeStruct.set(GROUP_ID, groupError.getKey());
            groupErrorCodeStruct.set(ERROR_CODE, groupError.getValue().code());
            groupErrorCodeStructs.add(groupErrorCodeStruct);
        }

        struct.set(GROUP_ERROR_CODES_KEY_NAME, groupErrorCodeStructs.toArray());
        return struct;
    }

    public int throttleTimeMs() {
        return throttleTimeMs;
    }

    /** @return per-group results; a value of Errors.NONE means the group was deleted */
    public Map<String, Errors> errors() {
        return errors;
    }

    /** @return true iff the group is present in the response with a non-NONE error */
    public boolean hasError(String group) {
        return errors.containsKey(group) && errors.get(group) != Errors.NONE;
    }

    // Returns null for groups that were not part of the response.
    public Errors get(String group) {
        return errors.get(group);
    }

    @Override
    public Map<Errors, Integer> errorCounts() {
        return errorCounts(errors);
    }

    public static DeleteGroupsResponse parse(ByteBuffer buffer, short version) {
        return new DeleteGroupsResponse(ApiKeys.DELETE_GROUPS.responseSchema(version).read(buffer));
    }
}

13
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -112,6 +112,9 @@ public class RequestResponseTest { @@ -112,6 +112,9 @@ public class RequestResponseTest {
checkRequest(createDescribeGroupRequest());
checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException());
checkResponse(createDescribeGroupResponse(), 0);
checkRequest(createDeleteGroupsRequest());
checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException());
checkResponse(createDeleteGroupsResponse(), 0);
checkRequest(createListOffsetRequest(1));
checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
checkResponse(createListOffsetResponse(1), 1);
@ -641,6 +644,16 @@ public class RequestResponseTest { @@ -641,6 +644,16 @@ public class RequestResponseTest {
return new LeaveGroupResponse(Errors.NONE);
}
// Builds a single-group DeleteGroupsRequest for serialization round-trip tests.
private DeleteGroupsRequest createDeleteGroupsRequest() {
    DeleteGroupsRequest.Builder builder = new DeleteGroupsRequest.Builder(Collections.singleton("test-group"));
    return builder.build();
}
// Builds a single-group, error-free DeleteGroupsResponse for serialization round-trip tests.
private DeleteGroupsResponse createDeleteGroupsResponse() {
    Map<String, Errors> groupErrors = new HashMap<>();
    groupErrors.put("test-group", Errors.NONE);
    return new DeleteGroupsResponse(groupErrors);
}
@SuppressWarnings("deprecation")
private ListOffsetRequest createListOffsetRequest(int version) {
if (version == 0) {

2
core/src/main/scala/kafka/admin/AclCommand.scala

@ -32,7 +32,7 @@ object AclCommand extends Logging { @@ -32,7 +32,7 @@ object AclCommand extends Logging {
val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Group -> Set(Read, Describe, All),
Group -> Set(Read, Describe, Delete, All),
Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
TransactionalId -> Set(Describe, Write, All),
DelegationToken -> Set(Describe, All)

43
core/src/main/scala/kafka/admin/AdminClient.scala

@ -369,6 +369,49 @@ class AdminClient(val time: Time, @@ -369,6 +369,49 @@ class AdminClient(val time: Time,
(response.error, response.tokens().asScala.toList)
}
/**
 * Deletes the given consumer groups, routing each group to its coordinator broker and
 * batching all groups that share a coordinator into a single DeleteGroupsRequest.
 *
 * @param groups group ids to delete (duplicates are collapsed per coordinator batch)
 * @return a map from every requested group id to its resulting error code
 *         (Errors.NONE on success)
 */
def deleteConsumerGroups(groups: List[String]): Map[String, Errors] = {
  // Resolve a group's coordinator, translating lookup failures into an error code.
  def coordinatorLookup(group: String): Either[Node, Errors] = {
    try Left(findCoordinator(group))
    catch {
      // A lookup timeout is reported as the coordinator being unavailable.
      case _: TimeoutException => Right(Errors.COORDINATOR_NOT_AVAILABLE)
      // NOTE(review): this deliberately mirrors the original catch-all, but catching
      // Throwable also swallows fatal errors — consider narrowing to NonFatal.
      case e: Throwable => Right(Errors.forException(e))
    }
  }

  var errors: Map[String, Errors] = Map.empty
  var groupsPerCoordinator: Map[Node, List[String]] = Map.empty

  // Partition the requested groups by coordinator; record lookup failures immediately.
  groups.foreach { group =>
    coordinatorLookup(group) match {
      case Right(error) =>
        errors += group -> error
      case Left(coordinator) =>
        groupsPerCoordinator += coordinator -> (group :: groupsPerCoordinator.getOrElse(coordinator, Nil))
    }
  }

  // One DeleteGroupsRequest per coordinator, covering all of its groups at once.
  groupsPerCoordinator.foreach { case (coordinator, coordinatorGroups) =>
    val responseBody = send(coordinator, ApiKeys.DELETE_GROUPS, new DeleteGroupsRequest.Builder(coordinatorGroups.toSet.asJava))
    val response = responseBody.asInstanceOf[DeleteGroupsResponse]
    coordinatorGroups.foreach { group =>
      errors += group -> (if (response.hasError(group)) response.errors.get(group) else Errors.NONE)
    }
  }

  errors
}
def close() {
running = false

3
core/src/main/scala/kafka/admin/AdminUtils.scala

@ -429,9 +429,10 @@ object AdminUtils extends Logging with AdminUtilities { @@ -429,9 +429,10 @@ object AdminUtils extends Logging with AdminUtilities {
* @param topic Topic of the consumer group information we wish to delete
*/
@deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String): Set[String] = {
val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
groups
}
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =

84
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -75,12 +75,8 @@ object ConsumerGroupCommand extends Logging { @@ -75,12 +75,8 @@ object ConsumerGroupCommand extends Logging {
consumerGroupService.listGroups().foreach(println(_))
else if (opts.options.has(opts.describeOpt))
consumerGroupService.describeGroup()
else if (opts.options.has(opts.deleteOpt)) {
consumerGroupService match {
case service: ZkConsumerGroupService => service.deleteGroups()
case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
}
}
else if (opts.options.has(opts.deleteOpt))
consumerGroupService.deleteGroups()
else if (opts.options.has(opts.resetOffsetsOpt)) {
val offsetsToReset = consumerGroupService.resetOffsets()
if (opts.options.has(opts.exportOpt)) {
@ -344,6 +340,8 @@ object ConsumerGroupCommand extends Logging { @@ -344,6 +340,8 @@ object ConsumerGroupCommand extends Logging {
def resetOffsets(): Map[TopicPartition, OffsetAndMetadata] = throw new UnsupportedOperationException
def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
def deleteGroups(): Map[String, Errors]
}
@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
@ -362,13 +360,15 @@ object ConsumerGroupCommand extends Logging { @@ -362,13 +360,15 @@ object ConsumerGroupCommand extends Logging {
zkUtils.getConsumerGroups().toList
}
def deleteGroups() {
def deleteGroups(): Map[String, Errors] = {
if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt))
deleteForTopic()
deleteGroupsInfoForTopic()
else if (opts.options.has(opts.groupOpt))
deleteForGroup()
deleteGroupsInfo()
else if (opts.options.has(opts.topicOpt))
deleteAllForTopic()
deleteAllGroupsInfoForTopic()
Map()
}
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
@ -476,45 +476,57 @@ object ConsumerGroupCommand extends Logging { @@ -476,45 +476,57 @@ object ConsumerGroupCommand extends Logging {
}.toMap
}
private def deleteForGroup() {
private def deleteGroupsInfo(): Map[String, Errors] = {
val groups = opts.options.valuesOf(opts.groupOpt)
groups.asScala.foreach { group =>
groups.asScala.map { group =>
try {
if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group))
if (AdminUtils.deleteConsumerGroupInZK(zkUtils, group)) {
println(s"Deleted all consumer group information for group '$group' in zookeeper.")
else
group -> Errors.NONE
}
else {
printError(s"Delete for group '$group' failed because its consumers are still active.")
group -> Errors.NON_EMPTY_GROUP
}
}
catch {
case e: ZkNoNodeException =>
printError(s"Delete for group '$group' failed because group does not exist.", Some(e))
group -> Errors.forException(e)
}
}
}.toMap
}
private def deleteForTopic() {
private def deleteGroupsInfoForTopic(): Map[String, Errors] = {
val groups = opts.options.valuesOf(opts.groupOpt)
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
groups.asScala.foreach { group =>
groups.asScala.map { group =>
try {
if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))
if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic)) {
println(s"Deleted consumer group information for group '$group' topic '$topic' in zookeeper.")
else
group -> Errors.NONE
}
else {
printError(s"Delete for group '$group' topic '$topic' failed because its consumers are still active.")
group -> Errors.NON_EMPTY_GROUP
}
}
catch {
case e: ZkNoNodeException =>
printError(s"Delete for group '$group' topic '$topic' failed because group does not exist.", Some(e))
group -> Errors.forException(e)
}
}
}.toMap
}
private def deleteAllForTopic() {
private def deleteAllGroupsInfoForTopic(): Map[String, Errors] = {
val topic = opts.options.valueOf(opts.topicOpt)
Topic.validate(topic)
AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
val deletedGroups = AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topic)
println(s"Deleted consumer group information for all inactive consumer groups for topic '$topic' in zookeeper.")
deletedGroups.map(_ -> Errors.NONE).toMap
}
private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
@ -830,6 +842,27 @@ object ConsumerGroupCommand extends Logging { @@ -830,6 +842,27 @@ object ConsumerGroupCommand extends Logging {
rows.foldRight("")(_ + "\n" + _)
}
/**
 * Deletes the groups named by --group via the broker AdminClient and prints a
 * per-group success/failure summary to stdout/stderr.
 *
 * @return the per-group result map returned by AdminClient.deleteConsumerGroups
 */
override def deleteGroups(): Map[String, Errors] = {
  val groupsToDelete = opts.options.valuesOf(opts.groupOpt).asScala.toList
  val result = adminClient.deleteConsumerGroups(groupsToDelete)

  // Groups whose deletion succeeded (error code NONE).
  val successfullyDeleted = result.filter {
    case (_, error) => error == Errors.NONE
  }.keySet

  if (successfullyDeleted.size == result.size)
    println(s"Deletion of requested consumer groups (${successfullyDeleted.mkString("'", ", ", "'")}) was successful.")
  else {
    printError("Deletion of some consumer groups failed:")
    result.foreach {
      case (group, error) if error != Errors.NONE => println(s"* Group '$group' could not be deleted due to: ${error.toString}")
      case _ => // no need to print successful deletions individually
    }
    if (successfullyDeleted.nonEmpty)
      println(s"\nThese consumer groups were deleted successfully: ${successfullyDeleted.mkString("'", ", ", "'")}")
  }

  result
}
}
sealed trait LogOffsetResult
@ -987,10 +1020,9 @@ object ConsumerGroupCommand extends Logging { @@ -987,10 +1020,9 @@ object ConsumerGroupCommand extends Logging {
s"The new consumer is used by default if the $bootstrapServerOpt option is provided.")
}
if (options.has(deleteOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " +
"there is no need to delete group metadata for the new consumer as the group is deleted when the last " +
"committed offset for that group expires.")
if (options.has(deleteOpt) && options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " +
s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.")
}
if (describeOptPresent)

10
core/src/main/scala/kafka/api/ApiVersion.scala

@ -72,7 +72,9 @@ object ApiVersion { @@ -72,7 +72,9 @@ object ApiVersion {
"0.11.0" -> KAFKA_0_11_0_IV2,
// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
"1.0-IV0" -> KAFKA_1_0_IV0,
"1.0" -> KAFKA_1_0_IV0
"1.0" -> KAFKA_1_0_IV0,
// Introduced DeleteGroupsRequest V0 via KIP-229
"1.1-IV0" -> KAFKA_1_1_IV0
)
private val versionPattern = "\\.".r
@ -184,3 +186,9 @@ case object KAFKA_1_0_IV0 extends ApiVersion { @@ -184,3 +186,9 @@ case object KAFKA_1_0_IV0 extends ApiVersion {
val id: Int = 13
}
case object KAFKA_1_1_IV0 extends ApiVersion {
val version: String = "1.1-IV0"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
val id: Int = 14
}

48
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

@ -338,6 +338,52 @@ class GroupCoordinator(val brokerId: Int, @@ -338,6 +338,52 @@ class GroupCoordinator(val brokerId: Int,
}
}
/**
 * Handles a DeleteGroups request for a batch of group ids (KIP-229).
 *
 * Per group: validates the id, checks coordinator ownership and load state, and only
 * deletes groups that are currently Empty (no members). Empty groups are transitioned
 * to Dead under the group lock and then removed in one cleanupGroupMetadata pass.
 *
 * @param groupIds group ids requested for deletion
 * @return a map from each group id to its outcome (Errors.NONE on successful deletion)
 */
def handleDeleteGroups(groupIds: Set[String]): Map[String, Errors] = {
  if (!isActive.get) {
    // Coordinator is shut down (or not started): fail every group uniformly.
    groupIds.map(_ -> Errors.COORDINATOR_NOT_AVAILABLE).toMap
  } else {
    var groupErrors: Map[String, Errors] = Map()
    // Groups found Empty and transitioned to Dead; deleted together below.
    var eligibleGroups: Seq[GroupMetadata] = Seq()

    groupIds.foreach { groupId =>
      if (!validGroupId(groupId))
        groupErrors += groupId -> Errors.INVALID_GROUP_ID
      else if (!isCoordinatorForGroup(groupId))
        groupErrors += groupId -> Errors.NOT_COORDINATOR
      else if (isCoordinatorLoadInProgress(groupId))
        groupErrors += groupId -> Errors.COORDINATOR_LOAD_IN_PROGRESS
      else {
        groupManager.getGroup(groupId) match {
          case None =>
            // Unknown group: distinguish "truly absent" from "partition moved away".
            groupErrors += groupId ->
              (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
          case Some(group) =>
            // State checks and the Dead transition must happen under the group lock.
            group.inLock {
              group.currentState match {
                case Dead =>
                  groupErrors += groupId ->
                    (if (groupManager.groupNotExists(groupId)) Errors.GROUP_ID_NOT_FOUND else Errors.NOT_COORDINATOR)
                case Empty =>
                  // Mark Dead so no members can rejoin while deletion is in flight.
                  group.transitionTo(Dead)
                  eligibleGroups :+= group
                case _ =>
                  // Group still has members; deletion is refused.
                  groupErrors += groupId -> Errors.NON_EMPTY_GROUP
              }
            }
        }
      }
    }

    if (eligibleGroups.nonEmpty) {
      // Long.MaxValue retention cutoff forces removal regardless of offset retention.
      groupManager.cleanupGroupMetadata(None, eligibleGroups, Long.MaxValue)
      groupErrors ++= eligibleGroups.map(_.groupId -> Errors.NONE).toMap
      info(s"The following groups were deleted: ${eligibleGroups.map(_.groupId).mkString(", ")}")
    }
    groupErrors
  }
}
def handleHeartbeat(groupId: String,
memberId: String,
generationId: Int,
@ -522,7 +568,7 @@ class GroupCoordinator(val brokerId: Int, @@ -522,7 +568,7 @@ class GroupCoordinator(val brokerId: Int,
}
def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
groupManager.cleanupGroupMetadata(Some(topicPartitions))
groupManager.cleanupGroupMetadata(Some(topicPartitions), groupManager.currentGroups, time.milliseconds())
}
private def validateGroup(groupId: String): Option[Errors] = {

17
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

@ -158,6 +158,13 @@ class GroupMetadataManager(brokerId: Int, @@ -158,6 +158,13 @@ class GroupMetadataManager(brokerId: Int,
def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
// Returns true iff this broker owns the group's offsets partition and the group
// is either absent from the cache or already transitioned to Dead — i.e. the
// group does not exist from this coordinator's point of view.
def groupNotExists(groupId: String) = inLock(partitionLock) {
  if (!isGroupLocal(groupId))
    false
  else
    getGroup(groupId) match {
      case None => true
      case Some(group) => group.inLock(group.is(Dead))
    }
}
// visible for testing
private[group] def isGroupOpenForProducer(producerId: Long, groupId: String) = openGroupsForProducer.get(producerId) match {
case Some(groups) =>
@ -706,14 +713,16 @@ class GroupMetadataManager(brokerId: Int, @@ -706,14 +713,16 @@ class GroupMetadataManager(brokerId: Int,
// visible for testing
private[group] def cleanupGroupMetadata(): Unit = {
cleanupGroupMetadata(None)
cleanupGroupMetadata(None, groupMetadataCache.values, time.milliseconds())
}
def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
val startMs = time.milliseconds()
def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]],
groups: Iterable[GroupMetadata],
startMs: Long) {
var offsetsRemoved = 0
groupMetadataCache.foreach { case (groupId, group) =>
groups.foreach { group =>
val groupId = group.groupId
val (removedOffsets, groupIsDead, generation) = group.inLock {
val removedOffsets = deletedTopicPartitions match {
case Some(topicPartitions) => group.removeOffsets(topicPartitions)

20
core/src/main/scala/kafka/server/KafkaApis.scala

@ -141,7 +141,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -141,7 +141,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> handleDescribeTokensRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
} catch {
case e: FatalExitError => throw e
@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -488,7 +489,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]()
val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
if (fetchRequest.isFromFollower() && !authorize(request.session, ClusterAction, Resource.ClusterResource))
for (topicPartition <- fetchRequest.fetchData.asScala.keys)
unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.CLUSTER_AUTHORIZATION_FAILED,
FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
@ -1221,6 +1222,21 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1221,6 +1222,21 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
/**
 * Handle a DeleteGroupsRequest (KIP-229): partition the requested group ids by
 * Delete ACL on the group resource, forward only the authorized ids to the
 * group coordinator, and answer unauthorized ids with GROUP_AUTHORIZATION_FAILED.
 */
def handleDeleteGroupsRequest(request: RequestChannel.Request): Unit = {
  val deleteGroupsRequest = request.body[DeleteGroupsRequest]
  // val, not var: the set is never reassigned.
  val groups = deleteGroupsRequest.groups.asScala.toSet

  val (authorizedGroups, unauthorizedGroups) = groups.partition { group =>
    authorize(request.session, Delete, new Resource(Group, group))
  }

  // Coordinator results for authorized groups, merged with auth failures so
  // every requested group id appears exactly once in the response.
  val groupDeletionResult = groupCoordinator.handleDeleteGroups(authorizedGroups) ++
    unauthorizedGroups.map(_ -> Errors.GROUP_AUTHORIZATION_FAILED)

  sendResponseMaybeThrottle(request, requestThrottleMs =>
    new DeleteGroupsResponse(requestThrottleMs, groupDeletionResult.asJava))
}
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.body[HeartbeatRequest]

40
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -78,6 +78,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -78,6 +78,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
val groupDeleteAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
@ -125,6 +126,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -125,6 +126,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse],
ApiKeys.HEARTBEAT -> classOf[HeartbeatResponse],
ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
ApiKeys.DELETE_GROUPS -> classOf[DeleteGroupsResponse],
ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse],
@ -162,6 +164,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -162,6 +164,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
ApiKeys.DELETE_GROUPS -> ((resp: DeleteGroupsResponse) => resp.get(group)),
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
@ -202,6 +205,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -202,6 +205,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl,
ApiKeys.HEARTBEAT -> groupReadAcl,
ApiKeys.LEAVE_GROUP -> groupReadAcl,
ApiKeys.DELETE_GROUPS -> groupDeleteAcl,
ApiKeys.LEADER_AND_ISR -> clusterAcl,
ApiKeys.STOP_REPLICA -> clusterAcl,
ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
@ -343,6 +347,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -343,6 +347,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
private def deleteGroupsRequest = new DeleteGroupsRequest.Builder(Set(group).asJava).build()
private def leaderAndIsrRequest = {
new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
@ -468,7 +474,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -468,7 +474,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.DELETE_RECORDS -> deleteRecordsRequest,
ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest,
ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest,
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest,
ApiKeys.DELETE_GROUPS -> deleteGroupsRequest
)
for ((key, request) <- requestKeyToRequest) {
@ -495,7 +502,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -495,7 +502,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def testFetchFollowerRequest() {
val key = ApiKeys.FETCH
val request = createFetchFollowerRequest
removeAllAcls()
val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
@ -503,7 +510,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -503,7 +510,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val readAcls = topicReadAcl.get(topicResource).get
addAndVerifyAcls(readAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
val clusterAcls = clusterAcl.get(Resource.ClusterResource).get
addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
@ -960,6 +967,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -960,6 +967,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerGroupService.close()
}
@Test
def testDeleteGroupApiWithDeleteGroupAcl() {
  // Read ACLs let the consumer commit offsets; the Delete ACL on the group
  // authorizes the DeleteGroups API itself.
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), groupResource)
  this.consumers.head.assign(List(tp).asJava)
  this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
  // Close the admin client so its network resources don't leak across tests.
  val adminClient = AdminClient.createSimplePlaintext(brokerList)
  try {
    val result = adminClient.deleteConsumerGroups(List(group))
    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
  } finally adminClient.close()
}
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl() {
  // Only Read ACLs are granted, so the deletion itself must be rejected.
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
  addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
  this.consumers.head.assign(List(tp).asJava)
  this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5, "")).asJava)
  // Close the admin client so its network resources don't leak across tests.
  val adminClient = AdminClient.createSimplePlaintext(brokerList)
  try {
    val result = adminClient.deleteConsumerGroups(List(group))
    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
  } finally adminClient.close()
}
@Test
def testDeleteGroupApiWithNoDeleteGroupAcl2() {
  // No ACLs at all: the request must fail authorization before reaching the coordinator.
  // Close the admin client so its network resources don't leak across tests.
  val adminClient = AdminClient.createSimplePlaintext(brokerList)
  try {
    val result = adminClient.deleteConsumerGroups(List(group))
    assert(result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.GROUP_AUTHORIZATION_FAILED))
  } finally adminClient.close()
}
@Test
def testUnauthorizedDeleteTopicsWithoutDescribe() {
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)

251
core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala

@ -0,0 +1,251 @@ @@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.admin
import kafka.admin.ConsumerGroupCommandTest
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.Errors
import org.junit.Assert._
import org.junit.Test
/**
 * Integration tests for `kafka-consumer-groups --delete` on the new
 * (bootstrap-server based) path introduced by KIP-229. Each scenario is tested
 * twice: once through the console output ("Cmd" variants) and once through the
 * returned result map.
 */
class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {

  @Test(expected = classOf[joptsimple.OptionException])
  def testDeleteWithTopicOption() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // --topic is only valid with --zookeeper when deleting; expect a parse error.
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
    getConsumerGroupService(cgcArgs)
    fail("Expected an error due to presence of mutually exclusive options")
  }

  @Test
  def testDeleteCmdNonExistingGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    val missingGroup = "missing.group"
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
    val service = getConsumerGroupService(cgcArgs)
    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND.toString}"))
  }

  @Test
  def testDeleteNonExistingGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    val missingGroup = "missing.group"
    // note the group to be deleted is a different (non-existing) group
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", missingGroup)
    val service = getConsumerGroupService(cgcArgs)
    val result = service.deleteGroups()
    assertTrue(s"The expected error (${Errors.GROUP_ID_NOT_FOUND}) was not detected while deleting consumer group",
      result.size == 1 && result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
  }

  @Test
  def testDeleteCmdInvalidGroupId() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // The empty string is rejected by the coordinator's group id validation.
    val invalidGroupId = ""
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
    val service = getConsumerGroupService(cgcArgs)
    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
  }

  @Test
  def testDeleteInvalidGroupId() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    val invalidGroupId = ""
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
    val service = getConsumerGroupService(cgcArgs)
    val result = service.deleteGroups()
    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
  }

  @Test
  def testDeleteCmdNonEmptyGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // run one consumer in the group: an active member makes the group non-deletable
    addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
      output.contains(s"Group '$group' could not be deleted due to: ${Errors.NON_EMPTY_GROUP}"))
  }

  @Test
  def testDeleteNonEmptyGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // run one consumer in the group
    addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    val result = service.deleteGroups()
    assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group",
      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NON_EMPTY_GROUP))
  }

  @Test
  def testDeleteCmdEmptyGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // run one consumer in the group, then shut it down so the group becomes Empty
    val executor = addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    executor.shutdown()
    TestUtils.waitUntilTrue(() => {
      service.collectGroupState().state == "Empty"
    }, "The group did not become empty as expected.")
    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
    assertTrue("The consumer group could not be deleted as expected",
      output.contains(s"Deletion of requested consumer groups ('$group') was successful."))
  }

  @Test
  def testDeleteEmptyGroup() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    // run one consumer in the group, then shut it down so the group becomes Empty
    val executor = addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    executor.shutdown()
    TestUtils.waitUntilTrue(() => {
      service.collectGroupState().state == "Empty"
    }, "The group did not become empty as expected.")
    val result = service.deleteGroups()
    assertTrue("The consumer group could not be deleted as expected",
      result.size == 1 && result.keySet.contains(group) && result.get(group).contains(Errors.NONE))
  }

  @Test
  def testDeleteCmdWithMixOfSuccessAndError() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    val missingGroup = "missing.group"
    // run one consumer in the group, then shut it down so the group becomes Empty
    val executor = addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    executor.shutdown()
    TestUtils.waitUntilTrue(() => {
      service.collectGroupState().state == "Empty"
    }, "The group did not become empty as expected.")
    // delete one existing (empty) group and one missing group in the same request
    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
    val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
    assertTrue("The consumer group deletion did not work as expected",
      output.contains(s"Group '$missingGroup' could not be deleted due to: ${Errors.GROUP_ID_NOT_FOUND}") &&
      output.contains(s"These consumer groups were deleted successfully: '$group'"))
  }

  @Test
  def testDeleteWithMixOfSuccessAndError() {
    TestUtils.createOffsetsTopic(zkClient, servers)
    val missingGroup = "missing.group"
    // run one consumer in the group, then shut it down so the group becomes Empty
    val executor = addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    TestUtils.waitUntilTrue(() => {
      service.listGroups().contains(group)
    }, "The group did not initialize as expected.")
    executor.shutdown()
    TestUtils.waitUntilTrue(() => {
      service.collectGroupState().state == "Empty"
    }, "The group did not become empty as expected.")
    // delete one existing (empty) group and one missing group in the same request
    val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
    val result = service2.deleteGroups()
    assertTrue("The consumer group deletion did not work as expected",
      result.size == 2 &&
      result.keySet.contains(group) && result.get(group).contains(Errors.NONE) &&
      result.keySet.contains(missingGroup) && result.get(missingGroup).contains(Errors.GROUP_ID_NOT_FOUND))
  }

  @Test
  def testDeleteCmdWithShortInitialization() {
    // The offsets topic is deliberately not created, so the coordinator is unavailable.
    // run one consumer in the group
    addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
    assertTrue("The consumer group deletion did not work as expected",
      output.contains(s"Group '$group' could not be deleted due to: ${Errors.COORDINATOR_NOT_AVAILABLE}"))
  }

  @Test
  def testDeleteWithShortInitialization() {
    // The offsets topic is deliberately not created, so the coordinator is unavailable.
    // run one consumer in the group
    addConsumerGroupExecutor(numConsumers = 1)
    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group)
    val service = getConsumerGroupService(cgcArgs)
    val result = service.deleteGroups()
    assertTrue("The consumer group deletion did not work as expected",
      result.size == 1 &&
      result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
  }
}

88
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala

@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer} @@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.Partition
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
import org.junit.Assert._
@ -1270,6 +1271,93 @@ class GroupCoordinatorTest extends JUnitSuite { @@ -1270,6 +1271,93 @@ class GroupCoordinatorTest extends JUnitSuite {
assertTrue(summary.members.forall(_.assignment.isEmpty))
}
@Test
def testDeleteNonEmptyGroup() {
  // A group with an active member must be rejected with NON_EMPTY_GROUP.
  val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
  joinGroup(groupId, memberId, protocolType, protocols) // result unused; joining is the point
  // Set(...) is already a Set — no .toSet needed.
  val result = groupCoordinator.handleDeleteGroups(Set(groupId))
  assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
}
@Test
def testDeleteGroupWithInvalidGroupId() {
  // The empty string fails group id validation.
  val invalidGroupId = ""
  // Set(...) is already a Set — no .toSet needed.
  val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
  assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
}
@Test
def testDeleteGroupWithWrongCoordinator() {
  // otherGroupId hashes to a partition this coordinator does not own.
  // Set(...) is already a Set — no .toSet needed.
  val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
  assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
}
@Test
def testDeleteEmptyGroup() {
  // Join and immediately leave so the group transitions to Empty.
  val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
  val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)

  EasyMock.reset(replicaManager)
  val leaveGroupResult = leaveGroup(groupId, joinGroupResult.memberId)
  assertEquals(Errors.NONE, leaveGroupResult)

  // Stub the replica manager so the tombstone append during deletion succeeds.
  val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
  val partition = EasyMock.niceMock(classOf[Partition])

  EasyMock.reset(replicaManager)
  EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
  EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
  EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
  EasyMock.replay(replicaManager, partition)

  // Set(...) is already a Set — no .toSet needed.
  val result = groupCoordinator.handleDeleteGroups(Set(groupId))
  assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
}
@Test
def testDeleteEmptyGroupWithStoredOffsets() {
  // Build a Stable group with a committed offset, then empty it and delete it.
  val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
  val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
  val assignedMemberId = joinGroupResult.memberId
  val joinGroupError = joinGroupResult.error
  assertEquals(Errors.NONE, joinGroupError)

  EasyMock.reset(replicaManager)
  val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
  val syncGroupError = syncGroupResult._2
  assertEquals(Errors.NONE, syncGroupError)

  EasyMock.reset(replicaManager)
  val tp = new TopicPartition("topic", 0)
  val offset = OffsetAndMetadata(0)
  val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
  assertEquals(Errors.NONE, commitOffsetResult(tp))

  // Sanity-check the group is Stable with our member before it leaves.
  val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
  assertEquals(Stable.toString, describeGroupResult._2.state)
  assertEquals(assignedMemberId, describeGroupResult._2.members.head.memberId)

  EasyMock.reset(replicaManager)
  val leaveGroupResult = leaveGroup(groupId, assignedMemberId)
  assertEquals(Errors.NONE, leaveGroupResult)

  // Stub the replica manager so the tombstone append during deletion succeeds.
  val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
  val partition = EasyMock.niceMock(classOf[Partition])

  EasyMock.reset(replicaManager)
  EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
  EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
  EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
  EasyMock.replay(replicaManager, partition)

  // Set(...) is already a Set — no .toSet needed.
  val result = groupCoordinator.handleDeleteGroups(Set(groupId))
  assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))

  // After deletion the group (including its stored offsets) must be Dead.
  assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
}
@Test
def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() {
val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)

20
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

@ -497,6 +497,26 @@ class GroupMetadataManagerTest { @@ -497,6 +497,26 @@ class GroupMetadataManagerTest {
}
}
@Test
def testGroupNotExists() {
  // Partition not owned by this manager -> the group cannot be declared non-existent.
  assertFalse(groupMetadataManager.groupNotExists(groupId))

  groupMetadataManager.addPartitionOwnership(groupPartitionId)
  // Owned, but no metadata cached yet -> treated as non-existent.
  assertTrue(groupMetadataManager.groupNotExists(groupId))

  val emptyGroup = new GroupMetadata(groupId, initialState = Empty)
  groupMetadataManager.addGroup(emptyGroup)
  // Owned with live (non-Dead) metadata -> the group exists.
  assertFalse(groupMetadataManager.groupNotExists(groupId))

  emptyGroup.transitionTo(Dead)
  // Owned and Dead -> non-existent again.
  assertTrue(groupMetadataManager.groupNotExists(groupId))
}
private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = {
val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset)
val commitRecords = createCommittedOffsetRecords(offsets)

8
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -312,12 +312,15 @@ class RequestQuotaTest extends BaseRequestTest { @@ -312,12 +312,15 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.EXPIRE_DELEGATION_TOKEN =>
new ExpireDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=>
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test")))
case ApiKeys.RENEW_DELEGATION_TOKEN=>
case ApiKeys.RENEW_DELEGATION_TOKEN =>
new RenewDelegationTokenRequest.Builder(ByteBuffer.allocate(10), 1000)
case ApiKeys.DELETE_GROUPS =>
new DeleteGroupsRequest.Builder(Collections.singleton("test-group"))
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -416,6 +419,7 @@ class RequestQuotaTest extends BaseRequestTest { @@ -416,6 +419,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs
case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs
case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId")
}
}

Loading…
Cancel
Save