Browse Source

KAFKA-8222 & KIP-345 part 5: admin request to batch remove members (#7122)

This PR adds supporting features for static membership. It could batch remove consumers from the group with provided group.instance.id list.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/7317/head
Boyang Chen 5 years ago committed by Guozhang Wang
parent
commit
e59e4caadc
  1. 12
      clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  2. 61
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  3. 49
      clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java
  4. 50
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java
  5. 85
      clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java
  6. 4
      clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
  7. 2
      clients/src/main/resources/common/message/LeaveGroupResponse.json
  8. 133
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  9. 50
      clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java
  10. 5
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
  11. 38
      clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java
  12. 154
      clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java
  13. 85
      core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

12
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclBinding; @@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.LeaveGroupResponse;
/**
* The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
@ -1045,6 +1046,17 @@ public interface Admin extends AutoCloseable { @@ -1045,6 +1046,17 @@ public interface Admin extends AutoCloseable {
ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
ListPartitionReassignmentsOptions options);
/**
* Remove members from the consumer group by given member identities.
* <p>
* For possible error codes, refer to {@link LeaveGroupResponse}.
*
* @param groupId The ID of the group to remove member from.
* @param options The options to carry removing members' information.
* @return The MembershipChangeResult.
*/
MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options);
/**
* Get the metrics kept by the adminClient
*/

61
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -143,6 +143,8 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest; @@ -143,6 +143,8 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
@ -3339,4 +3341,63 @@ public class KafkaAdminClient extends AdminClient { @@ -3339,4 +3341,63 @@ public class KafkaAdminClient extends AdminClient {
return (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault())
|| resource.type() == ConfigResource.Type.BROKER_LOGGER;
}
@Override
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId,
RemoveMemberFromConsumerGroupOptions options) {
final long startFindCoordinatorMs = time.milliseconds();
final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs());
KafkaFutureImpl<RemoveMemberFromGroupResult> future = new KafkaFutureImpl<>();
ConsumerGroupOperationContext<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context =
new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
Call findCoordinatorCall = getFindCoordinatorCall(context,
() -> KafkaAdminClient.this.getRemoveMembersFromGroupCall(context));
runnable.call(findCoordinatorCall, startFindCoordinatorMs);
return new MembershipChangeResult(future);
}
private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext
<RemoveMemberFromGroupResult, RemoveMemberFromConsumerGroupOptions> context) {
return new Call("leaveGroup",
context.getDeadline(),
new ConstantNodeIdProvider(context.getNode().get().id())) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new LeaveGroupRequest.Builder(context.getGroupId(),
context.getOptions().getMembers());
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
// If coordinator changed since we fetched it, retry
if (context.hasCoordinatorMoved(response)) {
rescheduleTask(context, () -> getRemoveMembersFromGroupCall(context));
return;
}
// If error is transient coordinator error, retry
Errors error = response.error();
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
}
final RemoveMemberFromGroupResult membershipChangeResult =
new RemoveMemberFromGroupResult(response, context.getOptions().getMembers());
context.getFuture().complete(membershipChangeResult);
}
@Override
void handleFailure(Throwable throwable) {
context.getFuture().completeExceptionally(throwable);
}
};
}
}

49
clients/src/main/java/org/apache/kafka/clients/admin/MembershipChangeResult.java

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.concurrent.ExecutionException;
/**
* The result of the {@link KafkaAdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)} call.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class MembershipChangeResult {
private KafkaFuture<RemoveMemberFromGroupResult> future;
MembershipChangeResult(KafkaFuture<RemoveMemberFromGroupResult> future) {
this.future = future;
}
/**
* Return a future which contains the member removal results.
*/
public RemoveMemberFromGroupResult all() throws ExecutionException, InterruptedException {
return future.get();
}
// Visible for testing
public KafkaFuture<RemoveMemberFromGroupResult> future() {
return future;
}
}

