Browse Source

KAFKA-7921: log at error level for missing source topic (#6262)

This condition is a fatal error, so error level is warranted, to provide more context on why Streams shuts down.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/6267/head
John Roesler 6 years ago committed by Guozhang Wang
parent
commit
1aae604861
  1. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -267,7 +267,7 @@ public class StreamThread extends Thread { @@ -267,7 +267,7 @@ public class StreamThread extends Thread {
taskManager.suspendedStandbyTaskIds());
if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.debug("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get());
streamThread.shutdown();
streamThread.setStateListener(null);
return;

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@ -463,6 +463,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable @@ -463,6 +463,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
for (final String topic : topicsInfo.sourceTopics) {
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} durign assignment. Returning error {}.",
topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
}
}

Loading…
Cancel
Save