Browse Source

MINOR: code cleanup for inconsistent naming (#8871)

Reviewer: Matthias J. Sax <matthias@confluent.io>
pull/7160/head
Vito Jeng 4 years ago committed by GitHub
parent
commit
965877c11c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
  2. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
  3. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
  4. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
  5. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
  6. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
  7. 40
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  8. 22
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  9. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
  10. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
  11. 8
      streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
  12. 20
      streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java

@ -46,7 +46,7 @@ public abstract class AbstractStream<K, V> { @@ -46,7 +46,7 @@ public abstract class AbstractStream<K, V> {
protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valSerde;
protected final Serde<V> valueSerde;
protected final Set<String> subTopologySourceNodes;
protected final StreamsGraphNode streamsGraphNode;
protected final InternalStreamsBuilder builder;
@ -57,14 +57,14 @@ public abstract class AbstractStream<K, V> { @@ -57,14 +57,14 @@ public abstract class AbstractStream<K, V> {
this.name = stream.name;
this.builder = stream.builder;
this.keySerde = stream.keySerde;
this.valSerde = stream.valSerde;
this.valueSerde = stream.valueSerde;
this.subTopologySourceNodes = stream.subTopologySourceNodes;
this.streamsGraphNode = stream.streamsGraphNode;
}
AbstractStream(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
@ -75,7 +75,7 @@ public abstract class AbstractStream<K, V> { @@ -75,7 +75,7 @@ public abstract class AbstractStream<K, V> {
this.name = name;
this.builder = builder;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.valueSerde = valueSerde;
this.subTopologySourceNodes = subTopologySourceNodes;
this.streamsGraphNode = streamsGraphNode;
}
@ -143,6 +143,6 @@ public abstract class AbstractStream<K, V> { @@ -143,6 +143,6 @@ public abstract class AbstractStream<K, V> {
}
public Serde<V> valueSerde() {
return valSerde;
return valueSerde;
}
}

6
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java

@ -53,7 +53,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { @@ -53,7 +53,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valSerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows,
final SessionWindows sessionWindows,
@ -68,7 +68,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { @@ -68,7 +68,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String repartionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();
createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valSerde);
createRepartitionSource(repartionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
if (!parentNodes.containsKey(repartitionReqs)) {
final StreamsGraphNode repartitionNode = repartitionNodeBuilder.build();
@ -118,7 +118,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> { @@ -118,7 +118,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
return new KTableImpl<KR, VIn, VOut>(
mergeProcessorName,
keySerde,
valSerde,
valueSerde,
Collections.singleton(mergeNode.nodeName()),
queryableName,
passThrough,

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java

@ -30,10 +30,10 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> { @@ -30,10 +30,10 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
public ConsumedInternal(final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset offsetReset) {
this(Consumed.with(keySerde, valSerde, timestampExtractor, offsetReset));
this(Consumed.with(keySerde, valueSerde, timestampExtractor, offsetReset));
}
public ConsumedInternal() {

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java

@ -71,7 +71,7 @@ class GroupedStreamAggregateBuilder<K, V> { @@ -71,7 +71,7 @@ class GroupedStreamAggregateBuilder<K, V> {
final KStreamAggProcessorSupplier<K, KR, V, VR> aggregateSupplier,
final String queryableStoreName,
final Serde<KR> keySerde,
final Serde<VR> valSerde) {
final Serde<VR> valueSerde) {
assert queryableStoreName == null || queryableStoreName.equals(storeBuilder.name());
final String aggFunctionName = functionName.name();
@ -107,7 +107,7 @@ class GroupedStreamAggregateBuilder<K, V> { @@ -107,7 +107,7 @@ class GroupedStreamAggregateBuilder<K, V> {
return new KTableImpl<>(aggFunctionName,
keySerde,
valSerde,
valueSerde,
sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName),
queryableStoreName,
aggregateSupplier,

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java

@ -67,7 +67,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS @@ -67,7 +67,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
@Override
public KTable<K, V> reduce(final Reducer<V> reducer) {
return reduce(reducer, Materialized.with(keySerde, valSerde));
return reduce(reducer, Materialized.with(keySerde, valueSerde));
}
@Override
@ -91,7 +91,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS @@ -91,7 +91,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
@ -196,7 +196,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS @@ -196,7 +196,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
subTopologySourceNodes,
name,
keySerde,
valSerde,
valueSerde,
aggregateBuilder,
streamsGraphNode
);
@ -211,7 +211,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS @@ -211,7 +211,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
subTopologySourceNodes,
name,
keySerde,
valSerde,
valueSerde,
aggregateBuilder,
streamsGraphNode
);

6
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java

@ -116,7 +116,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr @@ -116,7 +116,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
.withSinkName(sinkName)
.withSourceName(sourceName)
.withKeySerde(keySerde)
.withValueSerde(valSerde)
.withValueSerde(valueSerde)
.withNodeName(sourceName).build();
}
@ -143,7 +143,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr @@ -143,7 +143,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
@ -155,7 +155,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr @@ -155,7 +155,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
@Override
public KTable<K, V> reduce(final Reducer<V> adder,
final Reducer<V> subtractor) {
return reduce(adder, subtractor, Materialized.with(keySerde, valSerde));
return reduce(adder, subtractor, Materialized.with(keySerde, valueSerde));
}

40
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -164,7 +164,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -164,7 +164,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
filterProcessorNode,
@ -193,7 +193,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -193,7 +193,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
filterNotProcessorNode,
@ -220,7 +220,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -220,7 +220,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(
selectKeyProcessorNode.nodeName(),
null,
valSerde,
valueSerde,
subTopologySourceNodes,
true,
selectKeyProcessorNode,
@ -432,7 +432,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -432,7 +432,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
return new KStreamImpl<>(
name,
keySerde,
valSerde,
valueSerde,
subTopologySourceNodes,
repartitionRequired,
peekNode,
@ -485,7 +485,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -485,7 +485,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
new ProcessorGraphNode<>(childNames[i], innerProcessorParameters);
builder.addGraphNode(branchNode, branchChildNode);
branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder);
branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valueSerde, subTopologySourceNodes, repartitionRequired, branchChildNode, builder);
}
return branchChildren;
@ -537,7 +537,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -537,7 +537,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Deprecated
@Override
public KStream<K, V> through(final String topic) {
return through(topic, Produced.with(keySerde, valSerde, null));
return through(topic, Produced.with(keySerde, valueSerde, null));
}
@Deprecated
@ -552,7 +552,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -552,7 +552,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(topic, producedInternal);
@ -585,7 +585,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -585,7 +585,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final String name = repartitionedInternal.name() != null ? repartitionedInternal.name() : builder
.newProcessorName(REPARTITION_NAME);
final Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? valSerde : repartitionedInternal.valueSerde();
final Serde<V> valueSerde = repartitionedInternal.valueSerde() == null ? this.valueSerde : repartitionedInternal.valueSerde();
final Serde<K> keySerde = repartitionedInternal.keySerde() == null ? this.keySerde : repartitionedInternal.keySerde();
final UnoptimizableRepartitionNodeBuilder<K, V> unoptimizableRepartitionNodeBuilder = UnoptimizableRepartitionNode
@ -622,7 +622,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -622,7 +622,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public void to(final String topic) {
to(topic, Produced.with(keySerde, valSerde, null));
to(topic, Produced.with(keySerde, valueSerde, null));
}
@Override
@ -636,14 +636,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -636,14 +636,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(new StaticTopicNameExtractor<>(topic), producedInternal);
}
@Override
public void to(final TopicNameExtractor<K, V> topicExtractor) {
to(topicExtractor, Produced.with(keySerde, valSerde, null));
to(topicExtractor, Produced.with(keySerde, valueSerde, null));
}
@Override
@ -657,7 +657,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -657,7 +657,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
producedInternal.withKeySerde(keySerde);
}
if (producedInternal.valueSerde() == null) {
producedInternal.withValueSerde(valSerde);
producedInternal.withValueSerde(valueSerde);
}
to(topicExtractor, producedInternal);
}
@ -676,12 +676,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -676,12 +676,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KTable<K, V> toTable() {
return toTable(NamedInternal.empty(), Materialized.with(keySerde, valSerde));
return toTable(NamedInternal.empty(), Materialized.with(keySerde, valueSerde));
}
@Override
public KTable<K, V> toTable(final Named named) {
return toTable(named, Materialized.with(keySerde, valSerde));
return toTable(named, Materialized.with(keySerde, valueSerde));
}
@Override
@ -705,7 +705,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -705,7 +705,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
? keySerde
: materializedInternal.keySerde();
final Serde<V> valueSerdeOverride = materializedInternal.valueSerde() == null
? valSerde
? valueSerde
: materializedInternal.valueSerde();
final Set<String> subTopologySourceNodes;
@ -757,7 +757,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -757,7 +757,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
return groupBy(keySelector, Grouped.with(null, valSerde));
return groupBy(keySelector, Grouped.with(null, valueSerde));
}
@Override
@ -795,7 +795,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -795,7 +795,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
@Override
public KGroupedStream<K, V> groupByKey() {
return groupByKey(Grouped.with(keySerde, valSerde));
return groupByKey(Grouped.with(keySerde, valueSerde));
}
@Override
@ -982,7 +982,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -982,7 +982,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Serde<K> keySerdeOverride,
final Serde<V> valueSerdeOverride) {
final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
// we still need to create the repartitioned source each time
@ -1013,7 +1013,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -1013,7 +1013,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final Serde<V1> valueSerde,
final String repartitionTopicNamePrefix,
final StreamPartitioner<K1, V1> streamPartitioner,
final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
@ -1048,7 +1048,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @@ -1048,7 +1048,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
);
baseRepartitionNodeBuilder.withKeySerde(keySerde)
.withValueSerde(valSerde)
.withValueSerde(valueSerde)
.withSourceName(sourceName)
.withRepartitionTopic(repartitionTopicName)
.withSinkName(sinkName)

22
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -132,13 +132,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -132,13 +132,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
public KTableImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
final String queryableStoreName,
final ProcessorSupplier<?, ?> processorSupplier,
final StreamsGraphNode streamsGraphNode,
final InternalStreamsBuilder builder) {
super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder);
super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
this.processorSupplier = processorSupplier;
this.queryableStoreName = queryableStoreName;
}
@ -167,13 +167,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -167,13 +167,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
// we preserve the key following the order of 1) materialized, 2) parent
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
// we preserve the value following the order of 1) materialized, 2) parent
valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde;
valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valueSerde;
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
storeBuilder = queryableStoreName != null ? (new TimestampedKeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = this.valSerde;
valueSerde = this.valueSerde;
queryableStoreName = null;
storeBuilder = null;
}
@ -497,7 +497,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -497,7 +497,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(this.streamsGraphNode, toStreamNode);
// we can inherit parent key and value serde
return new KStreamImpl<>(name, keySerde, valSerde, subTopologySourceNodes, false, toStreamNode, builder);
return new KStreamImpl<>(name, keySerde, valueSerde, subTopologySourceNodes, false, toStreamNode, builder);
}
@Override
@ -539,13 +539,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -539,13 +539,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
valueSerde)
.withLoggingEnabled(topicConfig);
} else {
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
valueSerde)
.withLoggingDisabled();
}
@ -560,7 +560,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -560,7 +560,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return new KTableImpl<K, S, V>(
name,
keySerde,
valSerde,
valueSerde,
Collections.singleton(this.name),
null,
suppressionSupplier,
@ -998,7 +998,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -998,7 +998,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde = new SubscriptionWrapperSerde<>(subscriptionPrimaryKeySerdePseudoTopic, keySerde);
final SubscriptionResponseWrapperSerde<VO> responseWrapperSerde =
new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valSerde);
new SubscriptionResponseWrapperSerde<>(((KTableImpl<KO, VO, VO>) foreignKeyTable).valueSerde);
final CombinedKeySchema<KO, K> combinedKeySchema = new CombinedKeySchema<>(
subscriptionForeignKeySerdePseudoTopic,
@ -1014,7 +1014,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -1014,7 +1014,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
subscriptionForeignKeySerdePseudoTopic,
valueHashSerdePseudoTopic,
foreignKeySerde,
valSerde == null ? null : valSerde.serializer(),
valueSerde == null ? null : valueSerde.serializer(),
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
@ -1117,7 +1117,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -1117,7 +1117,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
valSerde == null ? null : valSerde.serializer(),
valueSerde == null ? null : valueSerde.serializer(),
valueHashSerdePseudoTopic,
joiner,
leftJoin

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java

@ -53,10 +53,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple @@ -53,10 +53,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
final Set<String> subTopologySourceNodes,
final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
final StreamsGraphNode streamsGraphNode) {
super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder);
super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
Objects.requireNonNull(windows, "windows can't be null");
this.windows = windows;
this.aggregateBuilder = aggregateBuilder;
@ -124,7 +124,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple @@ -124,7 +124,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
return reduce(reducer, named, Materialized.with(keySerde, valSerde));
return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
}
@Override
@ -147,7 +147,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple @@ -147,7 +147,7 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java

@ -54,10 +54,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr @@ -54,10 +54,10 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
final Set<String> subTopologySourceNodes,
final String name,
final Serde<K> keySerde,
final Serde<V> valSerde,
final Serde<V> valueSerde,
final GroupedStreamAggregateBuilder<K, V> aggregateBuilder,
final StreamsGraphNode streamsGraphNode) {
super(name, keySerde, valSerde, subTopologySourceNodes, streamsGraphNode, builder);
super(name, keySerde, valueSerde, subTopologySourceNodes, streamsGraphNode, builder);
this.windows = Objects.requireNonNull(windows, "windows can't be null");
this.aggregateBuilder = aggregateBuilder;
}
@ -165,7 +165,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr @@ -165,7 +165,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
@Override
public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Named named) {
return reduce(reducer, named, Materialized.with(keySerde, valSerde));
return reduce(reducer, named, Materialized.with(keySerde, valueSerde));
}
@Override
@ -189,7 +189,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr @@ -189,7 +189,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
materializedInternal.withKeySerde(keySerde);
}
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valSerde);
materializedInternal.withValueSerde(valueSerde);
}
final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);

