Browse Source

KAFKA-8992; Redefine RemoveMembersFromGroup interface on AdminClient (#7478)

This PR fixes the inconsistency involved in the `removeMembersFromGroup` admin API calls:

1. Fail the `all()` request when there is sub level error (either partition or member)
2. Change getMembers() to members()
3. Hide the actual Errors from user
4. Do not expose generated MemberIdentity type
5. Use more consistent naming for Options and Result types

Reviewers: Guozhang Wang <wangguoz@gmail.com>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
pull/7585/head
Boyang Chen 5 years ago committed by Jason Gustafson
parent
commit
77fc498889
  1. 2
      clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  2. 65
      clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java
  3. 77
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  4. 58
      clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java
  5. 50
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java
  6. 85
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java
  7. 28
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
  8. 96
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
  9. 6
      clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
  10. 118
      clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java
  11. 138
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  12. 50
      clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java
  13. 2
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  14. 154
      clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java
  15. 13
      clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptionsTest.java
  16. 119
      clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java
  17. 68
      core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
  18. 61
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
  19. 2
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  20. 18
      core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala

2
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

@ -1082,7 +1082,7 @@ public interface Admin extends AutoCloseable { @@ -1082,7 +1082,7 @@ public interface Admin extends AutoCloseable {
* @param options The options to carry removing members' information.
* @return The MembershipChangeResult.
*/
MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options);
/**
* <p>Alters offsets for the specified group. In order to succeed, the group must be empty.

65
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResult.java

@ -18,8 +18,6 @@ package org.apache.kafka.clients.admin; @@ -18,8 +18,6 @@ package org.apache.kafka.clients.admin;
import java.util.Set;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.KafkaFuture.BaseFunction;
import org.apache.kafka.common.KafkaFuture.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@ -35,50 +33,65 @@ import org.apache.kafka.common.protocol.Errors; @@ -35,50 +33,65 @@ import org.apache.kafka.common.protocol.Errors;
@InterfaceStability.Evolving
public class DeleteConsumerGroupOffsetsResult {
private final KafkaFuture<Map<TopicPartition, Errors>> future;
private final Set<TopicPartition> partitions;
DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future) {
DeleteConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, Errors>> future, Set<TopicPartition> partitions) {
this.future = future;
this.partitions = partitions;
}
/**
* Return a future which can be used to check the result for a given partition.
*/
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
if (!partitions.contains(partition)) {
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
}
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
@Override
public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!topicPartitions.containsKey(partition)) {
result.completeExceptionally(new IllegalArgumentException(
"Group offset deletion for partition \"" + partition +
"\" was not attempted"));
} else {
final Errors error = topicPartitions.get(partition);
if (error == Errors.NONE) {
result.complete(null);
} else {
result.completeExceptionally(error.exception());
}
}
this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
result.complete(null);
}
});
return result;
}
/**
* Return a future which succeeds only if all the deletions succeed.
* If not, the first partition error shall be returned.
*/
public KafkaFuture<Void> all() {
return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
@Override
public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
return null;
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((topicPartitions, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
for (TopicPartition partition : partitions) {
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
return;
}
}
result.complete(null);
}
});
return result;
}
private boolean maybeCompleteExceptionally(Map<TopicPartition, Errors> partitionLevelErrors,
TopicPartition partition,
KafkaFutureImpl<Void> result) {
Throwable exception = KafkaAdminClient.getSubLevelError(partitionLevelErrors, partition,
"Offset deletion result for partition \"" + partition + "\" was not included in the response");
if (exception != null) {
result.completeExceptionally(exception);
return true;
} else {
return false;
}
}
}

