From 26e238c6f5a242459f52bfefa97a6b0c247b2d5e Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 17 Jun 2020 22:56:07 +0800 Subject: [PATCH] KAFKA-10147 MockAdminClient#describeConfigs(Collection) is unable to handle broker resource (#8853) Author: Chia-Ping Tsai Reviewers: Boyang Chen , Randall Hauch --- .../kafka/clients/admin/MockAdminClient.java | 72 ++++++------------- 1 file changed, 23 insertions(+), 49 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 2b86d4fb274..7c6a9559b8e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; public class MockAdminClient extends AdminClient { public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; @@ -365,51 +366,6 @@ public class MockAdminClient extends AdminClient { return new DescribeTopicsResult(topicDescriptions); } - @Override - public DescribeConfigsResult describeConfigs(Collection resources) { - Map> topicConfigs = new HashMap<>(); - - if (timeoutNextRequests > 0) { - for (ConfigResource requestedResource : resources) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(new TimeoutException()); - topicConfigs.put(requestedResource, future); - } - - --timeoutNextRequests; - return new DescribeConfigsResult(topicConfigs); - } - - for (ConfigResource requestedResource : resources) { - if (requestedResource.type() != ConfigResource.Type.TOPIC) { - continue; - } - for (Map.Entry topicDescription : allTopics.entrySet()) { - String topicName = topicDescription.getKey(); - if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion) { - if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) { - topicDescription.getValue().fetchesRemainingUntilVisible--; - } else { - TopicMetadata topicMetadata = topicDescription.getValue(); - KafkaFutureImpl future = new KafkaFutureImpl<>(); - Collection entries = new ArrayList<>(); - topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k, v))); - future.complete(new Config(entries)); - topicConfigs.put(requestedResource, future); - break; - } - } - } - if (!topicConfigs.containsKey(requestedResource)) { - KafkaFutureImpl future = new KafkaFutureImpl<>(); - future.completeExceptionally(new UnknownTopicOrPartitionException("Resource " + requestedResource + " not found.")); - topicConfigs.put(requestedResource, future); - } - } - - return new DescribeConfigsResult(topicConfigs); - } - @Override synchronized public DeleteTopicsResult deleteTopics(Collection topicsToDelete, DeleteTopicsOptions options) { Map> deleteTopicsResult = new HashMap<>(); @@ -535,6 +491,19 @@ public class MockAdminClient extends AdminClient { @Override synchronized public DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { + + if (timeoutNextRequests > 0) { + Map> configs = new HashMap<>(); + for (ConfigResource requestedResource : resources) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new TimeoutException()); + configs.put(requestedResource, future); + } + + --timeoutNextRequests; + return new DescribeConfigsResult(configs); + } + Map> results = new HashMap<>(); for (ConfigResource resource : resources) { KafkaFutureImpl future = new KafkaFutureImpl<>(); @@ -551,7 +520,7 @@ public class MockAdminClient extends AdminClient { synchronized private Config getResourceDescription(ConfigResource resource) { switch (resource.type()) { case BROKER: { - int brokerId = Integer.valueOf(resource.name()); + int brokerId = Integer.parseInt(resource.name()); if (brokerId >= brokerConfigs.size()) { throw new InvalidRequestException("Broker " + resource.name() + " not found."); @@ -560,10 +529,15 @@ public class MockAdminClient extends AdminClient { } case TOPIC: { TopicMetadata topicMetadata = allTopics.get(resource.name()); - if (topicMetadata == null) { - throw new UnknownTopicOrPartitionException(); + if (topicMetadata != null && !topicMetadata.markedForDeletion) { + if (topicMetadata.fetchesRemainingUntilVisible > 0) + topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible - 1); + else return new Config(topicMetadata.configs.entrySet().stream() + .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue())) + .collect(Collectors.toList())); + } - return toConfigObject(topicMetadata.configs); + throw new UnknownTopicOrPartitionException("Resource " + resource + " not found."); } default: throw new UnsupportedOperationException("Not implemented yet");