diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 88705195338..f0e6635db89 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2551,7 +2551,7 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
+                    if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
                         return;
 
                     final long nowDescribeConsumerGroups = time.milliseconds();
@@ -2577,38 +2577,37 @@ public class KafkaAdminClient extends AdminClient {
                                 .findFirst().get();
 
                             final Errors groupError = Errors.forCode(describedGroup.errorCode());
-                            if (groupError != Errors.NONE) {
-                                // TODO: KAFKA-6789, we can retry based on the error code
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                final String protocolType = describedGroup.protocolType();
-                                if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
-                                    final List<DescribedGroupMember> members = describedGroup.members();
-                                    final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
-                                    final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
-                                    for (DescribedGroupMember groupMember : members) {
-                                        Set<TopicPartition> partitions = Collections.emptySet();
-                                        if (groupMember.memberAssignment().length > 0) {
-                                            final PartitionAssignor.Assignment assignment = ConsumerProtocol.
-                                                deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
-                                            partitions = new HashSet<>(assignment.partitions());
-                                        }
-                                        final MemberDescription memberDescription =
-                                            new MemberDescription(groupMember.memberId(),
-                                                groupMember.clientId(),
-                                                groupMember.clientHost(),
-                                                new MemberAssignment(partitions));
-                                        memberDescriptions.add(memberDescription);
+
+                            if (handleGroupRequestError(groupError, future))
+                                return;
+
+                            final String protocolType = describedGroup.protocolType();
+                            if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
+                                final List<DescribedGroupMember> members = describedGroup.members();
+                                final List<MemberDescription> memberDescriptions = new ArrayList<>(members.size());
+                                final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+                                for (DescribedGroupMember groupMember : members) {
+                                    Set<TopicPartition> partitions = Collections.emptySet();
+                                    if (groupMember.memberAssignment().length > 0) {
+                                        final PartitionAssignor.Assignment assignment = ConsumerProtocol.
+                                            deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment()));
+                                        partitions = new HashSet<>(assignment.partitions());
                                     }
-                                    final ConsumerGroupDescription consumerGroupDescription =
-                                        new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
-                                            memberDescriptions,
-                                            describedGroup.protocolData(),
-                                            ConsumerGroupState.parse(describedGroup.groupState()),
-                                            fcResponse.node(),
-                                            authorizedOperations);
-                                    future.complete(consumerGroupDescription);
+                                    final MemberDescription memberDescription =
+                                        new MemberDescription(groupMember.memberId(),
+                                            groupMember.clientId(),
+                                            groupMember.clientHost(),
+                                            new MemberAssignment(partitions));
+                                    memberDescriptions.add(memberDescription);
                                 }
+                                final ConsumerGroupDescription consumerGroupDescription =
+                                    new ConsumerGroupDescription(groupId, protocolType.isEmpty(),
+                                        memberDescriptions,
+                                        describedGroup.protocolData(),
+                                        ConsumerGroupState.parse(describedGroup.groupState()),
+                                        fcResponse.node(),
+                                        authorizedOperations);
+                                future.complete(consumerGroupDescription);
                             }
                         }
 
@@ -2641,11 +2640,10 @@ public class KafkaAdminClient extends AdminClient {
             .collect(Collectors.toSet());
     }
 
-    private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
-        Errors error = response.error();
-        if (error.exception() instanceof RetriableException) {
+    private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future) {
+        if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
             throw error.exception();
-        } else if (response.hasError()) {
+        } else if (error != Errors.NONE) {
             future.completeExceptionally(error.exception());
             return true;
         }
@@ -2797,7 +2795,7 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+                    if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
                         return;
 
                     final long nowListConsumerGroupOffsets = time.milliseconds();
