Browse Source

MINOR: Allow creation of statestore without logging enabled or explicit source topic

guozhangwang

Author: dan norwood <norwood@confluent.io>

Reviewers: Eno Thereska, Damian Guy, Guozhang Wang

Closes #1828 from norwood/manual-store
pull/1828/merge
Dan Norwood 8 years ago committed by Guozhang Wang
parent
commit
896ad63f14
  1. 15
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

15
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -133,13 +133,16 @@ public class ProcessorStateManager { @@ -133,13 +133,16 @@ public class ProcessorStateManager {
}
// check that the underlying change log topic exist or not
String topic;
String topic = null;
if (loggingEnabled) {
topic = storeChangelogTopic(this.applicationId, store.name());
} else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) {
topic = sourceStoreToSourceTopic.get(store.name());
} else {
throw new IllegalArgumentException(String.format("task [%s] Store is neither built from source topic, nor has a changelog.", taskId));
}
if (topic == null) {
this.stores.put(store.name(), store);
return;
}
// block until the partition is ready for this state changelog topic or time has elapsed
@ -167,12 +170,14 @@ public class ProcessorStateManager { @@ -167,12 +170,14 @@ public class ProcessorStateManager {
}
} while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
if (partitionNotFound)
if (partitionNotFound) {
throw new StreamsException(String.format("task [%s] Store %s's change log (%s) does not contain partition %s", taskId, store.name(), topic, partition));
}
if (isStandby) {
if (store.persistent())
if (store.persistent()) {
restoreCallbacks.put(topic, stateRestoreCallback);
}
} else {
restoreActiveState(topic, stateRestoreCallback);
}

Loading…
Cancel
Save