From 18223415b091b0c05af4767191bb97b366aa8fa5 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 30 Oct 2016 11:34:17 -0700 Subject: [PATCH] KAFKA-4302: Simplify KTableSource KTableSource is always materialized since IQ: - removed flag KTableSource#materialized - removed MaterializedKTableSourceProcessor Author: Matthias J. Sax Reviewers: Eno Thereska, Guozhang Wang Closes #2065 from mjsax/kafka-4302-simplify-ktablesource --- .../kafka/streams/kstream/KStreamBuilder.java | 13 ++++++++-- .../streams/kstream/internals/KTableImpl.java | 25 ------------------- .../kstream/internals/KTableSource.java | 22 +--------------- 3 files changed, 12 insertions(+), 48 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index f9544cc7f1b..38d126e4641 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; @@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name; cannot be null - * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected + * @param storeName the state store name used for the materialized KTable * @return a {@link KTable} for the specified topics */ public KTable table(Serde keySerde, Serde valSerde, String topic, final String storeName) { @@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder { addProcessor(name, processorSupplier, source); final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName); - kTable.materialize((KTableSource) processorSupplier); + StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, + keySerde, + valSerde, + false, + Collections.emptyMap(), + true); + + addStateStore(storeSupplier, name); connectSourceStoreAndTopic(storeName, topic); return kTable; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 683dc00a064..cd83d50e871 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KGroupedTable; @@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -371,9 +367,6 @@ public class KTableImpl extends AbstractStream implements KTable valueGetterSupplier() { if (processorSupplier instanceof KTableSource) { KTableSource source = (KTableSource) processorSupplier; - if (!source.isMaterialized()) { - throw new StreamsException("Source is not materialized"); - } return new KTableSourceValueGetterSupplier<>(source.storeName); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { return ((KStreamAggProcessorSupplier) processorSupplier).view(); @@ -387,9 +380,6 @@ public class KTableImpl extends AbstractStream implements KTable source = (KTableSource) processorSupplier; - if (!source.isMaterialized()) { - throw new StreamsException("Source is not materialized"); - } source.enableSendingOldValues(); } else if (processorSupplier instanceof KStreamAggProcessorSupplier) { ((KStreamAggProcessorSupplier) processorSupplier).enableSendingOldValues(); @@ -404,19 +394,4 @@ public class KTableImpl extends AbstractStream implements KTable source) { - synchronized (source) { - if (!source.isMaterialized()) { - StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName, - keySerde, - valSerde, - false, - Collections.emptyMap(), - true); - // mark this state as non internal hence it is read directly from a user topic - topology.addStateStore(storeSupplier, name); - source.materialize(); - } - } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index d8d389f7d8f..20a80f4964a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -29,7 +29,6 @@ public class KTableSource implements ProcessorSupplier { public final String storeName; - private boolean materialized = false; private boolean sendOldValues = false; public KTableSource(String storeName) { @@ -38,15 +37,7 @@ public class KTableSource implements ProcessorSupplier { @Override public Processor get() { - return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor(); - } - - public void materialize() { - materialized = true; - } - - public boolean isMaterialized() { - return materialized; + return new KTableSourceProcessor(); } public void enableSendingOldValues() { @@ -54,17 +45,6 @@ public class KTableSource implements ProcessorSupplier { } private class KTableSourceProcessor extends AbstractProcessor { - @Override - public void process(K key, V value) { - // the keys should never be null - if (key == null) - throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null."); - - context().forward(key, new Change<>(value, null)); - } - } - - private class MaterializedKTableSourceProcessor extends AbstractProcessor { private KeyValueStore store;