diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 54461340b34..cb0a388876c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -46,7 +46,7 @@ public abstract class AbstractStream { protected final String name; protected final Serde keySerde; - protected final Serde valSerde; + protected final Serde valueSerde; protected final Set subTopologySourceNodes; protected final StreamsGraphNode streamsGraphNode; protected final InternalStreamsBuilder builder; @@ -57,14 +57,14 @@ public abstract class AbstractStream { this.name = stream.name; this.builder = stream.builder; this.keySerde = stream.keySerde; - this.valSerde = stream.valSerde; + this.valueSerde = stream.valueSerde; this.subTopologySourceNodes = stream.subTopologySourceNodes; this.streamsGraphNode = stream.streamsGraphNode; } AbstractStream(final String name, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final Set subTopologySourceNodes, final StreamsGraphNode streamsGraphNode, final InternalStreamsBuilder builder) { @@ -75,7 +75,7 @@ public abstract class AbstractStream { this.name = name; this.builder = builder; this.keySerde = keySerde; - this.valSerde = valSerde; + this.valueSerde = valueSerde; this.subTopologySourceNodes = subTopologySourceNodes; this.streamsGraphNode = streamsGraphNode; } @@ -143,6 +143,6 @@ public abstract class AbstractStream { } public Serde valueSerde() { - return valSerde; + return valueSerde; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 4fdb1f25a2a..ddad482292d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -53,7 +53,7 @@ class CogroupedStreamAggregateBuilder { final NamedInternal named, final StoreBuilder storeBuilder, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final String queryableName, final Windows windows, final SessionWindows sessionWindows, @@ -68,7 +68,7 @@ class CogroupedStreamAggregateBuilder { final String repartionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name(); - createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valSerde); + createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); if (!parentNodes.containsKey(repartitionReqs)) { final StreamsGraphNode repartitionNode = repartitionNodeBuilder.build(); @@ -118,7 +118,7 @@ class CogroupedStreamAggregateBuilder { return new KTableImpl( mergeProcessorName, keySerde, - valSerde, + valueSerde, Collections.singleton(mergeNode.nodeName()), queryableName, passThrough, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java index 94ceaffde08..574489541fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java @@ -30,10 +30,10 @@ public class ConsumedInternal extends Consumed { public ConsumedInternal(final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final TimestampExtractor timestampExtractor, final Topology.AutoOffsetReset offsetReset) { - this(Consumed.with(keySerde, valSerde, timestampExtractor, offsetReset)); + this(Consumed.with(keySerde, valueSerde, timestampExtractor, offsetReset)); } public ConsumedInternal() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 861dfaed144..414665761e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -71,7 +71,7 @@ class GroupedStreamAggregateBuilder { final KStreamAggProcessorSupplier aggregateSupplier, final String queryableStoreName, final Serde keySerde, - final Serde valSerde) { + final Serde valueSerde) { assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name()); final String aggFunctionName = functionName.name(); @@ -107,7 +107,7 @@ class GroupedStreamAggregateBuilder { return new KTableImpl<>(aggFunctionName, keySerde, - valSerde, + valueSerde, sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName), queryableStoreName, aggregateSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index bd8586aabea..c0ad1728f4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -67,7 +67,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS @Override public KTable reduce(final Reducer reducer) { - return reduce(reducer, Materialized.with(keySerde, valSerde)); + return reduce(reducer, Materialized.with(keySerde, valueSerde)); } @Override @@ -91,7 +91,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS materializedInternal.withKeySerde(keySerde); } if (materializedInternal.valueSerde() == null) { - materializedInternal.withValueSerde(valSerde); + materializedInternal.withValueSerde(valueSerde); } final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); @@ -196,7 +196,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS subTopologySourceNodes, name, keySerde, - valSerde, + valueSerde, aggregateBuilder, streamsGraphNode ); @@ -211,7 +211,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedS subTopologySourceNodes, name, keySerde, - valSerde, + valueSerde, aggregateBuilder, streamsGraphNode ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index e710b4b4bea..b0f779c2503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -116,7 +116,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr .withSinkName(sinkName) .withSourceName(sourceName) .withKeySerde(keySerde) - .withValueSerde(valSerde) + .withValueSerde(valueSerde) .withNodeName(sourceName).build(); } @@ -143,7 +143,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr materializedInternal.withKeySerde(keySerde); } if (materializedInternal.valueSerde() == null) { - materializedInternal.withValueSerde(valSerde); + materializedInternal.withValueSerde(valueSerde); } final ProcessorSupplier> aggregateSupplier = new KTableReduce<>( materializedInternal.storeName(), @@ -155,7 +155,7 @@ public class KGroupedTableImpl extends AbstractStream implements KGr @Override public KTable reduce(final Reducer adder, final Reducer subtractor) { - return reduce(adder, subtractor, Materialized.with(keySerde, valSerde)); + return reduce(adder, subtractor, Materialized.with(keySerde, valueSerde)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 2282287f56e..3d00f220f99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -164,7 +164,7 @@ public class KStreamImpl extends AbstractStream implements KStream( name, keySerde, - valSerde, + valueSerde, subTopologySourceNodes, repartitionRequired, filterProcessorNode, @@ -193,7 +193,7 @@ public class KStreamImpl extends AbstractStream implements KStream( name, keySerde, - valSerde, + valueSerde, subTopologySourceNodes, repartitionRequired, filterNotProcessorNode, @@ -220,7 +220,7 @@ public class KStreamImpl extends AbstractStream implements KStream( selectKeyProcessorNode.nodeName(), null, - valSerde, + valueSerde, subTopologySourceNodes, true, selectKeyProcessorNode, @@ -432,7 +432,7 @@ public class KStreamImpl extends AbstractStream implements KStream( name, keySerde, - valSerde, + valueSerde, subTopologySourceNodes, repartitionRequired, peekNode, @@ -485,7 +485,7 @@ public class KStreamImpl extends AbstractStream implements KStream(childNames[i], innerProcessorParameters); builder.addGraphNode(branchNode, branchChildNode); - branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder); + branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder); } return branchChildren; @@ -537,7 +537,7 @@ public class KStreamImpl extends AbstractStream implements KStream through(final String topic) { - return through(topic, Produced.with(keySerde, valSerde, null)); + return through(topic, Produced.with(keySerde, valueSerde, null)); } @Deprecated @@ -552,7 +552,7 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream valueSerde = repartitionedInternal.valueSerde() == null ? valSerde : repartitionedInternal.valueSerde(); + final Serde valueSerde = repartitionedInternal.valueSerde() == null ? this.valueSerde : repartitionedInternal.valueSerde(); final Serde keySerde = repartitionedInternal.keySerde() == null ? this.keySerde : repartitionedInternal.keySerde(); final UnoptimizableRepartitionNodeBuilder unoptimizableRepartitionNodeBuilder = UnoptimizableRepartitionNode @@ -622,7 +622,7 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream(topic), producedInternal); } @Override public void to(final TopicNameExtractor topicExtractor) { - to(topicExtractor, Produced.with(keySerde, valSerde, null)); + to(topicExtractor, Produced.with(keySerde, valueSerde, null)); } @Override @@ -657,7 +657,7 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream toTable() { - return toTable(NamedInternal.empty(), Materialized.with(keySerde, valSerde)); + return toTable(NamedInternal.empty(), Materialized.with(keySerde, valueSerde)); } @Override public KTable toTable(final Named named) { - return toTable(named, Materialized.with(keySerde, valSerde)); + return toTable(named, Materialized.with(keySerde, valueSerde)); } @Override @@ -705,7 +705,7 @@ public class KStreamImpl extends AbstractStream implements KStream valueSerdeOverride = materializedInternal.valueSerde() == null - ? valSerde + ? valueSerde : materializedInternal.valueSerde(); final Set subTopologySourceNodes; @@ -757,7 +757,7 @@ public class KStreamImpl extends AbstractStream implements KStream KGroupedStream groupBy(final KeyValueMapper keySelector) { - return groupBy(keySelector, Grouped.with(null, valSerde)); + return groupBy(keySelector, Grouped.with(null, valueSerde)); } @Override @@ -795,7 +795,7 @@ public class KStreamImpl extends AbstractStream implements KStream groupByKey() { - return groupByKey(Grouped.with(keySerde, valSerde)); + return groupByKey(Grouped.with(keySerde, valueSerde)); } @Override @@ -982,7 +982,7 @@ public class KStreamImpl extends AbstractStream implements KStream keySerdeOverride, final Serde valueSerdeOverride) { final Serde repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde; - final Serde repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde; + final Serde repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde; final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); // we still need to create the repartitioned source each time @@ -1013,7 +1013,7 @@ public class KStreamImpl extends AbstractStream implements KStream> String createRepartitionedSource(final InternalStreamsBuilder builder, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final String repartitionTopicNamePrefix, final StreamPartitioner streamPartitioner, final BaseRepartitionNodeBuilder baseRepartitionNodeBuilder) { @@ -1048,7 +1048,7 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KTable< public KTableImpl(final String name, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final Set subTopologySourceNodes, final String queryableStoreName, final ProcessorSupplier processorSupplier, final StreamsGraphNode streamsGraphNode, final InternalStreamsBuilder builder) { - super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder); + super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); this.processorSupplier = processorSupplier; this.queryableStoreName = queryableStoreName; } @@ -167,13 +167,13 @@ public class KTableImpl extends AbstractStream implements KTable< // we preserve the key following the order of 1) materialized, 2) parent keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde; // we preserve the value following the order of 1) materialized, 2) parent - valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde; + valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valueSerde; queryableStoreName = materializedInternal.queryableStoreName(); // only materialize if materialized is specified and it has queryable name storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null; } else { keySerde = this.keySerde; - valueSerde = this.valSerde; + valueSerde = this.valueSerde; queryableStoreName = null; storeBuilder = null; } @@ -497,7 +497,7 @@ public class KTableImpl extends AbstractStream implements KTable< builder.addGraphNode(this.streamsGraphNode, toStreamNode); // we can inherit parent key and value serde - return new KStreamImpl<>(name, keySerde, valSerde, subTopologySourceNodes, false, toStreamNode, builder); + return new KStreamImpl<>(name, keySerde, valueSerde, subTopologySourceNodes, false, toStreamNode, builder); } @Override @@ -539,13 +539,13 @@ public class KTableImpl extends AbstractStream implements KTable< storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( storeName, keySerde, - valSerde) + valueSerde) .withLoggingEnabled(topicConfig); } else { storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( storeName, keySerde, - valSerde) + valueSerde) .withLoggingDisabled(); } @@ -560,7 +560,7 @@ public class KTableImpl extends AbstractStream implements KTable< return new KTableImpl( name, keySerde, - valSerde, + valueSerde, Collections.singleton(this.name), null, suppressionSupplier, @@ -998,7 +998,7 @@ public class KTableImpl extends AbstractStream implements KTable< final Serde foreignKeySerde = ((KTableImpl) foreignKeyTable).keySerde; final Serde> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde); final SubscriptionResponseWrapperSerde responseWrapperSerde = - new SubscriptionResponseWrapperSerde<>(((KTableImpl) foreignKeyTable).valSerde); + new SubscriptionResponseWrapperSerde<>(((KTableImpl) foreignKeyTable).valueSerde); final CombinedKeySchema combinedKeySchema = new CombinedKeySchema<>( subscriptionForeignKeySerdePseudoTopic, @@ -1014,7 +1014,7 @@ public class KTableImpl extends AbstractStream implements KTable< subscriptionForeignKeySerdePseudoTopic, valueHashSerdePseudoTopic, foreignKeySerde, - valSerde == null ? null : valSerde.serializer(), + valueSerde == null ? null : valueSerde.serializer(), leftJoin ), renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION) @@ -1117,7 +1117,7 @@ public class KTableImpl extends AbstractStream implements KTable< final KTableValueGetterSupplier primaryKeyValueGetter = valueGetterSupplier(); final SubscriptionResolverJoinProcessorSupplier resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>( primaryKeyValueGetter, - valSerde == null ? null : valSerde.serializer(), + valueSerde == null ? null : valueSerde.serializer(), valueHashSerdePseudoTopic, joiner, leftJoin diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index a0ddefcb3c9..1ca0addc496 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -53,10 +53,10 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple final Set subTopologySourceNodes, final String name, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final GroupedStreamAggregateBuilder aggregateBuilder, final StreamsGraphNode streamsGraphNode) { - super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder); + super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); Objects.requireNonNull(windows, "windows can't be null"); this.windows = windows; this.aggregateBuilder = aggregateBuilder; @@ -124,7 +124,7 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple @Override public KTable, V> reduce(final Reducer reducer, final Named named) { - return reduce(reducer, named, Materialized.with(keySerde, valSerde)); + return reduce(reducer, named, Materialized.with(keySerde, valueSerde)); } @Override @@ -147,7 +147,7 @@ public class SessionWindowedKStreamImpl extends AbstractStream imple materializedInternal.withKeySerde(keySerde); } if (materializedInternal.valueSerde() == null) { - materializedInternal.withValueSerde(valSerde); + materializedInternal.withValueSerde(valueSerde); } final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 67b4ab98789..aaf8fbcc531 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -54,10 +54,10 @@ public class TimeWindowedKStreamImpl extends AbstractStr final Set subTopologySourceNodes, final String name, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final GroupedStreamAggregateBuilder aggregateBuilder, final StreamsGraphNode streamsGraphNode) { - super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder); + super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder); this.windows = Objects.requireNonNull(windows, "windows can't be null"); this.aggregateBuilder = aggregateBuilder; } @@ -165,7 +165,7 @@ public class TimeWindowedKStreamImpl extends AbstractStr @Override public KTable, V> reduce(final Reducer reducer, final Named named) { - return reduce(reducer, named, Materialized.with(keySerde, valSerde)); + return reduce(reducer, named, Materialized.with(keySerde, valueSerde)); } @Override @@ -189,7 +189,7 @@ public class TimeWindowedKStreamImpl extends AbstractStr materializedInternal.withKeySerde(keySerde); } if (materializedInternal.valueSerde() == null) { - materializedInternal.withValueSerde(valSerde); + materializedInternal.withValueSerde(valueSerde); } final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 9feccb9d57a..461a373c14d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -92,14 +92,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere private final String storeName; private final Serde keySerde; - private final Serde valSerde; + private final Serde valueSerde; private boolean loggingEnabled = true; private Map logConfig = new HashMap<>(); - public Builder(final String storeName, final Serde keySerde, final Serde valSerde) { + public Builder(final String storeName, final Serde keySerde, final Serde valueSerde) { this.storeName = storeName; this.keySerde = keySerde; - this.valSerde = valSerde; + this.valueSerde = valueSerde; } /** @@ -140,7 +140,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere @Override public InMemoryTimeOrderedKeyValueBuffer build() { - return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde); + return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valueSerde); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index f1fd916eb08..fd96b7f881b 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -68,7 +68,7 @@ public class InternalMockProcessorContext private TaskType taskType = TaskType.ACTIVE; private Serde keySerde; - private Serde valSerde; + private Serde valueSerde; private long timestamp = -1L; public InternalMockProcessorContext() { @@ -123,12 +123,12 @@ public class InternalMockProcessorContext public InternalMockProcessorContext(final File stateDir, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final StreamsConfig config) { this( stateDir, keySerde, - valSerde, + valueSerde, new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), config, null, @@ -157,13 +157,13 @@ public class InternalMockProcessorContext public InternalMockProcessorContext(final File stateDir, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final RecordCollector collector, final ThreadCache cache) { this( stateDir, keySerde, - valSerde, + valueSerde, new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST), new StreamsConfig(StreamsTestUtils.getStreamsConfig()), () -> collector, @@ -173,7 +173,7 @@ public class InternalMockProcessorContext public InternalMockProcessorContext(final File stateDir, final Serde keySerde, - final Serde valSerde, + final Serde valueSerde, final StreamsMetricsImpl metrics, final StreamsConfig config, final RecordCollector.Supplier collectorSupplier, @@ -188,7 +188,7 @@ public class InternalMockProcessorContext super.setCurrentNode(new ProcessorNode<>("TESTING_NODE")); this.stateDir = stateDir; this.keySerde = keySerde; - this.valSerde = valSerde; + this.valueSerde = valueSerde; this.recordCollectorSupplier = collectorSupplier; this.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger(new SystemTime())); } @@ -207,8 +207,8 @@ public class InternalMockProcessorContext this.keySerde = keySerde; } - public void setValueSerde(final Serde valSerde) { - this.valSerde = valSerde; + public void setValueSerde(final Serde valueSerde) { + this.valueSerde = valueSerde; } @Override @@ -218,7 +218,7 @@ public class InternalMockProcessorContext @Override public Serde valueSerde() { - return valSerde; + return valueSerde; } // state mgr will be overridden by the state dir and store maps