diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 8fdd21b1ad5..48630d46bb1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.requests.LeaveGroupResponse; /** * The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. @@ -1045,6 +1046,17 @@ public interface Admin extends AutoCloseable { ListPartitionReassignmentsResult listPartitionReassignments(Optional> partitions, ListPartitionReassignmentsOptions options); + /** + * Remove members from the consumer group by given member identities. + *

+ * For possible error codes, refer to {@link LeaveGroupResponse}. + * + * @param groupId The ID of the group to remove member from. + * @param options The options to carry removing members' information. + * @return The MembershipChangeResult. + */ + MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options); + /** * Get the metrics kept by the adminClient */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 3ed98190957..c7ea97005a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -143,6 +143,8 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; +import org.apache.kafka.common.requests.LeaveGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest; @@ -2043,9 +2045,9 @@ public class KafkaAdminClient extends AdminClient { return futures; } - private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection resources, - final Map> configs, - final boolean validateOnly) { + private IncrementalAlterConfigsRequestData toIncrementalAlterConfigsRequestData(final Collection resources, + final Map> configs, + final boolean validateOnly) { IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData(); requestData.setValidateOnly(validateOnly); for (ConfigResource resource : resources) { @@ -3339,4 +3341,63 @@ public class KafkaAdminClient extends AdminClient { return (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) || resource.type() == ConfigResource.Type.BROKER_LOGGER; } + + @Override + public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, + RemoveMemberFromConsumerGroupOptions options) { + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); + + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + ConsumerGroupOperationContext context = + new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + + Call findCoordinatorCall = getFindCoordinatorCall(context, + () -> KafkaAdminClient.this.getRemoveMembersFromGroupCall(context)); + runnable.call(findCoordinatorCall, startFindCoordinatorMs); + + return new MembershipChangeResult(future); + } + + + private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext + context) { + return new Call("leaveGroup", + context.getDeadline(), + new ConstantNodeIdProvider(context.getNode().get().id())) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new LeaveGroupRequest.Builder(context.getGroupId(), + context.getOptions().getMembers()); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; + + // If coordinator changed since we fetched it, retry + if (context.hasCoordinatorMoved(response)) { + rescheduleTask(context, () -> getRemoveMembersFromGroupCall(context)); + return; + } + + // If error is transient coordinator error, retry + Errors error = response.error(); + if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) { + throw error.exception(); + } + + final RemoveMemberFromGroupResult membershipChangeResult = + new RemoveMemberFromGroupResult(response, context.getOptions().getMembers()); + + context.getFuture().complete(membershipChangeResult); + } + + @Override + void handleFailure(Throwable throwable) { + context.getFuture().completeExceptionally(throwable); + } + }; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java new file mode 100644 index 00000000000..e704ed9a816 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.concurrent.ExecutionException; + +/** + * The result of the {@link KafkaAdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)} call. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class MembershipChangeResult { + + private KafkaFuture future; + + MembershipChangeResult(KafkaFuture future) { + this.future = future; + } + + /** + * Return a future which contains the member removal results. + */ + public RemoveMemberFromGroupResult all() throws ExecutionException, InterruptedException { + return future.get(); + } + + // Visible for testing + public KafkaFuture future() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java new file mode 100644 index 00000000000..ed1fdab0be1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; +import org.apache.kafka.common.requests.JoinGroupRequest; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Options for {@link AdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)}. + * It carries the members to be removed from the consumer group. + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class RemoveMemberFromConsumerGroupOptions extends AbstractOptions { + + private List members; + + public RemoveMemberFromConsumerGroupOptions(Collection groupInstanceIds) { + members = groupInstanceIds.stream().map( + instanceId -> new MemberIdentity() + .setGroupInstanceId(instanceId) + .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) + ).collect(Collectors.toList()); + } + + public List getMembers() { + return members; + } +} + diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java new file mode 100644 index 00000000000..1bd9a8be56f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; +import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.LeaveGroupResponse; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Result of a batch member removal operation. + */ +public class RemoveMemberFromGroupResult { + + private final Errors topLevelError; + private final Map> memberFutures; + private boolean hasError = false; + + RemoveMemberFromGroupResult(LeaveGroupResponse response, + List membersToRemove) { + this.topLevelError = response.topLevelError(); + this.memberFutures = new HashMap<>(membersToRemove.size()); + + if (this.topLevelError != Errors.NONE) { + // If the populated error is a top-level error, fail every member's future. + for (MemberIdentity memberIdentity : membersToRemove) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(topLevelError.exception()); + memberFutures.put(memberIdentity, future); + } + hasError = true; + } else { + for (MemberResponse memberResponse : response.memberResponses()) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + Errors memberError = Errors.forCode(memberResponse.errorCode()); + if (memberError != Errors.NONE) { + future.completeExceptionally(memberError.exception()); + hasError = true; + } else { + future.complete(null); + } + memberFutures.put(new MemberIdentity() + .setMemberId(memberResponse.memberId()) + .setGroupInstanceId(memberResponse.groupInstanceId()), future); + } + } + } + + public Errors topLevelError() { + return topLevelError; + } + + public boolean hasError() { + return hasError; + } + + /** + * Futures of members with corresponding errors when they leave the group. + * + * @return list of members who failed to be removed + */ + public Map> memberFutures() { + return memberFutures; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index 97a583f3d22..e64fde16898 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -97,6 +97,10 @@ public class LeaveGroupResponse extends AbstractResponse { return getError(Errors.forCode(data.errorCode()), data.members()); } + public Errors topLevelError() { + return Errors.forCode(data.errorCode()); + } + private static Errors getError(Errors topLevelError, List memberResponses) { if (topLevelError != Errors.NONE) { return topLevelError; diff --git a/clients/src/main/resources/common/message/LeaveGroupResponse.json b/clients/src/main/resources/common/message/LeaveGroupResponse.json index 2bbf63df236..495d5c39ba1 100644 --- a/clients/src/main/resources/common/message/LeaveGroupResponse.json +++ b/clients/src/main/resources/common/message/LeaveGroupResponse.json @@ -19,7 +19,7 @@ "name": "LeaveGroupResponse", // Version 1 adds the throttle time. // Starting in version 2, on quota violation, brokers send out responses before throttling. - // Starting in version 3, we will make leave group request into batch mode. + // Starting in version 3, we will make leave group request into batch mode and add group.instance.id. "validVersions": "0-3", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index ff9e27206b8..5d4d84b2d47 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -66,6 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.protocol.Errors; @@ -89,6 +93,7 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; +import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -224,7 +229,7 @@ public class KafkaAdminClientTest { private static Cluster mockBootstrapCluster() { return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses( - Collections.singletonList("localhost:8121"), ClientDnsLookup.DEFAULT)); + singletonList("localhost:8121"), ClientDnsLookup.DEFAULT)); } private static AdminClientUnitTestEnv mockClientEnv(String... configVals) { @@ -301,7 +306,7 @@ public class KafkaAdminClientTest { // This tests the scenario in which the bootstrap server is unreachable for a short while, // which prevents AdminClient from being able to send the initial metadata request - Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121))); + Cluster cluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 8121))); Map unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) { @@ -429,19 +434,19 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, prepareDeleteTopicsResponse("myTopic", Errors.NONE)); - KafkaFuture future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), + KafkaFuture future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); future.get(); env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED)); - future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), + future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, TopicDeletionDisabledException.class); env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)); - future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), + future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class); } @@ -1550,8 +1555,8 @@ public class KafkaAdminClientTest { AlterConfigOp.OpType.APPEND); final Map> configs = new HashMap<>(); - configs.put(brokerResource, Collections.singletonList(alterConfigOp1)); - configs.put(topicResource, Collections.singletonList(alterConfigOp2)); + configs.put(brokerResource, singletonList(alterConfigOp1)); + configs.put(topicResource, singletonList(alterConfigOp2)); AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs); TestUtils.assertFutureError(result.values().get(brokerResource), ClusterAuthorizationException.class); @@ -1566,7 +1571,117 @@ public class KafkaAdminClientTest { .setErrorMessage(ApiError.NONE.message())); env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData)); - env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, asList(alterConfigOp1))).all().get(); + env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, singletonList(alterConfigOp1))).all().get(); + } + } + + @Test + public void testRemoveMembersFromGroup() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + final String instanceOne = "instance-1"; + final String instanceTwo = "instance-2"; + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + MemberResponse responseOne = new MemberResponse() + .setGroupInstanceId(instanceOne) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()); + + MemberResponse responseTwo = new MemberResponse() + .setGroupInstanceId(instanceTwo) + .setErrorCode(Errors.NONE.code()); + + List memberResponses = Arrays.asList(responseOne, responseTwo); + + // Retriable FindCoordinatorResponse errors should be retried + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + // Retriable errors should be retried + env.kafkaClient().prepareResponse(null, true); + env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); + env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); + + // Inject a top-level non-retriable error + env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()))); + + String groupId = "groupId"; + List membersToRemove = Arrays.asList(instanceOne, instanceTwo); + final MembershipChangeResult unknownErrorResult = env.adminClient().removeMemberFromConsumerGroup( + groupId, + new RemoveMemberFromConsumerGroupOptions(membersToRemove) + ); + + RemoveMemberFromGroupResult result = unknownErrorResult.all(); + assertTrue(result.hasError()); + assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.topLevelError()); + + Map> memberFutures = result.memberFutures(); + assertEquals(2, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + KafkaFuture memberFuture = entry.getValue(); + assertTrue(memberFuture.isCompletedExceptionally()); + try { + memberFuture.get(); + fail("get() should throw exception"); + } catch (ExecutionException | InterruptedException e0) { + assertTrue(e0.getCause() instanceof UnknownServerException); + } + } + + // Inject one member level error. + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(memberResponses))); + + final MembershipChangeResult memberLevelErrorResult = env.adminClient().removeMemberFromConsumerGroup( + groupId, + new RemoveMemberFromConsumerGroupOptions(membersToRemove) + ); + + result = memberLevelErrorResult.all(); + assertTrue(result.hasError()); + assertEquals(Errors.NONE, result.topLevelError()); + + memberFutures = result.memberFutures(); + assertEquals(2, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + KafkaFuture memberFuture = entry.getValue(); + if (entry.getKey().groupInstanceId().equals(instanceOne)) { + try { + memberFuture.get(); + fail("get() should throw ExecutionException"); + } catch (ExecutionException | InterruptedException e0) { + assertTrue(e0.getCause() instanceof UnknownMemberIdException); + } + } else { + assertFalse(memberFuture.isCompletedExceptionally()); + } + } + + // Return success. + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(responseTwo)))); + + final MembershipChangeResult noErrorResult = env.adminClient().removeMemberFromConsumerGroup( + groupId, + new RemoveMemberFromConsumerGroupOptions(membersToRemove) + ); + result = noErrorResult.all(); + assertFalse(result.hasError()); + assertEquals(Errors.NONE, result.topLevelError()); + + memberFutures = result.memberFutures(); + assertEquals(1, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + assertFalse(entry.getValue().isCompletedExceptionally()); + } } } @@ -1838,7 +1953,5 @@ public class KafkaAdminClientTest { } } } - } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java new file mode 100644 index 00000000000..5a1357296b1 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.requests.LeaveGroupResponse; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class MembershipChangeResultTest { + + @Test + public void testConstructor() { + KafkaFutureImpl removeMemberFuture = new KafkaFutureImpl<>(); + + MembershipChangeResult changeResult = new MembershipChangeResult(removeMemberFuture); + assertEquals(removeMemberFuture, changeResult.future()); + RemoveMemberFromGroupResult removeMemberFromGroupResult = new RemoveMemberFromGroupResult( + new LeaveGroupResponse(new LeaveGroupResponseData()), + Collections.emptyList() + ); + + removeMemberFuture.complete(removeMemberFromGroupResult); + try { + assertEquals(removeMemberFromGroupResult, changeResult.all()); + } catch (ExecutionException | InterruptedException e) { + fail("Unexpected exception " + e + " when trying to get remove member result"); + } + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index baaf6613841..86ba572f532 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -357,6 +357,11 @@ public class MockAdminClient extends AdminClient { throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java new file mode 100644 index 00000000000..b36461efbcd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class RemoveMemberFromConsumerGroupOptionsTest { + + @Test + public void testConstructor() { + List groupInstanceIds = Collections.singletonList("instance-1"); + + RemoveMemberFromConsumerGroupOptions options = new RemoveMemberFromConsumerGroupOptions(groupInstanceIds); + + assertEquals(Collections.singletonList( + new MemberIdentity().setGroupInstanceId("instance-1")), options.getMembers()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java new file mode 100644 index 00000000000..f4a6ebe948f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; +import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.LeaveGroupResponse; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class RemoveMemberFromGroupResultTest { + + private String instanceOne = "instance-1"; + private String instanceTwo = "instance-2"; + private List membersToRemove; + + private List memberResponses; + + @Before + public void setUp() { + membersToRemove = Arrays.asList( + new MemberIdentity() + .setGroupInstanceId(instanceOne), + new MemberIdentity() + .setGroupInstanceId(instanceTwo) + ); + + memberResponses = Arrays.asList( + new MemberResponse() + .setGroupInstanceId(instanceOne), + new MemberResponse() + .setGroupInstanceId(instanceTwo) + ); + } + + @Test + public void testTopLevelErrorConstructor() { + RemoveMemberFromGroupResult topLevelErrorResult = + new RemoveMemberFromGroupResult(new LeaveGroupResponse( + new LeaveGroupResponseData() + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()) + .setMembers(memberResponses)), membersToRemove); + + assertTrue(topLevelErrorResult.hasError()); + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, topLevelErrorResult.topLevelError()); + + Map> memberFutures = topLevelErrorResult.memberFutures(); + assertEquals(2, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + KafkaFuture memberFuture = entry.getValue(); + assertTrue(memberFuture.isCompletedExceptionally()); + try { + memberFuture.get(); + fail("get() should throw ExecutionException"); + } catch (ExecutionException | InterruptedException e0) { + assertTrue(e0.getCause() instanceof GroupAuthorizationException); + } + } + } + + @Test + public void testMemberLevelErrorConstructor() { + MemberResponse responseOne = new MemberResponse() + .setGroupInstanceId(instanceOne) + .setErrorCode(Errors.FENCED_INSTANCE_ID.code()); + MemberResponse responseTwo = new MemberResponse() + .setGroupInstanceId(instanceTwo) + .setErrorCode(Errors.NONE.code()); + + RemoveMemberFromGroupResult memberLevelErrorResult = new RemoveMemberFromGroupResult( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setMembers(Arrays.asList(responseOne, responseTwo))), + membersToRemove); + assertTrue(memberLevelErrorResult.hasError()); + assertEquals(Errors.NONE, memberLevelErrorResult.topLevelError()); + + Map> memberFutures = memberLevelErrorResult.memberFutures(); + assertEquals(2, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + KafkaFuture memberFuture = entry.getValue(); + if (entry.getKey().groupInstanceId().equals(instanceOne)) { + assertTrue(memberFuture.isCompletedExceptionally()); + try { + memberFuture.get(); + fail("get() should throw ExecutionException"); + } catch (ExecutionException | InterruptedException e0) { + assertTrue(e0.getCause() instanceof FencedInstanceIdException); + } + } else { + assertFalse(memberFuture.isCompletedExceptionally()); + try { + memberFuture.get(); + } catch (ExecutionException | InterruptedException e0) { + fail("get() shouldn't throw exception"); + } + } + } + } + + @Test + public void testNoErrorConstructor() { + MemberResponse responseOne = new MemberResponse() + .setGroupInstanceId(instanceOne) + .setErrorCode(Errors.NONE.code()); + MemberResponse responseTwo = new MemberResponse() + .setGroupInstanceId(instanceTwo) + .setErrorCode(Errors.NONE.code()); + // If no error is specified, failed members are not visible. + RemoveMemberFromGroupResult noErrorResult = new RemoveMemberFromGroupResult( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setMembers(Arrays.asList(responseOne, responseTwo))), + membersToRemove); + assertFalse(noErrorResult.hasError()); + assertEquals(Errors.NONE, noErrorResult.topLevelError()); + Map> memberFutures = noErrorResult.memberFutures(); + assertEquals(2, memberFutures.size()); + for (Map.Entry> entry : memberFutures.entrySet()) { + try { + entry.getValue().get(); + } catch (ExecutionException | InterruptedException e0) { + fail("get() shouldn't throw exception"); + } + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 0484f6a5f78..e2e0ca71a03 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -43,7 +43,11 @@ import org.apache.kafka.common.TopicPartitionReplica import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors._ -import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} +import org.apache.kafka.common.internals.KafkaFutureImpl +import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity +import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{DeleteRecordsRequest, JoinGroupRequest, MetadataResponse} import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.utils.{Time, Utils} import org.junit.Assert._ @@ -1168,10 +1172,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } val testGroupId = "test_group_id" val testClientId = "test_client_id" + val testInstanceId = "test_instance_id" val fakeGroupId = "fake_group_id" val newConsumerConfig = new Properties(consumerConfig) newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, testInstanceId) val consumer = createConsumer(configOverrides = newConsumerConfig) val latch = new CountDownLatch(1) try { @@ -1201,13 +1207,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { !matching.isEmpty }, s"Expected to be able to list $testGroupId") - val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, + val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - assertEquals(2, result.describedGroups().size()) + assertEquals(2, describeWithFakeGroupResult.describedGroups().size()) // Test that we can get information about the test consumer group. - assertTrue(result.describedGroups().containsKey(testGroupId)) - val testGroupDescription = result.describedGroups().get(testGroupId).get() + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) + var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() assertEquals(testGroupId, testGroupDescription.groupId()) assertFalse(testGroupDescription.isSimpleConsumerGroup()) @@ -1223,8 +1229,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) // Test that the fake group is listed as dead. - assertTrue(result.describedGroups().containsKey(fakeGroupId)) - val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get() + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) + val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get() assertEquals(fakeGroupId, fakeGroupDescription.groupId()) assertEquals(0, fakeGroupDescription.members().size()) @@ -1233,7 +1239,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations()) // Test that all() returns 2 results - assertEquals(2, result.all().get().size()) + assertEquals(2, describeWithFakeGroupResult.all().get().size()) // Test listConsumerGroupOffsets TestUtils.waitUntilTrue(() => { @@ -1242,8 +1248,30 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { parts.containsKey(part) && (parts.get(part).offset() == 1) }, s"Expected the offset for partition 0 to eventually become 1.") + // Test delete non-exist consumer instance + val invalidInstanceId = "invalid-instance-id" + var removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions( + Collections.singletonList(invalidInstanceId) + )).all() + + assertTrue(removeMemberResult.hasError) + assertEquals(Errors.NONE, removeMemberResult.topLevelError) + + val firstMemberFutures = removeMemberResult.memberFutures() + assertEquals(1, firstMemberFutures.size) + firstMemberFutures.values.asScala foreach { case value => + try { + value.get() + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException]) + case _ => + fail("Should have caught exception in getting member future") + } + } + // Test consumer group deletion - val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) + var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) assertEquals(2, deleteResult.deletedGroups().size()) // Deleting the fake group ID should get GroupIdNotFoundException. @@ -1255,6 +1283,45 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId), classOf[GroupNotEmptyException]) + + // Test delete correct member + removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions( + Collections.singletonList(testInstanceId) + )).all() + + assertFalse(removeMemberResult.hasError) + assertEquals(Errors.NONE, removeMemberResult.topLevelError) + + val deletedMemberFutures = removeMemberResult.memberFutures() + assertEquals(1, firstMemberFutures.size) + deletedMemberFutures.values.asScala foreach { case value => + try { + value.get() + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException]) + case _ => + fail("Should have caught exception in getting member future") + } + } + + // The group should contain no member now. + val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + assertEquals(1, describeTestGroupResult.describedGroups().size()) + + testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() + + assertEquals(testGroupId, testGroupDescription.groupId) + assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertTrue(testGroupDescription.members().isEmpty) + + // Consumer group deletion on empty group should succeed + deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava) + assertEquals(1, deleteResult.deletedGroups().size()) + + assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) + assertNull(deleteResult.deletedGroups().get(testGroupId).get()) } finally { consumerThread.interrupt() consumerThread.join()