diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index ee7fd3e2cb6..f9fe0423482 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -204,9 +203,7 @@ public class InternalTopicManager { } catch (final ExecutionException couldNotDescribeTopicException) { final Throwable cause = couldNotDescribeTopicException.getCause(); if (cause instanceof UnknownTopicOrPartitionException || - cause instanceof LeaderNotAvailableException || - (cause instanceof InvalidTopicException && - cause.getMessage().equals("Topic " + topicName + " not found."))) { + cause instanceof LeaderNotAvailableException) { // This topic didn't exist or leader is not known yet, proceed to try to create it log.debug("Topic {} is unknown or not found, hence not existed yet.", topicName); } else {