Browse Source

KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) (#6520)

* Move materialization logic from TableProcessorNode to KTableImpl

1. TableProcessorNode: remove materializedInternal, use storeBuilder instead.
2. Instantiate StoreBuilder in KTableImpl#[doFilter, doMapValues, doTransformValues], instead of TableProcessorNode#init.

* Cleanup KTableImpl#doMapValues

* 1. Add TableProcessorNode(String, ProcessorParameters, StoreBuilder). 2. Reformat+trivial changes on TableProcessorNode.java.
pull/6526/head
Lee Dongjin 6 years ago committed by Bill Bejeck
parent
commit
ff78c684ff
  1. 32
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

32
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -201,17 +201,31 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper, private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) { final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
// we actually do not need generate store names at all since if it is not specified, we will not final Serde<K> keySerde;
// materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility. final Serde<VR> valueSerde;
if (materializedInternal != null && materializedInternal.storeName() == null) { final String queryableStoreName;
builder.newStoreName(MAPVALUES_NAME); final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if it is not specified, we will not
// materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
if (materializedInternal.storeName() == null) {
builder.newStoreName(MAPVALUES_NAME);
}
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = null;
queryableStoreName = null;
storeBuilder = null;
} }
final String name = builder.newProcessorName(MAPVALUES_NAME); final String name = builder.newProcessorName(MAPVALUES_NAME);
// only materialize if the state store has queryable name
final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName); final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
// leaving in calls to ITB until building topology with graph // leaving in calls to ITB until building topology with graph
@ -232,8 +246,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
// we preserve the value following the order of 1) materialized, 2) null // we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>( return new KTableImpl<>(
name, name,
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, keySerde,
materializedInternal != null ? materializedInternal.valueSerde() : null, valueSerde,
sourceNodes, sourceNodes,
queryableStoreName, queryableStoreName,
processorSupplier, processorSupplier,

Loading…
Cancel
Save