@@ -2815,26 +2813,25 @@ public class KafkaAdminClient extends AdminClient {
                             final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
                             final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
 
-                            if (response.hasError()) {
-                                groupOffsetListingFuture.completeExceptionally(response.error().exception());
-                            } else {
-                                for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
-                                    response.responseData().entrySet()) {
-                                    final TopicPartition topicPartition = entry.getKey();
-                                    OffsetFetchResponse.PartitionData partitionData = entry.getValue();
-                                    final Errors error = partitionData.error;
-
-                                    if (error == Errors.NONE) {
-                                        final Long offset = partitionData.offset;
-                                        final String metadata = partitionData.metadata;
-                                        final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
-                                        groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
-                                    } else {
-                                        log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
-                                    }
+                            if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+                                return;
+
+                            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
+                                response.responseData().entrySet()) {
+                                final TopicPartition topicPartition = entry.getKey();
+                                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
+                                final Errors error = partitionData.error;
+
+                                if (error == Errors.NONE) {
+                                    final Long offset = partitionData.offset;
+                                    final String metadata = partitionData.metadata;
+                                    final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
+                                    groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
+                                } else {
+                                    log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
                                 }
-                                groupOffsetListingFuture.complete(groupOffsetsListing);
                             }
+                            groupOffsetListingFuture.complete(groupOffsetsListing);
                         }
 
                         @Override
@@ -2891,7 +2888,7 @@ public class KafkaAdminClient extends AdminClient {
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
 
-                    if (handleFindCoordinatorError(response, futures.get(groupId)))
+                    if (handleGroupRequestError(response.error(), futures.get(groupId)))
                         return;
 
                     final long nowDeleteConsumerGroups = time.milliseconds();
@@ -2912,11 +2909,10 @@ public class KafkaAdminClient extends AdminClient {
                             KafkaFutureImpl<Void> future = futures.get(groupId);
                             final Errors groupError = response.get(groupId);
 
-                            if (groupError != Errors.NONE) {
-                                future.completeExceptionally(groupError.exception());
-                            } else {
-                                future.complete(null);
-                            }
+                            if (handleGroupRequestError(groupError, future))
+                                return;
+
+                            future.complete(null);
                         }
 
                         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index af6f7212e8c..de169987252 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -62,7 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * COORDINATOR_LOADING_IN_PROGRESS (14)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
      * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 687dad2ed12..40ba8b1781e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -942,7 +942,7 @@ public class KafkaAdminClientTest {
             Collections.emptySet(),
             Collections.emptySet(),
             nodes.get(0));
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
             // Empty metadata response should be retried
@@ -972,8 +972,8 @@ public class KafkaAdminClientTest {
             // handle retriable errors
             env.kafkaClient().prepareResponseFrom(
                 new ListGroupsResponse(
-                        Errors.COORDINATOR_NOT_AVAILABLE,
-                        Collections.emptyList()
+                    Errors.COORDINATOR_NOT_AVAILABLE,
+                    Collections.emptyList()
                 ),
                 node1);
             env.kafkaClient().prepareResponseFrom(
@@ -1076,9 +1076,37 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            //Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
 
             DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+
+            //Retriable errors should be retried
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.COORDINATOR_LOAD_IN_PROGRESS,
+                "",
+                "",
+                "",
+                Collections.emptyList(),
+                Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            data = new DescribeGroupsResponseData();
+            data.groups().add(DescribeGroupsResponse.groupMetadata(
+                "group-0",
+                Errors.COORDINATOR_NOT_AVAILABLE,
+                "",
+                "",
+                "",
+                Collections.emptyList(),
+                Collections.emptySet()));
+            env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+
+            data = new DescribeGroupsResponseData();
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
             TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1143,7 +1171,14 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            //Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+
+            //Retriable errors should be retried
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+            env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
 
             TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
             TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1192,9 +1227,9 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
-            final Map<String, Errors> response = new HashMap<>();
-            response.put("group-0", Errors.NONE);
-            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
+            final Map<String, Errors> validResponse = new HashMap<>();
+            validResponse.put("group-0", Errors.NONE);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
 
             final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
@@ -1207,6 +1242,23 @@ public class KafkaAdminClientTest {
             final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
 
             TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+            //Retriable errors should be retried
+            env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, Errors> errorResponse1 = new HashMap<>();
+            errorResponse1.put("group-0", Errors.COORDINATOR_NOT_AVAILABLE);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse1));
+
+            final Map<String, Errors> errorResponse2 = new HashMap<>();
+            errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+
+            env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+
+            final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
+
+            final KafkaFuture<Void> errorResults = errorResult1.deletedGroups().get("group-0");
+            assertNull(errorResults.get());
         }
     }