50
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptions.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.requests.JoinGroupRequest;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* Options for {@link AdminClient#removeMemberFromConsumerGroup(String, RemoveMemberFromConsumerGroupOptions)}.
* It carries the members to be removed from the consumer group.
*
* The API of this class is evolving, see {@link AdminClient} for details.
*/
@InterfaceStability.Evolving
public class RemoveMemberFromConsumerGroupOptions extends AbstractOptions<RemoveMemberFromConsumerGroupOptions> {
private List<MemberIdentity> members;
public RemoveMemberFromConsumerGroupOptions(Collection<String> groupInstanceIds) {
members = groupInstanceIds.stream().map(
instanceId -> new MemberIdentity()
.setGroupInstanceId(instanceId)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
).collect(Collectors.toList());
}
public List<MemberIdentity> getMembers() {
return members;
}
}

85
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResult.java

@ -0,0 +1,85 @@ @@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Result of a batch member removal operation.
*/
public class RemoveMemberFromGroupResult {
private final Errors topLevelError;
private final Map<MemberIdentity, KafkaFuture<Void>> memberFutures;
private boolean hasError = false;
RemoveMemberFromGroupResult(LeaveGroupResponse response,
List<MemberIdentity> membersToRemove) {
this.topLevelError = response.topLevelError();
this.memberFutures = new HashMap<>(membersToRemove.size());
if (this.topLevelError != Errors.NONE) {
// If the populated error is a top-level error, fail every member's future.
for (MemberIdentity memberIdentity : membersToRemove) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(topLevelError.exception());
memberFutures.put(memberIdentity, future);
}
hasError = true;
} else {
for (MemberResponse memberResponse : response.memberResponses()) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
Errors memberError = Errors.forCode(memberResponse.errorCode());
if (memberError != Errors.NONE) {
future.completeExceptionally(memberError.exception());
hasError = true;
} else {
future.complete(null);
}
memberFutures.put(new MemberIdentity()
.setMemberId(memberResponse.memberId())
.setGroupInstanceId(memberResponse.groupInstanceId()), future);
}
}
}
public Errors topLevelError() {
return topLevelError;
}
public boolean hasError() {
return hasError;
}
/**
* Futures of members with corresponding errors when they leave the group.
*
* @return list of members who failed to be removed
*/
public Map<MemberIdentity, KafkaFuture<Void>> memberFutures() {
return memberFutures;
}
}

4
clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java

