Browse Source

KAFKA-4302: Simplify KTableSource

KTableSource is always materialized since IQ:
  - removed flag KTableSource#materialized
  - removed MaterializedKTableSourceProcessor

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2065 from mjsax/kafka-4302-simplify-ktablesource
pull/2065/merge
Matthias J. Sax 8 years ago committed by Guozhang Wang
parent
commit
18223415b0
  1. 13
      streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
  2. 25
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  3. 22
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java

13
streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java

@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; @@ -22,7 +22,9 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder { @@ -139,7 +141,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @param valSerde value serde used to send key-value pairs,
* if not specified the default value serde defined in the configuration will be used
* @param topic the topic name; cannot be null
* @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected
* @param storeName the state store name used for the materialized KTable
* @return a {@link KTable} for the specified topics
*/
public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) {
@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder { @@ -151,7 +153,14 @@ public class KStreamBuilder extends TopologyBuilder {
addProcessor(name, processorSupplier, source);
final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde, storeName);
kTable.materialize((KTableSource) processorSupplier);
StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
keySerde,
valSerde,
false,
Collections.<String, String>emptyMap(),
true);
addStateStore(storeSupplier, name);
connectSourceStoreAndTopic(storeName, topic);
return kTable;

25
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,7 +19,6 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate; @@ -31,14 +30,11 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
@ -371,9 +367,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @@ -371,9 +367,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
if (!source.isMaterialized()) {
throw new StreamsException("Source is not materialized");
}
return new KTableSourceValueGetterSupplier<>(source.storeName);
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
@ -387,9 +380,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @@ -387,9 +380,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
if (!source.isMaterialized()) {
throw new StreamsException("Source is not materialized");
}
source.enableSendingOldValues();
} else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).enableSendingOldValues();
@ -404,19 +394,4 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @@ -404,19 +394,4 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return sendOldValues;
}
public void materialize(KTableSource<K, ?> source) {
synchronized (source) {
if (!source.isMaterialized()) {
StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(source.storeName,
keySerde,
valSerde,
false,
Collections.<String, String>emptyMap(),
true);
// mark this state as non internal hence it is read directly from a user topic
topology.addStateStore(storeSupplier, name);
source.materialize();
}
}
}
}

22
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java

@ -29,7 +29,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { @@ -29,7 +29,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
public final String storeName;
private boolean materialized = false;
private boolean sendOldValues = false;
public KTableSource(String storeName) {
@ -38,15 +37,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { @@ -38,15 +37,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
@Override
public Processor<K, V> get() {
return materialized ? new MaterializedKTableSourceProcessor() : new KTableSourceProcessor();
}
public void materialize() {
materialized = true;
}
public boolean isMaterialized() {
return materialized;
return new KTableSourceProcessor();
}
public void enableSendingOldValues() {
@ -54,17 +45,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { @@ -54,17 +45,6 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
}
private class KTableSourceProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for the source KTable from store name " + storeName + " should not be null.");
context().forward(key, new Change<>(value, null));
}
}
private class MaterializedKTableSourceProcessor extends AbstractProcessor<K, V> {
private KeyValueStore<K, V> store;

Loading…
Cancel
Save