From ff78c684ff22d81174cd789a4ac7e7e1fe4dfc8a Mon Sep 17 00:00:00 2001 From: Lee Dongjin Date: Sat, 30 Mar 2019 06:10:04 +0900 Subject: [PATCH] KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) (#6520) * Move materialization logic from TableProcessorNode to KTableImpl 1. TableProcessorNode: remove materializedInternal, use storeBuilder instead. 2. Instantiate StoreBuilder in KTableImpl#[doFilter, doMapValues, doTransformValues], instead of TableProcessorNode#init. * Cleanup KTableImpl#doMapValues * 1. Add TableProcessorNode(String, ProcessorParameters, StoreBuilder). 2. Reformat+trivial changes on TableProcessorNode.java. --- .../streams/kstream/internals/KTableImpl.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index e9291cc5148..6a1af184d3e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -201,17 +201,31 @@ public class KTableImpl extends AbstractStream implements KTable< private KTable doMapValues(final ValueMapperWithKey mapper, final MaterializedInternal> materializedInternal) { - // we actually do not need generate store names at all since if it is not specified, we will not - // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility. - if (materializedInternal != null && materializedInternal.storeName() == null) { - builder.newStoreName(MAPVALUES_NAME); + final Serde keySerde; + final Serde valueSerde; + final String queryableStoreName; + final StoreBuilder> storeBuilder; + + if (materializedInternal != null) { + // we actually do not need to generate store names at all since if it is not specified, we will not + // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility. + if (materializedInternal.storeName() == null) { + builder.newStoreName(MAPVALUES_NAME); + } + keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde; + valueSerde = materializedInternal.valueSerde(); + queryableStoreName = materializedInternal.queryableStoreName(); + // only materialize if materialized is specified and it has queryable name + storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + } else { + keySerde = this.keySerde; + valueSerde = null; + queryableStoreName = null; + storeBuilder = null; } final String name = builder.newProcessorName(MAPVALUES_NAME); - // only materialize if the state store has queryable name - final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; - final StoreBuilder> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName); // leaving in calls to ITB until building topology with graph @@ -232,8 +246,8 @@ public class KTableImpl extends AbstractStream implements KTable< // we preserve the value following the order of 1) materialized, 2) null return new KTableImpl<>( name, - materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, - materializedInternal != null ? materializedInternal.valueSerde() : null, + keySerde, + valueSerde, sourceNodes, queryableStoreName, processorSupplier,