@ -97,6 +97,10 @@ public class LeaveGroupResponse extends AbstractResponse { @@ -97,6 +97,10 @@ public class LeaveGroupResponse extends AbstractResponse {
return getError(Errors.forCode(data.errorCode()), data.members());
}
public Errors topLevelError() {
return Errors.forCode(data.errorCode());
}
private static Errors getError(Errors topLevelError, List<MemberResponse> memberResponses) {
if (topLevelError != Errors.NONE) {
return topLevelError;

2
clients/src/main/resources/common/message/LeaveGroupResponse.json

@ -19,7 +19,7 @@ @@ -19,7 +19,7 @@
"name": "LeaveGroupResponse",
// Version 1 adds the throttle time.
// Starting in version 2, on quota violation, brokers send out responses before throttling.
// Starting in version 3, we will make leave group request into batch mode.
// Starting in version 3, we will make leave group request into batch mode and add group.instance.id.
"validVersions": "0-3",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

133
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -50,6 +50,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; @@ -50,6 +50,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
@ -66,6 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; @@ -66,6 +67,9 @@ import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.protocol.Errors;
@ -89,6 +93,7 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse; @@ -89,6 +93,7 @@ import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@ -224,7 +229,7 @@ public class KafkaAdminClientTest { @@ -224,7 +229,7 @@ public class KafkaAdminClientTest {
private static Cluster mockBootstrapCluster() {
return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
Collections.singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
}
private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
@ -301,7 +306,7 @@ public class KafkaAdminClientTest { @@ -301,7 +306,7 @@ public class KafkaAdminClientTest {
// This tests the scenario in which the bootstrap server is unreachable for a short while,
// which prevents AdminClient from being able to send the initial metadata request
Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
Cluster cluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 8121)));
Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
@ -429,19 +434,19 @@ public class KafkaAdminClientTest { @@ -429,19 +434,19 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
prepareDeleteTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
KafkaFuture<Void> future = env.adminClient().deleteTopics(singletonList("myTopic"),
new DeleteTopicsOptions()).all();
future.get();
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED));
future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
future = env.adminClient().deleteTopics(singletonList("myTopic"),
new DeleteTopicsOptions()).all();
TestUtils.assertFutureError(future, TopicDeletionDisabledException.class);
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION));
future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"),
future = env.adminClient().deleteTopics(singletonList("myTopic"),
new DeleteTopicsOptions()).all();
TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
}
@ -1550,8 +1555,8 @@ public class KafkaAdminClientTest { @@ -1550,8 +1555,8 @@ public class KafkaAdminClientTest {
AlterConfigOp.OpType.APPEND);
final Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(brokerResource, Collections.singletonList(alterConfigOp1));
configs.put(topicResource, Collections.singletonList(alterConfigOp2));
configs.put(brokerResource, singletonList(alterConfigOp1));
configs.put(topicResource, singletonList(alterConfigOp2));
AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs);
TestUtils.assertFutureError(result.values().get(brokerResource), ClusterAuthorizationException.class);
@ -1566,7 +1571,117 @@ public class KafkaAdminClientTest { @@ -1566,7 +1571,117 @@ public class KafkaAdminClientTest {
.setErrorMessage(ApiError.NONE.message()));
env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData));
env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, asList(alterConfigOp1))).all().get();
env.adminClient().incrementalAlterConfigs(Collections.singletonMap(brokerResource, singletonList(alterConfigOp1))).all().get();
}
}
@Test
public void testRemoveMembersFromGroup() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
final String instanceOne = "instance-1";
final String instanceTwo = "instance-2";
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
List<MemberResponse> memberResponses = Arrays.asList(responseOne, responseTwo);
// Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
// Retriable errors should be retried
env.kafkaClient().prepareResponse(null, true);
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())));
// Inject a top-level non-retriable error
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())));
String groupId = "groupId";
List<String> membersToRemove = Arrays.asList(instanceOne, instanceTwo);
final MembershipChangeResult unknownErrorResult = env.adminClient().removeMemberFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
);
RemoveMemberFromGroupResult result = unknownErrorResult.all();
assertTrue(result.hasError());
assertEquals(Errors.UNKNOWN_SERVER_ERROR, result.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = result.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw exception");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof UnknownServerException);
}
}
// Inject one member level error.
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(memberResponses)));
final MembershipChangeResult memberLevelErrorResult = env.adminClient().removeMemberFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
);
result = memberLevelErrorResult.all();
assertTrue(result.hasError());
assertEquals(Errors.NONE, result.topLevelError());
memberFutures = result.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
if (entry.getKey().groupInstanceId().equals(instanceOne)) {
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof UnknownMemberIdException);
}
} else {
assertFalse(memberFuture.isCompletedExceptionally());
}
}
// Return success.
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(Collections.singletonList(responseTwo))));
final MembershipChangeResult noErrorResult = env.adminClient().removeMemberFromConsumerGroup(
groupId,
new RemoveMemberFromConsumerGroupOptions(membersToRemove)
);
result = noErrorResult.all();
assertFalse(result.hasError());
assertEquals(Errors.NONE, result.topLevelError());
memberFutures = result.memberFutures();
assertEquals(1, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
assertFalse(entry.getValue().isCompletedExceptionally());
}
}
}
@ -1838,7 +1953,5 @@ public class KafkaAdminClientTest { @@ -1838,7 +1953,5 @@ public class KafkaAdminClientTest {
}
}
}
}
}