8
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java

@ -92,14 +92,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere @@ -92,14 +92,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
private final String storeName;
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private final Serde<V> valueSerde;
private boolean loggingEnabled = true;
private Map<String, String> logConfig = new HashMap<>();
public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valueSerde) {
this.storeName = storeName;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.valueSerde = valueSerde;
}
/**
@ -140,7 +140,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere @@ -140,7 +140,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
@Override
public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {
return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde);
return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valueSerde);
}
@Override

20
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java

@ -68,7 +68,7 @@ public class InternalMockProcessorContext @@ -68,7 +68,7 @@ public class InternalMockProcessorContext
private TaskType taskType = TaskType.ACTIVE;
private Serde<?> keySerde;
private Serde<?> valSerde;
private Serde<?> valueSerde;
private long timestamp = -1L;
public InternalMockProcessorContext() {
@ -123,12 +123,12 @@ public class InternalMockProcessorContext @@ -123,12 +123,12 @@ public class InternalMockProcessorContext
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valSerde,
final Serde<?> valueSerde,
final StreamsConfig config) {
this(
stateDir,
keySerde,
valSerde,
valueSerde,
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
config,
null,
@ -157,13 +157,13 @@ public class InternalMockProcessorContext @@ -157,13 +157,13 @@ public class InternalMockProcessorContext
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valSerde,
final Serde<?> valueSerde,
final RecordCollector collector,
final ThreadCache cache) {
this(
stateDir,
keySerde,
valSerde,
valueSerde,
new StreamsMetricsImpl(new Metrics(), "mock", StreamsConfig.METRICS_LATEST),
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
() -> collector,
@ -173,7 +173,7 @@ public class InternalMockProcessorContext @@ -173,7 +173,7 @@ public class InternalMockProcessorContext
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valSerde,
final Serde<?> valueSerde,
final StreamsMetricsImpl metrics,
final StreamsConfig config,
final RecordCollector.Supplier collectorSupplier,
@ -188,7 +188,7 @@ public class InternalMockProcessorContext @@ -188,7 +188,7 @@ public class InternalMockProcessorContext
super.setCurrentNode(new ProcessorNode<>("TESTING_NODE"));
this.stateDir = stateDir;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.valueSerde = valueSerde;
this.recordCollectorSupplier = collectorSupplier;
this.metrics().setRocksDBMetricsRecordingTrigger(new RocksDBMetricsRecordingTrigger(new SystemTime()));
}
@ -207,8 +207,8 @@ public class InternalMockProcessorContext @@ -207,8 +207,8 @@ public class InternalMockProcessorContext
this.keySerde = keySerde;
}
public void setValueSerde(final Serde<?> valSerde) {
this.valSerde = valSerde;
public void setValueSerde(final Serde<?> valueSerde) {
this.valueSerde = valueSerde;
}
@Override
@ -218,7 +218,7 @@ public class InternalMockProcessorContext @@ -218,7 +218,7 @@ public class InternalMockProcessorContext
@Override
public Serde<?> valueSerde() {
return valSerde;
return valueSerde;
}
// state mgr will be overridden by the state dir and store maps

Loading…
Cancel
Save