77
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -87,6 +87,8 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; @@ -87,6 +87,8 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
@ -158,6 +160,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; @@ -158,6 +160,7 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
@ -2653,7 +2656,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2653,7 +2656,7 @@ public class KafkaAdminClient extends AdminClient {
ConsumerGroupOperationContext<ConsumerGroupDescription, DescribeConsumerGroupsOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, futures.get(groupId));
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDescribeConsumerGroupsCall(context));
() -> getDescribeConsumerGroupsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
}
@ -2980,7 +2983,7 @@ public class KafkaAdminClient extends AdminClient { @@ -2980,7 +2983,7 @@ public class KafkaAdminClient extends AdminClient {
new ConsumerGroupOperationContext<>(groupId, options, deadline, groupOffsetListingFuture);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getListConsumerGroupOffsetsCall(context));
() -> getListConsumerGroupOffsetsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
@ -3052,7 +3055,7 @@ public class KafkaAdminClient extends AdminClient { @@ -3052,7 +3055,7 @@ public class KafkaAdminClient extends AdminClient {
ConsumerGroupOperationContext<Void, DeleteConsumerGroupsOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDeleteConsumerGroupsCall(context));
() -> getDeleteConsumerGroupsCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
}
@ -3104,7 +3107,7 @@ public class KafkaAdminClient extends AdminClient { @@ -3104,7 +3107,7 @@ public class KafkaAdminClient extends AdminClient {
if (groupIdIsUnrepresentable(groupId)) {
future.completeExceptionally(new InvalidGroupIdException("The given group id '" +
groupId + "' cannot be represented in a request."));
return new DeleteConsumerGroupOffsetsResult(future);
return new DeleteConsumerGroupOffsetsResult(future, partitions);
}
final long startFindCoordinatorMs = time.milliseconds();
@ -3113,10 +3116,10 @@ public class KafkaAdminClient extends AdminClient { @@ -3113,10 +3116,10 @@ public class KafkaAdminClient extends AdminClient {
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getDeleteConsumerGroupOffsetsCall(context, partitions));
() -> getDeleteConsumerGroupOffsetsCall(context, partitions));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
return new DeleteConsumerGroupOffsetsResult(future);
return new DeleteConsumerGroupOffsetsResult(future, partitions);
}
private Call getDeleteConsumerGroupOffsetsCall(
@ -3162,13 +3165,10 @@ public class KafkaAdminClient extends AdminClient { @@ -3162,13 +3165,10 @@ public class KafkaAdminClient extends AdminClient {
return;
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data.topics().forEach(topic -> {
topic.partitions().forEach(partition -> {
partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode()));
});
});
response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())))
);
context.future().complete(partitions);
}
@ -3468,33 +3468,32 @@ public class KafkaAdminClient extends AdminClient { @@ -3468,33 +3468,32 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId,
RemoveMemberFromConsumerGroupOptions options) {
public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options) {
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
KafkaFutureImpl<RemoveMemberFromGroupResult> future = new KafkaFutureImpl<>();
KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
ConsumerGroupOperationContext<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context =
ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getRemoveMembersFromGroupCall(context));
() -> getRemoveMembersFromGroupCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
return new MembershipChangeResult(future);
return new RemoveMembersFromConsumerGroupResult(future, options.members());
}
private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) {
return new Call("leaveGroup",
context.deadline(),
new ConstantNodeIdProvider(context.node().get().id())) {
@Override
LeaveGroupRequest.Builder createRequest(int timeoutMs) {
return new LeaveGroupRequest.Builder(context.groupId(),
context.options().getMembers());
context.options().members().stream().map(
MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
}
@Override
@ -3507,16 +3506,19 @@ public class KafkaAdminClient extends AdminClient { @@ -3507,16 +3506,19 @@ public class KafkaAdminClient extends AdminClient {
return;
}
// If error is transient coordinator error, retry
Errors error = response.error();
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
}
final RemoveMemberFromGroupResult membershipChangeResult =
new RemoveMemberFromGroupResult(response, context.options().getMembers());
if (handleGroupRequestError(response.topLevelError(), context.future()))
return;
context.future().complete(membershipChangeResult);
final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
for (MemberResponse memberResponse : response.memberResponses()) {
// We set member.id to empty here explicitly, so that the lookup will succeed as user doesn't
// know the exact member.id.
memberErrors.put(new MemberIdentity()
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGroupInstanceId(memberResponse.groupInstanceId()),
Errors.forCode(memberResponse.errorCode()));
}
context.future().complete(memberErrors);
}
@Override
@ -3736,4 +3738,15 @@ public class KafkaAdminClient extends AdminClient { @@ -3736,4 +3738,15 @@ public class KafkaAdminClient extends AdminClient {
return calls;
}
/**
* Get a sub level error when the request is in batch. If given key was not found,
* return an {@link IllegalArgumentException}.
*/
static <K> Throwable getSubLevelError(Map<K, Errors> subLevelErrors, K subKey, String keyNotFoundMsg) {
if (!subLevelErrors.containsKey(subKey)) {
return new IllegalArgumentException(keyNotFoundMsg);
} else {
return subLevelErrors.get(subKey).exception();
}
}
}

58
clients/src/main/java/org/apache/kafka/clients/admin/MemberToRemove.java

@ -0,0 +1,58 @@ @@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.requests.JoinGroupRequest;
import java.util.Objects;
/**
* A struct containing information about the member to be removed.
*/
public class MemberToRemove {
private final String groupInstanceId;
public MemberToRemove(String groupInstanceId) {
this.groupInstanceId = groupInstanceId;
}
@Override
public boolean equals(Object o) {
if (o instanceof MemberToRemove) {
MemberToRemove otherMember = (MemberToRemove) o;
return this.groupInstanceId.equals(otherMember.groupInstanceId);
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hash(groupInstanceId);
}
MemberIdentity toMemberIdentity() {
return new MemberIdentity()
.setGroupInstanceId(groupInstanceId)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID);
}
public String groupInstanceId() {
return groupInstanceId;
}
}

50
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java

@ -1,50 +0,0 @@ @@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.requests.JoinGroupRequest;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* Options for {@link AdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)}.
* It carries the members to be removed from the consumer group.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class RemoveMemberFromConsumerGroupOptions extends AbstractOptions<RemoveMemberFromConsumerGroupOptions> {
private List<MemberIdentity> members;
public RemoveMemberFromConsumerGroupOptions(Collection<String> groupInstanceIds) {
members = groupInstanceIds.stream().map(
instanceId -> new MemberIdentity()
.setGroupInstanceId(instanceId)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
).collect(Collectors.toList());
}
public List<MemberIdentity> getMembers() {
return members;
}
}

85
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java

@ -1,85 +0,0 @@ @@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Result of a batch member removal operation.
*/
public class RemoveMemberFromGroupResult {
private final Errors topLevelError;
private final Map<MemberIdentity, KafkaFuture<Void>> memberFutures;
private boolean hasError = false;
RemoveMemberFromGroupResult(LeaveGroupResponse response,
List<MemberIdentity> membersToRemove) {
this.topLevelError = response.topLevelError();
this.memberFutures = new HashMap<>(membersToRemove.size());
if (this.topLevelError != Errors.NONE) {
// If the populated error is a top-level error, fail every member's future.
for (MemberIdentity memberIdentity : membersToRemove) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(topLevelError.exception());
memberFutures.put(memberIdentity, future);
}
hasError = true;
} else {
for (MemberResponse memberResponse : response.memberResponses()) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
Errors memberError = Errors.forCode(memberResponse.errorCode());
if (memberError != Errors.NONE) {
future.completeExceptionally(memberError.exception());
hasError = true;
} else {
future.complete(null);
}
memberFutures.put(new MemberIdentity()
.setMemberId(memberResponse.memberId())
.setGroupInstanceId(memberResponse.groupInstanceId()), future);
}
}
}
public Errors topLevelError() {
return topLevelError;
}
public boolean hasError() {
return hasError;
}
/**
* Futures of members with corresponding errors when they leave the group.
*
* @return list of members who failed to be removed
*/
public Map<MemberIdentity, KafkaFuture<Void>> memberFutures() {
return memberFutures;
}
}