50
clients/src/test/java/org/apache/kafka/clients/admin/MembershipChangeResultTest.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class MembershipChangeResultTest {
@Test
public void testConstructor() {
KafkaFutureImpl<RemoveMemberFromGroupResult> removeMemberFuture = new KafkaFutureImpl<>();
MembershipChangeResult changeResult = new MembershipChangeResult(removeMemberFuture);
assertEquals(removeMemberFuture, changeResult.future());
RemoveMemberFromGroupResult removeMemberFromGroupResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()),
Collections.emptyList()
);
removeMemberFuture.complete(removeMemberFromGroupResult);
try {
assertEquals(removeMemberFromGroupResult, changeResult.all());
} catch (ExecutionException | InterruptedException e) {
fail("Unexpected exception " + e + " when trying to get remove member result");
}
}
}

5
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -357,6 +357,11 @@ public class MockAdminClient extends AdminClient { @@ -357,6 +357,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public MembershipChangeResult removeMemberFromConsumerGroup(String groupId, RemoveMemberFromConsumerGroupOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

38
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromConsumerGroupOptionsTest.java

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class RemoveMemberFromConsumerGroupOptionsTest {
@Test
public void testConstructor() {
List<String> groupInstanceIds = Collections.singletonList("instance-1");
RemoveMemberFromConsumerGroupOptions options = new RemoveMemberFromConsumerGroupOptions(groupInstanceIds);
assertEquals(Collections.singletonList(
new MemberIdentity().setGroupInstanceId("instance-1")), options.getMembers());
}
}

154
clients/src/test/java/org/apache/kafka/clients/admin/RemoveMemberFromGroupResultTest.java

@ -0,0 +1,154 @@ @@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class RemoveMemberFromGroupResultTest {
private String instanceOne = "instance-1";
private String instanceTwo = "instance-2";
private List<MemberIdentity> membersToRemove;
private List<MemberResponse> memberResponses;
@Before
public void setUp() {
membersToRemove = Arrays.asList(
new MemberIdentity()
.setGroupInstanceId(instanceOne),
new MemberIdentity()
.setGroupInstanceId(instanceTwo)
);
memberResponses = Arrays.asList(
new MemberResponse()
.setGroupInstanceId(instanceOne),
new MemberResponse()
.setGroupInstanceId(instanceTwo)
);
}
@Test
public void testTopLevelErrorConstructor() {
RemoveMemberFromGroupResult topLevelErrorResult =
new RemoveMemberFromGroupResult(new LeaveGroupResponse(
new LeaveGroupResponseData()
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code())
.setMembers(memberResponses)), membersToRemove);
assertTrue(topLevelErrorResult.hasError());
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, topLevelErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = topLevelErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof GroupAuthorizationException);
}
}
}
@Test
public void testMemberLevelErrorConstructor() {
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.FENCED_INSTANCE_ID.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
RemoveMemberFromGroupResult memberLevelErrorResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setMembers(Arrays.asList(responseOne, responseTwo))),
membersToRemove);
assertTrue(memberLevelErrorResult.hasError());
assertEquals(Errors.NONE, memberLevelErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = memberLevelErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
KafkaFuture<Void> memberFuture = entry.getValue();
if (entry.getKey().groupInstanceId().equals(instanceOne)) {
assertTrue(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException | InterruptedException e0) {
assertTrue(e0.getCause() instanceof FencedInstanceIdException);
}
} else {
assertFalse(memberFuture.isCompletedExceptionally());
try {
memberFuture.get();
} catch (ExecutionException | InterruptedException e0) {
fail("get() shouldn't throw exception");
}
}
}
}
@Test
public void testNoErrorConstructor() {
MemberResponse responseOne = new MemberResponse()
.setGroupInstanceId(instanceOne)
.setErrorCode(Errors.NONE.code());
MemberResponse responseTwo = new MemberResponse()
.setGroupInstanceId(instanceTwo)
.setErrorCode(Errors.NONE.code());
// If no error is specified, failed members are not visible.
RemoveMemberFromGroupResult noErrorResult = new RemoveMemberFromGroupResult(
new LeaveGroupResponse(new LeaveGroupResponseData()
.setMembers(Arrays.asList(responseOne, responseTwo))),
membersToRemove);
assertFalse(noErrorResult.hasError());
assertEquals(Errors.NONE, noErrorResult.topLevelError());
Map<MemberIdentity, KafkaFuture<Void>> memberFutures = noErrorResult.memberFutures();
assertEquals(2, memberFutures.size());
for (Map.Entry<MemberIdentity, KafkaFuture<Void>> entry : memberFutures.entrySet()) {
try {
entry.getValue().get();
} catch (ExecutionException | InterruptedException e0) {
fail("get() shouldn't throw exception");
}
}
}
}

