diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6d6788dab55..36cbe6cad00 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1077,19 +1077,33 @@ public class KafkaAdminClient extends AdminClient { } } + /** + * Returns true if a topic name cannot be represented in an RPC. This function does NOT check + * whether the name is too long, contains invalid characters, etc. It is better to enforce + * those policies on the server, so that they can be changed in the future if needed. + */ + private static boolean topicNameIsUnrepresentable(String topicName) { + return (topicName == null) || topicName.isEmpty(); + } + @Override public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { final Map> topicFutures = new HashMap<>(newTopics.size()); final Map topicsMap = new HashMap<>(newTopics.size()); for (NewTopic newTopic : newTopics) { - if (topicFutures.get(newTopic.name()) == null) { + if (topicNameIsUnrepresentable(newTopic.name())) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + newTopic.name() + "' cannot be represented in a request.")); + topicFutures.put(newTopic.name(), future); + } else if (!topicFutures.containsKey(newTopic.name())) { topicFutures.put(newTopic.name(), new KafkaFutureImpl()); topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails()); } } final long now = time.milliseconds(); - runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -1128,7 +1142,10 @@ public class KafkaAdminClient extends AdminClient { void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } - }, now); + }; + if (!topicsMap.isEmpty()) { + runnable.call(call, now); + } return new CreateTopicsResult(new HashMap>(topicFutures)); } @@ -1137,12 +1154,17 @@ public class KafkaAdminClient extends AdminClient { DeleteTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicNames.size()); for (String topicName : topicNames) { - if (topicFutures.get(topicName) == null) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { topicFutures.put(topicName, new KafkaFutureImpl()); } } final long now = time.milliseconds(); - runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override @@ -1181,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient { void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } - }, now); + }; + if (!topicNames.isEmpty()) { + runnable.call(call, now); + } return new DeleteTopicsResult(new HashMap>(topicFutures)); } @@ -1223,13 +1248,18 @@ public class KafkaAdminClient extends AdminClient { final Map> topicFutures = new HashMap<>(topicNames.size()); final ArrayList topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { - if (!topicFutures.containsKey(topicName)) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl future = new KafkaFutureImpl(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { topicFutures.put(topicName, new KafkaFutureImpl()); topicNamesList.add(topicName); } } final long now = time.milliseconds(); - runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), + Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { private boolean supportsDisablingTopicCreation = true; @@ -1298,7 +1328,10 @@ public class KafkaAdminClient extends AdminClient { void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } - }, now); + }; + if (!topicNamesList.isEmpty()) { + runnable.call(call, now); + } return new DescribeTopicsResult(new HashMap>(topicFutures)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 84588a9f3be..186ccf06cb5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -64,6 +64,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -215,6 +216,38 @@ public class KafkaAdminClientTest { } } + @Test + public void testInvalidTopicNames() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + env.kafkaClient().setNode(env.cluster().controller()); + + List sillyTopicNames = Arrays.asList(new String[] {"", null}); + Map> deleteFutures = + env.adminClient().deleteTopics(sillyTopicNames).values(); + for (String sillyTopicName : sillyTopicNames) { + assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class); + } + + Map> describeFutures = + env.adminClient().describeTopics(sillyTopicNames).values(); + for (String sillyTopicName : sillyTopicNames) { + assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class); + } + + List newTopics = new ArrayList<>(); + for (String sillyTopicName : sillyTopicNames) { + newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1)); + } + Map> createFutures = + env.adminClient().createTopics(newTopics).values(); + for (String sillyTopicName : sillyTopicNames) { + assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class); + } + } + } + private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),