28
clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java → clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java

@ -16,34 +16,28 @@ @@ -16,34 +16,28 @@
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.concurrent.ExecutionException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
* The result of the {@link KafkaAdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)} call.
* Options for {@link AdminClient#removeMembersFromConsumerGroup(String, RemoveMembersFromConsumerGroupOptions)}.
* It carries the members to be removed from the consumer group.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class MembershipChangeResult {
public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {
private KafkaFuture<RemoveMemberFromGroupResult> future;
private Set<MemberToRemove> members;
MembershipChangeResult(KafkaFuture<RemoveMemberFromGroupResult> future) {
this.future = future;
public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) {
this.members = new HashSet<>(members);
}
/**
* Return a future which contains the member removal results.
*/
public RemoveMemberFromGroupResult all() throws ExecutionException, InterruptedException {
return future.get();
}
// Visible for testing
public KafkaFuture<RemoveMemberFromGroupResult> future() {
return future;
public Set<MemberToRemove> members() {
return members;
}
}

96
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java

@ -0,0 +1,96 @@ @@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.protocol.Errors;
import java.util.Map;
import java.util.Set;
/**
* The result of the {@link Admin#removeMembersFromConsumerGroup(String, RemoveMembersFromConsumerGroupOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class RemoveMembersFromConsumerGroupResult {
private final KafkaFuture<Map<MemberIdentity, Errors>> future;
private final Set<MemberToRemove> memberInfos;
RemoveMembersFromConsumerGroupResult(KafkaFuture<Map<MemberIdentity, Errors>> future,
Set<MemberToRemove> memberInfos) {
this.future = future;
this.memberInfos = memberInfos;
}
/**
* Returns a future which indicates whether the request was 100% success, i.e. no
* either top level or member level error.
* If not, the first member error shall be returned.
*/
public KafkaFuture<Void> all() {
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((memberErrors, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
for (MemberToRemove memberToRemove : memberInfos) {
if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
return;
}
}
result.complete(null);
}
});
return result;
}
/**
* Returns the selected member future.
*/
public KafkaFuture<Void> memberResult(MemberToRemove member) {
if (!memberInfos.contains(member)) {
throw new IllegalArgumentException("Member " + member + " was not included in the original request");
}
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
this.future.whenComplete((memberErrors, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else if (!maybeCompleteExceptionally(memberErrors, member.toMemberIdentity(), result)) {
result.complete(null);
}
});
return result;
}
private boolean maybeCompleteExceptionally(Map<MemberIdentity, Errors> memberErrors,
MemberIdentity member,
KafkaFutureImpl<Void> result) {
Throwable exception = KafkaAdminClient.getSubLevelError(memberErrors, member,
"Member \"" + member + "\" was not included in the removal response");
if (exception != null) {
result.completeExceptionally(exception);
return true;
} else {
return false;
}
}
}

6
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java

