Browse Source

KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)

Author: Chia-Ping Tsai <chia7712@gmail.com>
Reviewers: Boyang Chen <boyang@confluent.io>, Randall Hauch <rhauch@gmail.com>
pull/8889/head
Chia-Ping Tsai 4 years ago committed by GitHub
parent
commit
26e238c6f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 72
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

72
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -51,6 +51,7 @@ import java.util.List; @@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@ -365,51 +366,6 @@ public class MockAdminClient extends AdminClient { @@ -365,51 +366,6 @@ public class MockAdminClient extends AdminClient {
return new DescribeTopicsResult(topicDescriptions);
}
@Override
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new HashMap<>();
if (timeoutNextRequests > 0) {
for (ConfigResource requestedResource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new TimeoutException());
topicConfigs.put(requestedResource, future);
}
--timeoutNextRequests;
return new DescribeConfigsResult(topicConfigs);
}
for (ConfigResource requestedResource : resources) {
if (requestedResource.type() != ConfigResource.Type.TOPIC) {
continue;
}
for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
String topicName = topicDescription.getKey();
if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion) {
if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
topicDescription.getValue().fetchesRemainingUntilVisible--;
} else {
TopicMetadata topicMetadata = topicDescription.getValue();
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
Collection<ConfigEntry> entries = new ArrayList<>();
topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k, v)));
future.complete(new Config(entries));
topicConfigs.put(requestedResource, future);
break;
}
}
}
if (!topicConfigs.containsKey(requestedResource)) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new UnknownTopicOrPartitionException("Resource " + requestedResource + " not found."));
topicConfigs.put(requestedResource, future);
}
}
return new DescribeConfigsResult(topicConfigs);
}
@Override
synchronized public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) {
Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
@ -535,6 +491,19 @@ public class MockAdminClient extends AdminClient { @@ -535,6 +491,19 @@ public class MockAdminClient extends AdminClient {
@Override
synchronized public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
if (timeoutNextRequests > 0) {
Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<>();
for (ConfigResource requestedResource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new TimeoutException());
configs.put(requestedResource, future);
}
--timeoutNextRequests;
return new DescribeConfigsResult(configs);
}
Map<ConfigResource, KafkaFuture<Config>> results = new HashMap<>();
for (ConfigResource resource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
@ -551,7 +520,7 @@ public class MockAdminClient extends AdminClient { @@ -551,7 +520,7 @@ public class MockAdminClient extends AdminClient {
synchronized private Config getResourceDescription(ConfigResource resource) {
switch (resource.type()) {
case BROKER: {
int brokerId = Integer.valueOf(resource.name());
int brokerId = Integer.parseInt(resource.name());
if (brokerId >= brokerConfigs.size()) {
throw new InvalidRequestException("Broker " + resource.name() +
" not found.");
@ -560,10 +529,15 @@ public class MockAdminClient extends AdminClient { @@ -560,10 +529,15 @@ public class MockAdminClient extends AdminClient {
}
case TOPIC: {
TopicMetadata topicMetadata = allTopics.get(resource.name());
if (topicMetadata == null) {
throw new UnknownTopicOrPartitionException();
if (topicMetadata != null && !topicMetadata.markedForDeletion) {
if (topicMetadata.fetchesRemainingUntilVisible > 0)
topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible - 1);
else return new Config(topicMetadata.configs.entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()));
}
return toConfigObject(topicMetadata.configs);
throw new UnknownTopicOrPartitionException("Resource " + resource + " not found.");
}
default:
throw new UnsupportedOperationException("Not implemented yet");

Loading…
Cancel
Save