Browse Source

KAFKA-6958: Overload KTable methods to allow to name operation name using the new Named class (#6412)

Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307). This is the 4th PR for KIP-307. 

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/6962/head
Florian Hussonnois 5 years ago committed by Bill Bejeck
parent
commit
6d6366cd55
  1. 1177
      streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
  2. 193
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  3. 29
      streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
  4. 2
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java

1177
streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java

File diff suppressed because it is too large Load Diff

193
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KStream; @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
@ -113,6 +114,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -113,6 +114,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
}
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final Named named,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal,
final boolean filterNot) {
final Serde<K> keySerde;
@ -140,8 +142,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -140,8 +142,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
queryableStoreName = null;
storeBuilder = null;
}
final String name = builder.newProcessorName(FILTER_NAME);
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
final KTableProcessorSupplier<K, V, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
@ -171,36 +172,64 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -171,36 +172,64 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate) {
Objects.requireNonNull(predicate, "predicate can't be null");
return doFilter(predicate, null, false);
return doFilter(predicate, NamedInternal.empty(), null, false);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Named named) {
Objects.requireNonNull(predicate, "predicate can't be null");
return doFilter(predicate, named, null, false);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doFilter(predicate, materializedInternal, false);
return doFilter(predicate, named, materializedInternal, false);
}
@Override
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return filter(predicate, NamedInternal.empty(), materialized);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
Objects.requireNonNull(predicate, "predicate can't be null");
return doFilter(predicate, null, true);
return doFilter(predicate, NamedInternal.empty(), null, true);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named) {
Objects.requireNonNull(predicate, "predicate can't be null");
return doFilter(predicate, named, null, true);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return filterNot(predicate, NamedInternal.empty(), materialized);
}
@Override
public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(predicate, "predicate can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doFilter(predicate, materializedInternal, true);
final NamedInternal renamed = new NamedInternal(named);
return doFilter(predicate, renamed, materializedInternal, true);
}
private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal) {
final Serde<K> keySerde;
final Serde<VR> valueSerde;
@ -225,7 +254,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -225,7 +254,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
storeBuilder = null;
}
final String name = builder.newProcessorName(MAPVALUES_NAME);
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
@ -260,54 +289,101 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -260,54 +289,101 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
return doMapValues(withKey(mapper), null);
return doMapValues(withKey(mapper), NamedInternal.empty(), null);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
return doMapValues(withKey(mapper), named, null);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
return doMapValues(mapper, null);
return doMapValues(mapper, NamedInternal.empty(), null);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named) {
Objects.requireNonNull(mapper, "mapper can't be null");
return doMapValues(mapper, named, null);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return mapValues(mapper, NamedInternal.empty(), materialized);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doMapValues(withKey(mapper), materializedInternal);
return doMapValues(withKey(mapper), named, materializedInternal);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return mapValues(mapper, NamedInternal.empty(), materialized);
}
@Override
public <VR> KTable<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(mapper, "mapper can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doMapValues(mapper, materializedInternal);
return doMapValues(mapper, named, materializedInternal);
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final String... stateStoreNames) {
return doTransformValues(transformerSupplier, null, NamedInternal.empty(), stateStoreNames);
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Named named,
final String... stateStoreNames) {
return doTransformValues(transformerSupplier, null, stateStoreNames);
Objects.requireNonNull(named, "processorName can't be null");
return doTransformValues(transformerSupplier, null, new NamedInternal(named), stateStoreNames);
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final String... stateStoreNames) {
return transformValues(transformerSupplier, materialized, NamedInternal.empty(), stateStoreNames);
}
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(materialized, "materialized can't be null");
Objects.requireNonNull(named, "named can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
return doTransformValues(transformerSupplier, materializedInternal, stateStoreNames);
return doTransformValues(transformerSupplier, materializedInternal, new NamedInternal(named), stateStoreNames);
}
private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final NamedInternal namedInternal,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
final Serde<K> keySerde;
@ -331,7 +407,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -331,7 +407,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
storeBuilder = null;
}
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
this,
@ -364,8 +440,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -364,8 +440,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
@Override
public KStream<K, V> toStream() {
final String name = builder.newProcessorName(TOSTREAM_NAME);
return toStream(NamedInternal.empty());
}
@Override
public KStream<K, V> toStream(final Named named) {
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME);
final ProcessorSupplier<K, Change<V>> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue);
final ProcessorParameters<K, V> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(kStreamMapValues, name)
@ -387,6 +469,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -387,6 +469,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
return toStream().selectKey(mapper);
}
@Override
public <K1> KStream<K1, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper,
final Named named) {
return toStream(named).selectKey(mapper);
}
@Override
public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
final String name;
@ -407,8 +495,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -407,8 +495,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
storeName,
this
);
final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
@ -452,64 +539,112 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -452,64 +539,112 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
return doJoin(other, joiner, null, false, false);
return doJoin(other, joiner, NamedInternal.empty(), null, false, false);
}
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final Named named) {
return doJoin(other, joiner, named, null, false, false);
}
@Override
public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return join(other, joiner, NamedInternal.empty(), materialized);
}
@Override
public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, MERGE_NAME);
return doJoin(other, joiner, materializedInternal, false, false);
return doJoin(other, joiner, named, materializedInternal, false, false);
}
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
return doJoin(other, joiner, null, true, true);
return outerJoin(other, joiner, NamedInternal.empty());
}
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final Named named) {
return doJoin(other, joiner, named, null, true, true);
}
@Override
public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return outerJoin(other, joiner, NamedInternal.empty(), materialized);
}
@Override
public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, MERGE_NAME);
return doJoin(other, joiner, materializedInternal, true, true);
return doJoin(other, joiner, named, materializedInternal, true, true);
}
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
return doJoin(other, joiner, null, true, false);
return leftJoin(other, joiner, NamedInternal.empty());
}
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
final Named named) {
return doJoin(other, joiner, named, null, true, false);
}
@Override
public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return leftJoin(other, joiner, NamedInternal.empty(), materialized);
}
@Override
public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, builder, MERGE_NAME);
return doJoin(other, joiner, materializedInternal, true, false);
return doJoin(other, joiner, named, materializedInternal, true, false);
}
@SuppressWarnings("unchecked")
private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Named joinName,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final boolean leftOuter,
final boolean rightOuter) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joinName, "joinName can't be null");
final String joinMergeName = builder.newProcessorName(MERGE_NAME);
final NamedInternal renamed = new NamedInternal(joinName);
final String joinMergeName = renamed.orElseGenerateWithPrefix(builder, MERGE_NAME);
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other);
if (leftOuter) {
@ -533,8 +668,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -533,8 +668,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, VO>) other, this, reverseJoiner(joiner));
}
final String joinThisName = builder.newProcessorName(JOINTHIS_NAME);
final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME);
final String joinThisName = renamed.suffixWithOrElseGet("-join-this", builder, JOINTHIS_NAME);
final String joinOtherName = renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME);
final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName);
final ProcessorParameters<K, Change<VO>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName);
@ -605,7 +740,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -605,7 +740,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final Grouped<K1, V1> grouped) {
Objects.requireNonNull(selector, "selector can't be null");
Objects.requireNonNull(grouped, "grouped can't be null");
final String selectName = builder.newProcessorName(SELECT_NAME);
final GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<>(grouped);
final String selectName = new NamedInternal(groupedInternal.name()).orElseGenerateWithPrefix(builder, SELECT_NAME);
final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
@ -616,7 +752,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< @@ -616,7 +752,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
this.enableSendingOldValues();
final GroupedInternal<K1, V1> groupedInternal = new GroupedInternal<>(grouped);
return new KGroupedTableImpl<>(
builder,
selectName,

29
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java

@ -705,6 +705,35 @@ public class StreamsBuilderTest { @@ -705,6 +705,35 @@ public class StreamsBuilderTest {
assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
}
@Test
@SuppressWarnings("unchecked")
public void shouldUseSpecifiedNameForToStream() {
builder.table(STREAM_TOPIC)
.toStream(Named.as("to-stream"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForOperation(topology,
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream");
}
@Test
@SuppressWarnings("unchecked")
public void shouldUseSpecifiedNameForToStreamWithMapper() {
builder.table(STREAM_TOPIC)
.toStream(KeyValue::pair, Named.as("to-stream"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForOperation(topology,
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream",
"KSTREAM-KEY-SELECT-0000000004");
}
private static void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
final List<ProcessorNode> processors = topology.processors();
assertEquals("Invalid number of expected processors", expected.length, processors.size());

2
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java

@ -351,7 +351,7 @@ public class KTableImplTest { @@ -351,7 +351,7 @@ public class KTableImplTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullSelectorOnToStream() {
table.toStream(null);
table.toStream((KeyValueMapper) null);
}
@Test(expected = NullPointerException.class)

Loading…
Cancel
Save