diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index e9291cc5148..6a1af184d3e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -201,17 +201,31 @@ public class KTableImpl extends AbstractStream implements KTable< private KTable doMapValues(final ValueMapperWithKey mapper, final MaterializedInternal> materializedInternal) { - // we actually do not need generate store names at all since if it is not specified, we will not - // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility. - if (materializedInternal != null && materializedInternal.storeName() == null) { - builder.newStoreName(MAPVALUES_NAME); + final Serde keySerde; + final Serde valueSerde; + final String queryableStoreName; + final StoreBuilder> storeBuilder; + + if (materializedInternal != null) { + // we actually do not need to generate store names at all since if it is not specified, we will not + // materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility. + if (materializedInternal.storeName() == null) { + builder.newStoreName(MAPVALUES_NAME); + } + keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde; + valueSerde = materializedInternal.valueSerde(); + queryableStoreName = materializedInternal.queryableStoreName(); + // only materialize if materialized is specified and it has queryable name + storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; + } else { + keySerde = this.keySerde; + valueSerde = null; + queryableStoreName = null; + storeBuilder = null; } final String name = builder.newProcessorName(MAPVALUES_NAME); - // only materialize if the state store has queryable name - final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; - final StoreBuilder> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName); // leaving in calls to ITB until building topology with graph @@ -232,8 +246,8 @@ public class KTableImpl extends AbstractStream implements KTable< // we preserve the value following the order of 1) materialized, 2) null return new KTableImpl<>( name, - materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, - materializedInternal != null ? materializedInternal.valueSerde() : null, + keySerde, + valueSerde, sourceNodes, queryableStoreName, processorSupplier,