@ -36,7 +36,11 @@ public class LeaveGroupRequest extends AbstractRequest { @@ -36,7 +36,11 @@ public class LeaveGroupRequest extends AbstractRequest {
private final List<MemberIdentity> members;
public Builder(String groupId, List<MemberIdentity> members) {
super(ApiKeys.LEAVE_GROUP);
this(groupId, members, ApiKeys.LEAVE_GROUP.oldestVersion(), ApiKeys.LEAVE_GROUP.latestVersion());
}
Builder(String groupId, List<MemberIdentity> members, short oldestVersion, short latestVersion) {
super(ApiKeys.LEAVE_GROUP, oldestVersion, latestVersion);
this.groupId = groupId;
this.members = members;
if (members.isEmpty()) {

118
clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java

@ -0,0 +1,118 @@ @@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
public class DeleteConsumerGroupOffsetsResultTest {
private final String topic = "topic";
private final TopicPartition tpZero = new TopicPartition(topic, 0);
private final TopicPartition tpOne = new TopicPartition(topic, 1);
private Set<TopicPartition> partitions;
private Map<TopicPartition, Errors> errorsMap;
private KafkaFutureImpl<Map<TopicPartition, Errors>> partitionFutures;
@Before
public void setUp() {
partitionFutures = new KafkaFutureImpl<>();
partitions = new HashSet<>();
partitions.add(tpZero);
partitions.add(tpOne);
errorsMap = new HashMap<>();
errorsMap.put(tpZero, Errors.NONE);
errorsMap.put(tpOne, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
@Test
public void testTopLevelErrorConstructor() throws InterruptedException {
partitionFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
DeleteConsumerGroupOffsetsResult topLevelErrorResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
TestUtils.assertFutureError(topLevelErrorResult.all(), GroupAuthorizationException.class);
}
@Test
public void testPartitionLevelErrorConstructor() throws ExecutionException, InterruptedException {
createAndVerifyPartitionLevelErrror();
}
@Test
public void testPartitionMissingInResponseErrorConstructor() throws InterruptedException, ExecutionException {
errorsMap.remove(tpOne);
partitionFutures.complete(errorsMap);
assertFalse(partitionFutures.isCompletedExceptionally());
DeleteConsumerGroupOffsetsResult missingPartitionResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
TestUtils.assertFutureError(missingPartitionResult.all(), IllegalArgumentException.class);
assertNull(missingPartitionResult.partitionResult(tpZero).get());
TestUtils.assertFutureError(missingPartitionResult.partitionResult(tpOne), IllegalArgumentException.class);
}
@Test
public void testPartitionMissingInRequestErrorConstructor() throws InterruptedException, ExecutionException {
DeleteConsumerGroupOffsetsResult partitionLevelErrorResult = createAndVerifyPartitionLevelErrror();
assertThrows(IllegalArgumentException.class, () -> partitionLevelErrorResult.partitionResult(new TopicPartition("invalid-topic", 0)));
}
@Test
public void testNoErrorConstructor() throws ExecutionException, InterruptedException {
Map<TopicPartition, Errors> errorsMap = new HashMap<>();
errorsMap.put(tpZero, Errors.NONE);
errorsMap.put(tpOne, Errors.NONE);
DeleteConsumerGroupOffsetsResult noErrorResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
partitionFutures.complete(errorsMap);
assertNull(noErrorResult.all().get());
assertNull(noErrorResult.partitionResult(tpZero).get());
assertNull(noErrorResult.partitionResult(tpOne).get());
}
private DeleteConsumerGroupOffsetsResult createAndVerifyPartitionLevelErrror() throws InterruptedException, ExecutionException {
partitionFutures.complete(errorsMap);
assertFalse(partitionFutures.isCompletedExceptionally());
DeleteConsumerGroupOffsetsResult partitionLevelErrorResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
TestUtils.assertFutureError(partitionLevelErrorResult.all(), UnknownTopicOrPartitionException.class);
assertNull(partitionLevelErrorResult.partitionResult(tpZero).get());
TestUtils.assertFutureError(partitionLevelErrorResult.partitionResult(tpOne), UnknownTopicOrPartitionException.class);
return partitionLevelErrorResult;
}
}

138
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -41,6 +41,7 @@ import org.apache.kafka.common.acl.AclPermissionType; @@ -41,6 +41,7 @@ import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
import org.apache.kafka.common.errors.InvalidRequestException;
@ -158,6 +159,7 @@ import static org.junit.Assert.assertEquals; @@ -158,6 +159,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -1709,10 +1711,10 @@ public class KafkaAdminClientTest { @@ -1709,10 +1711,10 @@ public class KafkaAdminClientTest {
final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient().deleteConsumerGroupOffsets(
groupId, Stream.of(tp1, tp2).collect(Collectors.toSet()));
assertNull(errorResult.all().get());
assertNull(errorResult.partitionResult(tp1).get());
TestUtils.assertFutureError(errorResult.all(), GroupSubscribedToTopicException.class);
TestUtils.assertFutureError(errorResult.partitionResult(tp2), GroupSubscribedToTopicException.class);
TestUtils.assertFutureError(errorResult.partitionResult(tp3), IllegalArgumentException.class);
assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3));
}
}
@ -1785,13 +1787,13 @@ public class KafkaAdminClientTest { @@ -1785,13 +1787,13 @@ public class KafkaAdminClientTest {
final String groupId = "group-0";
final TopicPartition tp1 = new TopicPartition("foo", 0);
final List<Errors> retriableErrors = Arrays.asList(
final List<Errors> nonRetriableErrors = Arrays.asList(
Errors.GROUP_AUTHORIZATION_FAILED, Errors.INVALID_GROUP_ID, Errors.GROUP_ID_NOT_FOUND);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
for (Errors error : retriableErrors) {
for (Errors error : nonRetriableErrors) {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@ -1939,16 +1941,6 @@ public class KafkaAdminClientTest { @@ -1939,16 +1941,6 @@ public class KafkaAdminClientTest {
final String instanceTwo = "instance-2";
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
List<MemberResponse> memberResponses = Arrays.asList(responseOne, responseTwo);
// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
@ -1966,79 +1958,76 @@ public class KafkaAdminClientTest { @@ -1966,79 +1958,76 @@ public class KafkaAdminClientTest {
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())));
String groupId = "groupId";
List<String> membersToRemove = Arrays.asList(instanceOne, instanceTwo);
final MembershipChangeResult unknownErrorResult = env.adminClient().removeMemberFromConsumerGroup(
Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove(instanceOne),
new MemberToRemove(instanceTwo));
final RemoveMembersFromConsumerGroupResult unknownErrorResult = env.adminClient().removeMembersFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
RemoveMemberFromGroupResult result = unknownErrorResult.all();
assertTrue(result.hasError());
assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = result.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw exception");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof UnknownServerException);
}
}
MemberToRemove memberOne = new MemberToRemove(instanceOne);
MemberToRemove memberTwo = new MemberToRemove(instanceTwo);
TestUtils.assertFutureError(unknownErrorResult.all(), UnknownServerException.class);
TestUtils.assertFutureError(unknownErrorResult.memberResult(memberOne), UnknownServerException.class);
TestUtils.assertFutureError(unknownErrorResult.memberResult(memberTwo), UnknownServerException.class);
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
// Inject one member level error.
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(memberResponses)));
.setMembers(Arrays.asList(responseOne, responseTwo))));
final MembershipChangeResult memberLevelErrorResult = env.adminClient().removeMemberFromConsumerGroup(
final RemoveMembersFromConsumerGroupResult memberLevelErrorResult = env.adminClient().removeMembersFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
result = memberLevelErrorResult.all();
assertTrue(result.hasError());
assertEquals(Errors.NONE, result.topLevelError());
memberFutures = result.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
if (entry.getKey().groupInstanceId().equals(instanceOne)) {
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof UnknownMemberIdException);
}
} else {
assertFalse(memberFuture.isCompletedExceptionally());
}
}
TestUtils.assertFutureError(memberLevelErrorResult.all(), UnknownMemberIdException.class);
TestUtils.assertFutureError(memberLevelErrorResult.memberResult(memberOne), UnknownMemberIdException.class);
assertNull(memberLevelErrorResult.memberResult(memberTwo).get());
// Return success.
// Return with missing member.
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(Collections.singletonList(responseTwo))));
final MembershipChangeResult noErrorResult = env.adminClient().removeMemberFromConsumerGroup(
final RemoveMembersFromConsumerGroupResult missingMemberResult = env.adminClient().removeMembersFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
result = noErrorResult.all();
assertFalse(result.hasError());
assertEquals(Errors.NONE, result.topLevelError());
memberFutures = result.memberFutures();
assertEquals(1, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
assertFalse(entry.getValue().isCompletedExceptionally());
}
TestUtils.assertFutureError(missingMemberResult.all(), IllegalArgumentException.class);
// The memberOne was not included in the response.
TestUtils.assertFutureError(missingMemberResult.memberResult(memberOne), IllegalArgumentException.class);
assertNull(missingMemberResult.memberResult(memberTwo).get());
// Return with success.
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(
new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
Arrays.asList(responseTwo,
new MemberResponse().setGroupInstanceId(instanceOne).setErrorCode(Errors.NONE.code())
))
));
final RemoveMembersFromConsumerGroupResult noErrorResult = env.adminClient().removeMembersFromConsumerGroup(
groupId,
new RemoveMembersFromConsumerGroupOptions(membersToRemove)
);
assertNull(noErrorResult.all().get());
assertNull(noErrorResult.memberResult(memberOne).get());
assertNull(noErrorResult.memberResult(memberTwo).get());
}
}
@ -2708,6 +2697,21 @@ public class KafkaAdminClientTest { @@ -2708,6 +2697,21 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testGetSubLevelError() {
List<MemberIdentity> memberIdentities = Arrays.asList(
new MemberIdentity().setGroupInstanceId("instance-0"),
new MemberIdentity().setGroupInstanceId("instance-1"));
Map<MemberIdentity, Errors> errorsMap = new HashMap<>();
errorsMap.put(memberIdentities.get(0), Errors.NONE);
errorsMap.put(memberIdentities.get(1), Errors.FENCED_INSTANCE_ID);
assertEquals(IllegalArgumentException.class, KafkaAdminClient.getSubLevelError(errorsMap,
new MemberIdentity().setGroupInstanceId("non-exist-id"), "For unit test").getClass());
assertNull(KafkaAdminClient.getSubLevelError(errorsMap, memberIdentities.get(0), "For unit test"));
assertEquals(FencedInstanceIdException.class, KafkaAdminClient.getSubLevelError(
errorsMap, memberIdentities.get(1), "For unit test").getClass());
}
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),

