diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e96c11028cf..71df0f963da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -267,7 +267,7 @@ public class StreamThread extends Thread { taskManager.suspendedStandbyTaskIds()); if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { - log.debug("Received error code {} - shutdown", streamThread.assignmentErrorCode.get()); + log.error("Received error code {} - shutdown", streamThread.assignmentErrorCode.get()); streamThread.shutdown(); streamThread.setStateListener(null); return; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2f649bc8c37..3d94572f385 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -463,6 +463,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable for (final String topic : topicsInfo.sourceTopics) { if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && !metadata.topics().contains(topic)) { + log.error("Missing source topic {} durign assignment. Returning error {}.", + topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); } }