Browse Source

KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)

pull/4429/merge
Colin Patrick McCabe 7 years ago committed by Jason Gustafson
parent
commit
ef93998fa7
  1. 51
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  2. 33
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

51
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -1077,19 +1077,33 @@ public class KafkaAdminClient extends AdminClient { @@ -1077,19 +1077,33 @@ public class KafkaAdminClient extends AdminClient {
}
}
/**
* Returns true if a topic name cannot be represented in an RPC. This function does NOT check
* whether the name is too long, contains invalid characters, etc. It is better to enforce
* those policies on the server, so that they can be changed in the future if needed.
*/
private static boolean topicNameIsUnrepresentable(String topicName) {
return (topicName == null) || topicName.isEmpty();
}
@Override
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
for (NewTopic newTopic : newTopics) {
if (topicFutures.get(newTopic.name()) == null) {
if (topicNameIsUnrepresentable(newTopic.name())) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
newTopic.name() + "' cannot be represented in a request."));
topicFutures.put(newTopic.name(), future);
} else if (!topicFutures.containsKey(newTopic.name())) {
topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
}
}
final long now = time.milliseconds();
runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
@ -1128,7 +1142,10 @@ public class KafkaAdminClient extends AdminClient { @@ -1128,7 +1142,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
};
if (!topicsMap.isEmpty()) {
runnable.call(call, now);
}
return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@ -1137,12 +1154,17 @@ public class KafkaAdminClient extends AdminClient { @@ -1137,12 +1154,17 @@ public class KafkaAdminClient extends AdminClient {
DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
for (String topicName : topicNames) {
if (topicFutures.get(topicName) == null) {
if (topicNameIsUnrepresentable(topicName)) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<Void>());
}
}
final long now = time.milliseconds();
runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
@ -1181,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient { @@ -1181,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
};
if (!topicNames.isEmpty()) {
runnable.call(call, now);
}
return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@ -1223,13 +1248,18 @@ public class KafkaAdminClient extends AdminClient { @@ -1223,13 +1248,18 @@ public class KafkaAdminClient extends AdminClient {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
if (!topicFutures.containsKey(topicName)) {
if (topicNameIsUnrepresentable(topicName)) {
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
topicNamesList.add(topicName);
}
}
final long now = time.milliseconds();
runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;
@ -1298,7 +1328,10 @@ public class KafkaAdminClient extends AdminClient { @@ -1298,7 +1328,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
};
if (!topicNamesList.isEmpty()) {
runnable.call(call, now);
}
return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
}

33
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -64,6 +64,7 @@ import org.slf4j.Logger; @@ -64,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -215,6 +216,38 @@ public class KafkaAdminClientTest { @@ -215,6 +216,38 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testInvalidTopicNames() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
env.kafkaClient().setNode(env.cluster().controller());
List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
Map<String, KafkaFuture<Void>> deleteFutures =
env.adminClient().deleteTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
}
Map<String, KafkaFuture<TopicDescription>> describeFutures =
env.adminClient().describeTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
}
List<NewTopic> newTopics = new ArrayList<>();
for (String sillyTopicName : sillyTopicNames) {
newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
}
Map<String, KafkaFuture<Void>> createFutures =
env.adminClient().createTopics(newTopics).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
}
}
}
private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),

Loading…
Cancel
Save