50
clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java

@ -1,50 +0,0 @@ @@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class MembershipChangeResultTest {
@Test
public void testConstructor() {
KafkaFutureImpl<RemoveMemberFromGroupResult> removeMemberFuture = new KafkaFutureImpl<>();
MembershipChangeResult changeResult = new MembershipChangeResult(removeMemberFuture);
assertEquals(removeMemberFuture, changeResult.future());
RemoveMemberFromGroupResult removeMemberFromGroupResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()),
Collections.emptyList()
);
removeMemberFuture.complete(removeMemberFromGroupResult);
try {
assertEquals(removeMemberFromGroupResult, changeResult.all());
} catch (ExecutionException | InterruptedException e) {
fail("Unexpected exception " + e + " when trying to get remove member result");
}
}
}

2
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -364,7 +364,7 @@ public class MockAdminClient extends AdminClient { @@ -364,7 +364,7 @@ public class MockAdminClient extends AdminClient {
}
@Override
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options) {
public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

154
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java

@ -1,154 +0,0 @@ @@ -1,154 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class RemoveMemberFromGroupResultTest {
private String instanceOne = "instance-1";
private String instanceTwo = "instance-2";
private List<MemberIdentity> membersToRemove;
private List<MemberResponse> memberResponses;
@Before
public void setUp() {
membersToRemove = Arrays.asList(
new MemberIdentity()
.setGroupInstanceId(instanceOne),
new MemberIdentity()
.setGroupInstanceId(instanceTwo)
);
memberResponses = Arrays.asList(
new MemberResponse()
.setGroupInstanceId(instanceOne),
new MemberResponse()
.setGroupInstanceId(instanceTwo)
);
}
@Test
public void testTopLevelErrorConstructor() {
RemoveMemberFromGroupResult topLevelErrorResult =
new RemoveMemberFromGroupResult(new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
.setMembers(memberResponses)), membersToRemove);
assertTrue(topLevelErrorResult.hasError());
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, topLevelErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = topLevelErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof GroupAuthorizationException);
}
}
}
@Test
public void testMemberLevelErrorConstructor() {
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.FENCED_INSTANCE_ID.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
RemoveMemberFromGroupResult memberLevelErrorResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setMembers(Arrays.asList(responseOne, responseTwo))),
membersToRemove);
assertTrue(memberLevelErrorResult.hasError());
assertEquals(Errors.NONE, memberLevelErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = memberLevelErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
if (entry.getKey().groupInstanceId().equals(instanceOne)) {
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof FencedInstanceIdException);
}
} else {
assertFalse(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
} catch (ExecutionException | InterruptedException e0) {
fail("get() shouldn't throw exception");
}
}
}
}
@Test
public void testNoErrorConstructor() {
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.NONE.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
// If no error is specified, failed members are not visible.
RemoveMemberFromGroupResult noErrorResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setMembers(Arrays.asList(responseOne, responseTwo))),
membersToRemove);
assertFalse(noErrorResult.hasError());
assertEquals(Errors.NONE, noErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = noErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
try {
entry.getValue().get();
} catch (ExecutionException | InterruptedException e0) {
fail("get() shouldn't throw exception");
}
}
}
}