85
core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala

@ -43,7 +43,11 @@ import org.apache.kafka.common.TopicPartitionReplica @@ -43,7 +43,11 @@ import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{DeleteRecordsRequest, JoinGroupRequest, MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.Assert._
@ -1168,10 +1172,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1168,10 +1172,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
}
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId = "test_instance_id"
val fakeGroupId = "fake_group_id"
val newConsumerConfig = new Properties(consumerConfig)
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, testInstanceId)
val consumer = createConsumer(configOverrides = newConsumerConfig)
val latch = new CountDownLatch(1)
try {
@ -1201,13 +1207,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1201,13 +1207,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
!matching.isEmpty
}, s"Expected to be able to list $testGroupId")
val result = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, result.describedGroups().size())
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
// Test that we can get information about the test consumer group.
assertTrue(result.describedGroups().containsKey(testGroupId))
val testGroupDescription = result.describedGroups().get(testGroupId).get()
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup())
@ -1223,8 +1229,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1223,8 +1229,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
assertTrue(result.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = result.describedGroups().get(fakeGroupId).get()
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
@ -1233,7 +1239,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1233,7 +1239,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, result.all().get().size())
assertEquals(2, describeWithFakeGroupResult.all().get().size())
// Test listConsumerGroupOffsets
TestUtils.waitUntilTrue(() => {
@ -1242,8 +1248,30 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1242,8 +1248,30 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
parts.containsKey(part) && (parts.get(part).offset() == 1)
}, s"Expected the offset for partition 0 to eventually become 1.")
// Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id"
var removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
Collections.singletonList(invalidInstanceId)
)).all()
assertTrue(removeMemberResult.hasError)
assertEquals(Errors.NONE, removeMemberResult.topLevelError)
val firstMemberFutures = removeMemberResult.memberFutures()
assertEquals(1, firstMemberFutures.size)
firstMemberFutures.values.asScala foreach { case value =>
try {
value.get()
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
case _ =>
fail("Should have caught exception in getting member future")
}
}
// Test consumer group deletion
val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
assertEquals(2, deleteResult.deletedGroups().size())
// Deleting the fake group ID should get GroupIdNotFoundException.
@ -1255,6 +1283,45 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { @@ -1255,6 +1283,45 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
// Test delete correct member
removeMemberResult = client.removeMemberFromConsumerGroup(testGroupId, new RemoveMemberFromConsumerGroupOptions(
Collections.singletonList(testInstanceId)
)).all()
assertFalse(removeMemberResult.hasError)
assertEquals(Errors.NONE, removeMemberResult.topLevelError)
val deletedMemberFutures = removeMemberResult.memberFutures()
assertEquals(1, firstMemberFutures.size)
deletedMemberFutures.values.asScala foreach { case value =>
try {
value.get()
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[UnknownMemberIdException])
case _ =>
fail("Should have caught exception in getting member future")
}
}
// The group should contain no member now.
val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertTrue(testGroupDescription.members().isEmpty)
// Consumer group deletion on empty group should succeed
deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava)
assertEquals(1, deleteResult.deletedGroups().size())
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertNull(deleteResult.deletedGroups().get(testGroupId).get())
} finally {
consumerThread.interrupt()
consumerThread.join()

Loading…
Cancel
Save