diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 5e5eaa9aabd..51a50e903b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -133,13 +133,16 @@ public class ProcessorStateManager { } // check that the underlying change log topic exist or not - String topic; + String topic = null; if (loggingEnabled) { topic = storeChangelogTopic(this.applicationId, store.name()); } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) { topic = sourceStoreToSourceTopic.get(store.name()); - } else { - throw new IllegalArgumentException(String.format("task [%s] Store is neither built from source topic, nor has a changelog.", taskId)); + } + + if (topic == null) { + this.stores.put(store.name(), store); + return; } // block until the partition is ready for this state changelog topic or time has elapsed @@ -167,12 +170,14 @@ public class ProcessorStateManager { } } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime); - if (partitionNotFound) + if (partitionNotFound) { throw new StreamsException(String.format("task [%s] Store %s's change log (%s) does not contain partition %s", taskId, store.name(), topic, partition)); + } if (isStandby) { - if (store.persistent()) + if (store.persistent()) { restoreCallbacks.put(topic, stateRestoreCallback); + } } else { restoreActiveState(topic, stateRestoreCallback); }