13
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java → clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptionsTest.java

@ -16,23 +16,20 @@ @@ -16,23 +16,20 @@
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class RemoveMemberFromConsumerGroupOptionsTest {
public class RemoveMembersFromConsumerGroupOptionsTest {
@Test
public void testConstructor() {
List<String> groupInstanceIds = Collections.singletonList("instance-1");
RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove("instance-1")));
RemoveMemberFromConsumerGroupOptions options = new RemoveMemberFromConsumerGroupOptions(groupInstanceIds);
assertEquals(Collections.singletonList(
new MemberIdentity().setGroupInstanceId("instance-1")), options.getMembers());
assertEquals(Collections.singleton(
new MemberToRemove("instance-1")), options.members());
}
}

119
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java

@ -0,0 +1,119 @@ @@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
public class RemoveMembersFromConsumerGroupResultTest {
private final MemberToRemove instanceOne = new MemberToRemove("instance-1");
private final MemberToRemove instanceTwo = new MemberToRemove("instance-2");
private Set<MemberToRemove> membersToRemove;
private Map<MemberIdentity, Errors> errorsMap;
private KafkaFutureImpl<Map<MemberIdentity, Errors>> memberFutures;
@Before
public void setUp() {
memberFutures = new KafkaFutureImpl<>();
membersToRemove = new HashSet<>();
membersToRemove.add(instanceOne);
membersToRemove.add(instanceTwo);
errorsMap = new HashMap<>();
errorsMap.put(instanceOne.toMemberIdentity(), Errors.NONE);
errorsMap.put(instanceTwo.toMemberIdentity(), Errors.FENCED_INSTANCE_ID);
}
@Test
public void testTopLevelErrorConstructor() throws InterruptedException {
memberFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
RemoveMembersFromConsumerGroupResult topLevelErrorResult =
new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
TestUtils.assertFutureError(topLevelErrorResult.all(), GroupAuthorizationException.class);
}
@Test
public void testMemberLevelErrorConstructor() throws InterruptedException, ExecutionException {
createAndVerifyMemberLevelError();
}
@Test
public void testMemberMissingErrorInRequestConstructor() throws InterruptedException, ExecutionException {
errorsMap.remove(instanceTwo.toMemberIdentity());
memberFutures.complete(errorsMap);
assertFalse(memberFutures.isCompletedExceptionally());
RemoveMembersFromConsumerGroupResult missingMemberResult =
new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
TestUtils.assertFutureError(missingMemberResult.all(), IllegalArgumentException.class);
assertNull(missingMemberResult.memberResult(instanceOne).get());
TestUtils.assertFutureError(missingMemberResult.memberResult(instanceTwo), IllegalArgumentException.class);
}
@Test
public void testMemberLevelErrorInResponseConstructor() throws InterruptedException, ExecutionException {
RemoveMembersFromConsumerGroupResult memberLevelErrorResult = createAndVerifyMemberLevelError();
assertThrows(IllegalArgumentException.class, () -> memberLevelErrorResult.memberResult(
new MemberToRemove("invalid-instance-id"))
);
}
@Test
public void testNoErrorConstructor() throws ExecutionException, InterruptedException {
Map<MemberIdentity, Errors> errorsMap = new HashMap<>();
errorsMap.put(instanceOne.toMemberIdentity(), Errors.NONE);
errorsMap.put(instanceTwo.toMemberIdentity(), Errors.NONE);
RemoveMembersFromConsumerGroupResult noErrorResult =
new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
memberFutures.complete(errorsMap);
assertNull(noErrorResult.all().get());
assertNull(noErrorResult.memberResult(instanceOne).get());
assertNull(noErrorResult.memberResult(instanceTwo).get());
}
private RemoveMembersFromConsumerGroupResult createAndVerifyMemberLevelError() throws InterruptedException, ExecutionException {
memberFutures.complete(errorsMap);
assertFalse(memberFutures.isCompletedExceptionally());
RemoveMembersFromConsumerGroupResult memberLevelErrorResult =
new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove);
TestUtils.assertFutureError(memberLevelErrorResult.all(), FencedInstanceIdException.class);
assertNull(memberLevelErrorResult.memberResult(instanceOne).get());
TestUtils.assertFutureError(memberLevelErrorResult.memberResult(instanceTwo), FencedInstanceIdException.class);
return memberLevelErrorResult;
}
}

