Browse Source

MINOR: Remove throwing exception if not found from describe topics (#6112)

We recently improved the handling of the InternalTopicManager retries with #6085. The AdminClient will throw an InvalidTopicException if the topic is not found. We need to ignore that exception as when calling AdminClient#describe we may not have had a chance to create the topic yet, especially with the case of internal topics

I've created a new test asserting that when an InvalidTopicException is thrown when the topic is not found we continue on.

Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/6121/head
Bill Bejeck 6 years ago committed by Guozhang Wang
parent
commit
c238af29bf
  1. 7
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
  2. 31
      streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

7
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@ -203,9 +204,11 @@ public class InternalTopicManager { @@ -203,9 +204,11 @@ public class InternalTopicManager {
} catch (final ExecutionException couldNotDescribeTopicException) {
final Throwable cause = couldNotDescribeTopicException.getCause();
if (cause instanceof UnknownTopicOrPartitionException ||
cause instanceof LeaderNotAvailableException) {
cause instanceof LeaderNotAvailableException ||
(cause instanceof InvalidTopicException &&
cause.getMessage().equals("Topic " + topicName + " not found."))) {
// This topic didn't exist or leader is not known yet, proceed to try to create it
log.debug("Topic {} is unknown, hence not existed yet.", topicName);
log.debug("Topic {} is unknown or not found, hence not existed yet.", topicName);
} else {
log.error("Unexpected error during topic description for {}.\n" +
"Error message was: {}", topicName, cause.toString());

31
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException; @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -192,6 +193,36 @@ public class InternalTopicManagerTest { @@ -192,6 +193,36 @@ public class InternalTopicManagerTest {
}
}
@Test
public void shouldLogWhenTopicNotFoundAndNotThrowException() {
LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
mockAdminClient.addTopic(
false,
topic,
Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
null);
final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(1);
final InternalTopicConfig internalTopicConfigII = new RepartitionTopicConfig("internal-topic", Collections.emptyMap());
internalTopicConfigII.setNumberOfPartitions(1);
final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
topicConfigMap.put(topic, internalTopicConfig);
topicConfigMap.put("internal-topic", internalTopicConfigII);
internalTopicManager.makeReady(topicConfigMap);
boolean foundExpectedMessage = false;
for (final String message : appender.getMessages()) {
foundExpectedMessage |= message.contains("Topic internal-topic is unknown or not found, hence not existed yet.");
}
assertTrue(foundExpectedMessage);
}
@Test
public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
mockAdminClient.addTopic(

Loading…
Cancel
Save