68
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala

@ -403,8 +403,8 @@ object ConsumerGroupCommand extends Logging { @@ -403,8 +403,8 @@ object ConsumerGroupCommand extends Logging {
result
}
def deleteOffsets(groupId: String, topics: List[String]): Map[TopicPartition, Throwable] = {
var result: Map[TopicPartition, Throwable] = mutable.HashMap()
def deleteOffsets(groupId: String, topics: List[String]): (Errors, Map[TopicPartition, Throwable]) = {
var partitionLevelResult: Map[TopicPartition, Throwable] = mutable.HashMap()
val (topicWithPartitions, topicWithoutPartitions) = topics.partition(_.contains(":"))
@ -426,7 +426,7 @@ object ConsumerGroupCommand extends Logging { @@ -426,7 +426,7 @@ object ConsumerGroupCommand extends Logging {
new TopicPartition(topic, partition.partition())
}
case Failure(e) =>
result += new TopicPartition(topic, -1) -> e
partitionLevelResult += new TopicPartition(topic, -1) -> e
List.empty
}
}
@ -439,46 +439,54 @@ object ConsumerGroupCommand extends Logging { @@ -439,46 +439,54 @@ object ConsumerGroupCommand extends Logging {
withTimeoutMs(new DeleteConsumerGroupOffsetsOptions)
)
deleteResult.all().get()
var topLevelException = Errors.NONE
Try(deleteResult.all.get) match {
case Success(_) =>
case Failure(e) => topLevelException = Errors.forException(e.getCause)
}
partitions.foreach { partition =>
Try(deleteResult.partitionResult(partition).get()) match {
case Success(_) => result += partition -> null
case Failure(e) => result += partition -> e
case Success(_) => partitionLevelResult += partition -> null
case Failure(e) => partitionLevelResult += partition -> e
}
}
result
(topLevelException, partitionLevelResult)
}
def deleteOffsets(): Unit = {
val groupId = opts.options.valueOf(opts.groupOpt)
val topics = opts.options.valuesOf(opts.topicOpt).asScala.toList
try {
val result = deleteOffsets(groupId, topics)
val (topLevelResult, partitionLevelResult) = deleteOffsets(groupId, topics)
topLevelResult match {
case Errors.NONE =>
println(s"Request succeed for deleting offsets with topic ${topics.mkString(", ")} group $groupId")
case Errors.INVALID_GROUP_ID =>
printError(s"'$groupId' is not valid.")
case Errors.GROUP_ID_NOT_FOUND =>
printError(s"'$groupId' does not exist.")
case Errors.GROUP_AUTHORIZATION_FAILED =>
printError(s"Access to '$groupId' is not authorized.")
case Errors.NON_EMPTY_GROUP =>
printError(s"Deleting offsets of a consumer group '$groupId' is forbidden if the group is not empty.")
case Errors.GROUP_SUBSCRIBED_TO_TOPIC |
Errors.TOPIC_AUTHORIZATION_FAILED |
Errors.UNKNOWN_TOPIC_OR_PARTITION =>
printError(s"Encounter some partition level error, see the follow-up details:")
case _ =>
printError(s"Encounter some unknown error: $topLevelResult")
}
println("\n%-30s %-15s %-15s".format("TOPIC", "PARTITION", "STATUS"))
result.toList.sortBy(t => t._1.topic + t._1.partition.toString).foreach { case (tp, error) =>
println("%-30s %-15s %-15s".format(
tp.topic,
if (tp.partition >= 0) tp.partition else "Not Provided",
if (error != null) s"Error: ${error.getMessage}" else "Successful"
))
}
} catch {
case e: ExecutionException =>
Errors.forException(e.getCause) match {
case Errors.INVALID_GROUP_ID =>
printError(s"'$groupId' is not valid.")
case Errors.GROUP_ID_NOT_FOUND =>
printError(s"'$groupId' does not exist.")
case Errors.GROUP_AUTHORIZATION_FAILED =>
printError(s"Access to '$groupId' is not authorized.")
case Errors.NON_EMPTY_GROUP =>
printError(s"Deleting offsets of a non consumer group '$groupId' is forbidden if the group is not empty.")
case _ => throw e
}
println("\n%-30s %-15s %-15s".format("TOPIC", "PARTITION", "STATUS"))
partitionLevelResult.toList.sortBy(t => t._1.topic + t._1.partition.toString).foreach { case (tp, error) =>
println("%-30s %-15s %-15s".format(
tp.topic,
if (tp.partition >= 0) tp.partition else "Not Provided",
if (error != null) s"Error: ${error.getMessage}" else "Successful"
))
}
}

61
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -33,7 +33,6 @@ import kafka.utils.TestUtils._ @@ -33,7 +33,6 @@ import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, Logging, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
@ -41,7 +40,6 @@ import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicPartition @@ -41,7 +40,6 @@ import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicPartition
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.utils.{Time, Utils}
@ -1235,7 +1233,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1235,7 +1233,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get().asScala.filter(_.groupId == testGroupId)
!matching.isEmpty
matching.nonEmpty
}, s"Expected to be able to list $testGroupId")
val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
@ -1247,7 +1245,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1247,7 +1245,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(1, testGroupDescription.members().size())
val member = testGroupDescription.members().iterator().next()
assertEquals(testClientId, member.clientId())
@ -1281,25 +1279,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1281,25 +1279,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id"
var removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
Collections.singletonList(invalidInstanceId)
)).all()
var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove(invalidInstanceId))
))
assertTrue(removeMemberResult.hasError)
assertEquals(Errors.NONE, removeMemberResult.topLevelError)
val firstMemberFutures = removeMemberResult.memberFutures()
assertEquals(1, firstMemberFutures.size)
firstMemberFutures.values.asScala foreach { case value =>
try {
value.get()
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
case t: Throwable =>
fail(s"Should have caught exception in getting member future: $t")
}
}
TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException])
// Test consumer group deletion
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
@ -1316,25 +1302,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1316,25 +1302,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
classOf[GroupNotEmptyException])
// Test delete correct member
removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
Collections.singletonList(testInstanceId)
)).all()
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove(testInstanceId))
))
assertFalse(removeMemberResult.hasError)
assertEquals(Errors.NONE, removeMemberResult.topLevelError)
val deletedMemberFutures = removeMemberResult.memberFutures()
assertEquals(1, firstMemberFutures.size)
deletedMemberFutures.values.asScala foreach { case value =>
try {
value.get()
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
case t: Throwable =>
fail(s"Should have caught exception in getting member future: $t")
}
}
assertNull(removeMembersResult.all().get())
val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId))
assertNull(validMemberFuture.get())
// The group should contain no member now.
val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
@ -1404,7 +1378,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1404,7 +1378,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Test offset deletion while consuming
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
assertNull(offsetDeleteResult.all().get())
// Top level error will equal to the first partition level error
assertFutureExceptionTypeEquals(offsetDeleteResult.all(), classOf[GroupSubscribedToTopicException])
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp1),
classOf[GroupSubscribedToTopicException])
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
@ -1426,11 +1401,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1426,11 +1401,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// Test offset deletion when group is empty
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, Set(tp1, tp2).asJava)
assertNull(offsetDeleteResult.all().get())
assertFutureExceptionTypeEquals(offsetDeleteResult.all(),
classOf[UnknownTopicOrPartitionException])
assertNull(offsetDeleteResult.partitionResult(tp1).get())
assertFutureExceptionTypeEquals(offsetDeleteResult.partitionResult(tp2),
classOf[UnknownTopicOrPartitionException])
} finally {
Utils.closeQuietly(client, "adminClient")
}

2
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -1399,7 +1399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -1399,7 +1399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new AccessControlEntry(userPrincipalStr, WildcardHost, DELETE, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(userPrincipalStr, WildcardHost, READ, ALLOW)), groupResource)
val result = createAdminClient().deleteConsumerGroupOffsets(group, Set(tp).asJava)
assertNull(result.all().get())
TestUtils.assertFutureExceptionTypeEquals(result.all(), classOf[TopicAuthorizationException])
TestUtils.assertFutureExceptionTypeEquals(result.partitionResult(tp), classOf[TopicAuthorizationException])
}

18
core/src/test/scala/unit/kafka/admin/DeleteOffsetsConsumerGroupCommandIntegrationTest.scala

@ -18,7 +18,6 @@ @@ -18,7 +18,6 @@
package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.server.Defaults
import kafka.utils.TestUtils
@ -51,14 +50,9 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm @@ -51,14 +50,9 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
val group = "missing.group"
val topic = "foo:1"
val service = getConsumerGroupService(getArgs(group, topic))
try {
service.deleteOffsets(group, List(topic))
fail("GroupIdNotFoundException should have been raised")
} catch {
case e: ExecutionException =>
if (e.getCause != Errors.GROUP_ID_NOT_FOUND.exception())
throw e
}
val (error, _) = service.deleteOffsets(group, List(topic))
assertEquals(Errors.GROUP_ID_NOT_FOUND, error)
}
@Test
@ -134,8 +128,12 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm @@ -134,8 +128,12 @@ class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupComm
withConsumerGroup {
val topic = if (inputPartition >= 0) inputTopic + ":" + inputPartition else inputTopic
val service = getConsumerGroupService(getArgs(group, topic))
val partitions = service.deleteOffsets(group, List(topic))
val (topLevelError, partitions) = service.deleteOffsets(group, List(topic))
val tp = new TopicPartition(inputTopic, expectedPartition)
// Partition level error should propagate to top level, unless this is due to a missed partition attempt.
if (inputPartition >= 0) {
assertEquals(expectedError, topLevelError)
}
if (expectedError == Errors.NONE)
assertNull(partitions(tp))
else

Loading…
Cancel
Save