diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 36cd17b2de7..39e5e225b2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -90,6 +90,68 @@ public interface KTable { */ KTable filter(final Predicate predicate); + /** + * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given + * predicate, with default serializers, deserializers, and state store. + * All records that do not satisfy the predicate are dropped. + * For each {@code KTable} update, the filter is evaluated based on the current update + * record and then an update record is produced for the result {@code KTable}. + * This is a stateless record-by-record operation. + *

+ * Note that {@code filter} for a changelog stream works differently than {@link KStream#filter(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record + * is forwarded. + * + * @param predicate a filter {@link Predicate} that is applied to each record + * @param named a {@link Named} config used to name the processor in the topology + * @return a {@code KTable} that contains only those records that satisfy the given predicate + * @see #filterNot(Predicate) + */ + KTable filter(final Predicate predicate, final Named named); + + /** + * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given + * predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying + * {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance. + * All records that do not satisfy the predicate are dropped. + * For each {@code KTable} update, the filter is evaluated based on the current update + * record and then an update record is produced for the result {@code KTable}. + * This is a stateless record-by-record operation. + *

+ * Note that {@code filter} for a changelog stream works differently than {@link KStream#filter(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record + * is forwarded. + *
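+     * For illustration only (this sequence is not in the original Javadoc), assume a predicate
+     * {@code (key, value) -> value > 5} and the following updates for a single key:
+     * {@code
+     * // input:   <K1:8>   <K1:3>
+     * // output:  <K1:8>   <K1:null>   (the drop of <K1:3> is forwarded as a tombstone)
+     * }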

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K, V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. + *
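+     * As a rough sketch of the discovery step for such an RPC layer (not part of the original Javadoc;
+     * assumes {@code queryableStoreName} and a {@code Serde<K> keySerde} are in scope):
+     * {@code
+     * StreamsMetadata metadata = streams.metadataForKey(queryableStoreName, key, keySerde.serializer());
+     * // issue the custom RPC (e.g., HTTP) against metadata.host() and metadata.port();
+     * // the remote instance answers it from its local store as shown above
+     * }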

+ * + * @param predicate a filter {@link Predicate} that is applied to each record + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} + * should be materialized. Cannot be {@code null} + * @return a {@code KTable} that contains only those records that satisfy the given predicate + * @see #filterNot(Predicate, Materialized) + */ + KTable filter(final Predicate predicate, + final Materialized> materialized); + /** * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given * predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying @@ -121,12 +183,14 @@ public interface KTable { *

     *
     * @param predicate    a filter {@link Predicate} that is applied to each record
+    * @param named        a {@link Named} config used to name the processor in the topology
     * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                     should be materialized. Cannot be {@code null}
     * @return a {@code KTable} that contains only those records that satisfy the given predicate
     * @see #filterNot(Predicate, Materialized)
     */
    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
+                       final Named named,
                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
 
    /**
@@ -151,6 +215,29 @@ public interface KTable<K, V> {
      */
    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
+    /**
+     * Create a new {@code KTable} that consists of all records of this {@code KTable} which do not satisfy the
+     * given predicate, with default serializers, deserializers, and state store.
+     * All records that do satisfy the predicate are dropped.
+     * For each {@code KTable} update, the filter is evaluated based on the current update
+     * record and then an update record is produced for the result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     *

+     * Note that {@code filterNot} for a changelog stream works differently than {@link KStream#filterNot(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+     * forwarded.
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @param named     a {@link Named} config used to name the processor in the topology
+     * @return a {@code KTable} that contains only those records that do not satisfy the given predicate
+     * @see #filter(Predicate)
+     */
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Named named);
+
+    /**
     * Create a new {@code KTable} that consists of all records of this {@code KTable} which do not satisfy the
     * given predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
@@ -189,6 +276,45 @@ public interface KTable<K, V> {
    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                           final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
 
+    /**
+     * Create a new {@code KTable} that consists of all records of this {@code KTable} which do not satisfy the
+     * given predicate, with the {@link Serde key serde}, {@link Serde value serde}, and the underlying
+     * {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance.
+     * All records that do satisfy the predicate are dropped.
+     * For each {@code KTable} update, the filter is evaluated based on the current update
+     * record and then an update record is produced for the result {@code KTable}.
+     * This is a stateless record-by-record operation.
+     *

+ * Note that {@code filterNot} for a changelog stream works differently than {@link KStream#filterNot(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is + * forwarded. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore<K, V> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, V>keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. + *

+     * @param predicate    a filter {@link Predicate} that is applied to each record
+     * @param named        a {@link Named} config used to name the processor in the topology
+     * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
+     *                     should be materialized. Cannot be {@code null}
+     * @return a {@code KTable} that contains only those records that do not satisfy the given predicate
+     * @see #filter(Predicate, Materialized)
+     */
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
+                           final Named named,
+                           final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
 
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
      * (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
@@ -201,11 +327,7 @@ public interface KTable<K, V> {
      * The example below counts the number of tokens in the value string.
     *

{@code
      * KTable<String, String> inputTable = builder.table("topic");
-     * KTable outputTable = inputTable.mapValue(new ValueMapper {
-     *     Integer apply(String value) {
-     *         return value.split(" ").length;
-     *     }
-     * });
+     * KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);
      * }
*

* This operation preserves data co-location with respect to the key. @@ -224,6 +346,38 @@ public interface KTable { */ KTable mapValues(final ValueMapper mapper); + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store. + * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and + * computes a new value for it, resulting in an updated record for the result {@code KTable}. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is a stateless record-by-record operation. + *

+     * The example below counts the number of tokens in the value string.
+     *

{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));
+     * }
+ *

+ * This operation preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + *

+     * Note that {@code mapValues} for a changelog stream works differently than {@link KStream#mapValues(ValueMapper)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@code KTable}.
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param named  a {@link Named} config used to name the processor in the topology
+     * @param <VR>   the value type of the result {@code KTable}
+     * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type)
+     */
+    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
+                                 final Named named);
+
+    /**
     * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
     * (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store.
@@ -235,11 +389,8 @@ public interface KTable<K, V> {
      * The example below counts the number of tokens in the value and key strings.
     *

{@code
      * KTable<String, String> inputTable = builder.table("topic");
-     * KTable outputTable = inputTable.mapValue(new ValueMapperWithKey {
-     *     Integer apply(String readOnlyKey, String value) {
-     *          return readOnlyKey.split(" ").length + value.split(" ").length;
-     *     }
-     * });
+     * KTable<String, Integer> outputTable =
+     *  inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
      * }
*

* Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. @@ -259,6 +410,86 @@ public interface KTable { */ KTable mapValues(final ValueMapperWithKey mapper); + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type) in the new {@code KTable}, with default serializers, deserializers, and state store. + * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update + * record and computes a new value for it, resulting in an updated record for the result {@code KTable}. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is a stateless record-by-record operation. + *

+     * The example below counts the number of tokens in the value and key strings.
+     *

{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable =
+     *  inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length, Named.as("countTokenValueAndKey"));
+     * }
+ *

+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. + * This operation preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + *

+ * Note that {@code mapValues} for a changelog stream works differently than {@link KStream#mapValues(ValueMapperWithKey) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. + * + * @param mapper a {@link ValueMapperWithKey} that computes a new output value + * @param named a {@link Named} config used to name the processor in the topology + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + KTable mapValues(final ValueMapperWithKey mapper, + final Named named); + + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde}, + * and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized} + * instance. + * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the updated record and + * computes a new value for it, resulting in an updated record for the result {@code KTable}. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is a stateless record-by-record operation. + *

+     * The example below counts the number of tokens in the value string.
+     *

{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }
+ *
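+     * Equivalently, a sketch using a lambda and a hypothetical store name {@code "countStore"}:
+     * {@code
+     * KTable<String, Integer> outputTable =
+     *     inputTable.mapValues(value -> value.split(" ").length, Materialized.as("countStore"));
+     * }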

+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. + *
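+     * A minimal sketch of that lookup, assuming the table was materialized as the hypothetical
+     * {@code "countStore"} from the example above:
+     * {@code
+     * ReadOnlyKeyValueStore<String, Integer> localStore =
+     *     streams.store("countStore", QueryableStoreTypes.<String, Integer>keyValueStore());
+     * Integer tokenCount = localStore.get("some-word"); // key must be local
+     * }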

+ * This operation preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + *

+ * Note that {@code mapValues} for a changelog stream works differently than {@link KStream#mapValues(ValueMapper) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. + * + * @param mapper a {@link ValueMapper} that computes a new output value + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} + * should be materialized. Cannot be {@code null} + * @param the value type of the result {@code KTable} + * + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + KTable mapValues(final ValueMapper mapper, + final Materialized> materialized); + /** * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value * (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde}, @@ -296,6 +527,7 @@ public interface KTable { * delete the corresponding record in the result {@code KTable}. * * @param mapper a {@link ValueMapper} that computes a new output value + * @param named a {@link Named} config used to name the processor in the topology * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized. Cannot be {@code null} * @param the value type of the result {@code KTable} @@ -303,6 +535,7 @@ public interface KTable { * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) */ KTable mapValues(final ValueMapper mapper, + final Named named, final Materialized> materialized); /** @@ -353,22 +586,83 @@ public interface KTable { final Materialized> materialized); /** - * Convert this changelog stream to a {@link KStream}. - *

- * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of - * this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}). - * - * @return a {@link KStream} that contains the same records as this {@code KTable} - */ - KStream toStream(); - - /** - * Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key. + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type) in the new {@code KTable}, with the {@link Serde key serde}, {@link Serde value serde}, + * and the underlying {@link KeyValueStore materialized state storage} configured in the {@link Materialized} + * instance. + * For each {@code KTable} update the provided {@link ValueMapperWithKey} is applied to the value of the update + * record and computes a new value for it, resulting in an updated record for the result {@code KTable}. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is a stateless record-by-record operation. *

-     * For example, you can compute the new key as the length of the value string.
+     * The example below counts the number of tokens in the value and key strings.
     *

{@code
-     * KTable table = builder.table("topic");
-     * KTable keyedStream = table.toStream(new KeyValueMapper {
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
+     *     Integer apply(String readOnlyKey, String value) {
+     *          return readOnlyKey.split(" ").length + value.split(" ").length;
+     *     }
+     * });
+     * }
+ *

+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. + *

+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. + * This operation preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + *

+ * Note that {@code mapValues} for a changelog stream works differently than {@link KStream#mapValues(ValueMapper) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. + * + * @param mapper a {@link ValueMapperWithKey} that computes a new output value + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} + * should be materialized. Cannot be {@code null} + * @param the value type of the result {@code KTable} + * + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + KTable mapValues(final ValueMapperWithKey mapper, + final Named named, + final Materialized> materialized); + + /** + * Convert this changelog stream to a {@link KStream}. + *

+ * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of + * this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}). + * + * @return a {@link KStream} that contains the same records as this {@code KTable} + */ + KStream toStream(); + + /** + * Convert this changelog stream to a {@link KStream}. + *

+ * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of + * this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}). + * + * @param named a {@link Named} config used to name the processor in the topology + * + * @return a {@link KStream} that contains the same records as this {@code KTable} + */ + KStream toStream(final Named named); + + /** + * Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key. + *

+ * For example, you can compute the new key as the length of the value string. + *

{@code
      * KTable<String, String> table = builder.table("topic");
      * KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
      *     Integer apply(String key, String value) {
      *         return value.length();
      *     }
@@ -389,6 +683,35 @@ public interface KTable {
      */
      <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
+    /**
+     * Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key.
+     * 

+ * For example, you can compute the new key as the length of the value string. + *

{@code
+     * KTable<String, String> table = builder.table("topic");
+     * KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
+     *     Integer apply(String key, String value) {
+     *         return value.length();
+     *     }
+     * });
+     * }
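+     * Equivalently, a sketch using a lambda and a hypothetical processor name:
+     * {@code
+     * KStream<Integer, String> keyedStream =
+     *     table.toStream((key, value) -> value.length(), Named.as("value-length"));
+     * }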
+ * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or + * join) is applied to the result {@link KStream}. + *

+ * This operation is equivalent to calling + * {@code table.}{@link #toStream() toStream}{@code ().}{@link KStream#selectKey(KeyValueMapper) selectKey(KeyValueMapper)}. + *

+ * Note that {@link #toStream()} is a logical operation and only changes the "interpretation" of the stream, i.e., + * each record of this changelog stream is no longer treated as an updated record (cf. {@link KStream} vs {@code KTable}). + * + * @param mapper a {@link KeyValueMapper} that computes a new key for each record + * @param named a {@link Named} config used to name the processor in the topology + * @param the new key type of the result stream + * @return a {@link KStream} that contains the same records as this {@code KTable} + */ + KStream toStream(final KeyValueMapper mapper, + final Named named); + /** * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration. * @@ -472,6 +795,160 @@ public interface KTable { KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, final String... stateStoreNames); + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type), with default serializers, deserializers, and state store. + * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input + * record value and computes a new value for it. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing access to additional state-stores, + * and access to the {@link ProcessorContext}. + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional + * periodic actions can be performed. + *

+ * If the downstream topology uses aggregation functions, (e.g. {@link KGroupedTable#reduce}, {@link KGroupedTable#aggregate}, etc), + * care must be taken when dealing with state, (either held in state-stores or transformer instances), to ensure correct aggregate results. + * In contrast, if the resulting KTable is materialized, (cf. {@link #transformValues(ValueTransformerWithKeySupplier, Materialized, String...)}), + * such concerns are handled for you. + *

+ * In order to assign a state, the state must be created and registered beforehand: + *

{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KTable<String, String> outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }
+ *

+ * Within the {@link ValueTransformerWithKey}, the state is obtained via the + * {@link ProcessorContext}. + * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, + * a schedule must be registered. + *

{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private KeyValueStore<String, String> state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }
+ *
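+     * For the elided {@code Punctuator} above, a minimal sketch (hypothetical body; {@code Punctuator}
+     * is a functional interface, so a lambda works) could be:
+     * {@code
+     * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
+     *     timestamp -> System.out.println("entries so far: " + state.approximateNumEntries()));
+     * }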

+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. + * Setting a new value preserves data co-location with respect to the key. + * + * @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a + * {@link ValueTransformerWithKey}. + * At least one transformer instance will be created per streaming task. + * Transformers do not need to be thread-safe. + * @param named a {@link Named} config used to name the processor in the topology + * @param stateStoreNames the names of the state stores used by the processor + * @param the value type of the result table + * @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type) + * @see #mapValues(ValueMapper) + * @see #mapValues(ValueMapperWithKey) + */ + KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, + final Named named, + final String... stateStoreNames); + + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possibly a new type), with the {@link Serde key serde}, {@link Serde value serde}, and the underlying + * {@link KeyValueStore materialized state storage} configured in the {@link Materialized} instance. + * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to each input + * record value and computes a new value for it. + * This is similar to {@link #mapValues(ValueMapperWithKey)}, but more flexible, allowing stateful, rather than stateless, + * record-by-record operation, access to additional state-stores, and access to the {@link ProcessorContext}. + * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can be observed and additional + * periodic actions can be performed. + * The resulting {@code KTable} is materialized into another state store (additional to the provided state store names) + * as specified by the user via {@link Materialized} parameter, and is queryable through its given name. + *

+ * In order to assign a state, the state must be created and registered beforehand: + *

{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
+     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KTable<String, String> outputTable = inputTable.transformValues(
+     *     new ValueTransformerWithKeySupplier() { ... },
+     *     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
+     *                                 .withKeySerde(Serdes.String())
+     *                                 .withValueSerde(Serdes.String()),
+     *     "myValueTransformState");
+     * }
+ *

+ * Within the {@link ValueTransformerWithKey}, the state is obtained via the + * {@link ProcessorContext}. + * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, + * a schedule must be registered. + *

{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private KeyValueStore<String, String> state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
+     *                 context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }
+ *

+ * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. + * Setting a new value preserves data co-location with respect to the key. + * + * @param transformerSupplier a instance of {@link ValueTransformerWithKeySupplier} that generates a + * {@link ValueTransformerWithKey}. + * At least one transformer instance will be created per streaming task. + * Transformers do not need to be thread-safe. + * @param materialized an instance of {@link Materialized} used to describe how the state store of the + * resulting table should be materialized. + * Cannot be {@code null} + * @param stateStoreNames the names of the state stores used by the processor + * @param the value type of the result table + * @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type) + * @see #mapValues(ValueMapper) + * @see #mapValues(ValueMapperWithKey) + */ + KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, + final Materialized> materialized, + final String... stateStoreNames); + /** * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value * (with possibly a new type), with the {@link Serde key serde}, {@link Serde value serde}, and the underlying @@ -541,6 +1018,7 @@ public interface KTable { * @param materialized an instance of {@link Materialized} used to describe how the state store of the * resulting table should be materialized. * Cannot be {@code null} + * @param named a {@link Named} config used to name the processor in the topology * @param stateStoreNames the names of the state stores used by the processor * @param the value type of the result table * @return a {@code KTable} that contains records with unmodified key and new values (possibly of different type) @@ -549,6 +1027,7 @@ public interface KTable { */ KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, final Materialized> materialized, + final Named named, final String... stateStoreNames); /** @@ -640,20 +1119,416 @@ public interface KTable { * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned * on the new key. * - * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated - * @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} - * and the name for a repartition topic if repartitioning is required. - * @param the key type of the result {@link KGroupedTable} - * @param the value type of the result {@link KGroupedTable} - * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable} + * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated + * @param grouped the {@link Grouped} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} + * and the name for a repartition topic if repartitioning is required. 
+ * @param the key type of the result {@link KGroupedTable} + * @param the value type of the result {@link KGroupedTable} + * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@code KTable} + */ + KGroupedTable groupBy(final KeyValueMapper> selector, + final Grouped grouped); + + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, + * with default serializers, deserializers, and state store. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult updated record
<K1:A><K1:A>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:C><K1:C><K1:b><K1:ValueJoiner(C,b)>
<K1:C><K1:null><K1:null>
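+     * A minimal wiring sketch (hypothetical topics and types, assuming a {@code StreamsBuilder builder};
+     * not part of the original Javadoc):
+     * {@code
+     * KTable<String, Long> left = builder.table("left-topic");
+     * KTable<String, Double> right = builder.table("right-topic");
+     * KTable<String, String> joined = left.join(right, (l, r) -> l + "/" + r);
+     * }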
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) + */ + KTable join(final KTable other, + final ValueJoiner joiner); + + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, + * with default serializers, deserializers, and state store. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult updated record
<K1:A><K1:A>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:C><K1:C><K1:b><K1:ValueJoiner(C,b)>
<K1:C><K1:null><K1:null>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) + */ + KTable join(final KTable other, + final ValueJoiner joiner, + final Named named); + + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, + * with the {@link Materialized} instance for configuration of the {@link Serde key serde}, + * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult updated record
<K1:A><K1:A>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:C><K1:C><K1:b><K1:ValueJoiner(C,b)>
<K1:C><K1:null><K1:null>
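+     * As a sketch, reusing the hypothetical tables from the earlier example and materializing the join
+     * result under a hypothetical store name for interactive queries:
+     * {@code
+     * KTable<String, String> joined = left.join(right, (l, r) -> l + "/" + r,
+     *     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("joined-store"));
+     * }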
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + KTable join(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); + + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, + * with the {@link Materialized} instance for configuration of the {@link Serde key serde}, + * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult updated record
<K1:A><K1:A>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:C><K1:C><K1:b><K1:ValueJoiner(C,b)>
<K1:C><K1:null><K1:null>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + KTable join(final KTable other, + final ValueJoiner joiner, + final Named named, + final Materialized> materialized); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join, with default serializers, deserializers, and state store. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult updated record
<K1:A><K1:A><K1:ValueJoiner(A,null)>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:null><K1:b><K1:null>
<K1:null>
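+     * A sketch of a null-safe {@link ValueJoiner} for this left join (hypothetical tables as above):
+     * {@code
+     * KTable<String, String> joined = left.leftJoin(right,
+     *     (l, r) -> r == null ? l + "/-" : l + "/" + r);
+     * }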
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner) + * @see #outerJoin(KTable, ValueJoiner) */ - KGroupedTable groupBy(final KeyValueMapper> selector, - final Grouped grouped); + KTable leftJoin(final KTable other, + final ValueJoiner joiner); /** - * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, - * with default serializers, deserializers, and state store. + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join, with default serializers, deserializers, and state store. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result * of the join. *

@@ -662,13 +1537,17 @@ public interface KTable { * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input * {@code KTable} the result gets updated. *

- * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided - * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. *

* Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. - * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded - * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). *

* Input records with {@code null} key will be dropped and no join computation is performed. *

@@ -686,7 +1565,7 @@ public interface KTable { * <K1:A> * * - * + * <K1:ValueJoiner(A,null)> * * * @@ -696,18 +1575,18 @@ public interface KTable { * <K1:ValueJoiner(A,b)> * * - * <K1:C> - * <K1:C> + * <K1:null> + * * * <K1:b> - * <K1:ValueJoiner(C,b)> + * <K1:null> * * * - * <K1:C> - * <K1:null> * * <K1:null> + * + * * * * Both input streams (or to be more precise, their underlying source topics) need to have the same number of @@ -715,21 +1594,26 @@ public interface KTable { * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner) + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner) * @see #outerJoin(KTable, ValueJoiner) */ - KTable join(final KTable other, - final ValueJoiner joiner); + KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Named named); /** - * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join, - * with the {@link Materialized} instance for configuration of the {@link Serde key serde}, + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde}, * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result * of the join. *

@@ -738,13 +1622,17 @@ public interface KTable { * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input * {@code KTable} the result gets updated. *

- * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided - * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. * The key of the result record is the same as for both joining input records. *

 * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
- * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
- * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+ * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
+ * deleted).
 *

* Input records with {@code null} key will be dropped and no join computation is performed. *
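The hunk below additionally introduces a four-argument overload combining {@code Named} and {@code Materialized}; a sketch under the same assumptions as above:
<pre>{@code
KTable<String, String> enriched = users.leftJoin(
    emails,
    (user, email) -> user + ":" + (email == null ? "n/a" : email),
    Named.as("user-email-left-join"),
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("users-with-emails-store"));
}</pre>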

@@ -762,7 +1650,7 @@ public interface KTable { * <K1:A> * * - * + * <K1:ValueJoiner(A,null)> * * * @@ -772,18 +1660,18 @@ public interface KTable { * <K1:ValueJoiner(A,b)> * * - * <K1:C> - * <K1:C> + * <K1:null> + * * * <K1:b> - * <K1:ValueJoiner(C,b)> + * <K1:null> * * * - * <K1:C> - * <K1:null> * * <K1:null> + * + * * * * Both input streams (or to be more precise, their underlying source topics) need to have the same number of @@ -796,18 +1684,19 @@ public interface KTable { * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given - * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner, Materialized) + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner, Materialized) * @see #outerJoin(KTable, ValueJoiner, Materialized) */ - KTable join(final KTable other, - final ValueJoiner joiner, - final Materialized> materialized); - + KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using - * non-windowed left equi join, with default serializers, deserializers, and state store. + * non-windowed left equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde}, + * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce * an output record (cf. below). @@ -874,26 +1763,30 @@ public interface KTable { * Both input streams (or to be more precise, their underlying source topics) need to have the same number of * partitions. * - * @param other the other {@code KTable} to be joined with this {@code KTable} - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param the value type of the other {@code KTable} - * @param the value type of the result {@code KTable} + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. 
+ * Cannot be {@code null} + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * left {@code KTable} - * @see #join(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) */ KTable leftJoin(final KTable other, - final ValueJoiner joiner); + final ValueJoiner joiner, + final Named named, + final Materialized> materialized); /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using - * non-windowed left equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde}, - * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. + * non-windowed outer equi join, with default serializers, deserializers, and state store. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. - * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce - * an output record (cf. below). + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result * of the join. *
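A sketch of the basic outer join described here; unlike the left join, the joiner must handle {@code null} on either side ({@code users} and {@code emails} are the assumed tables from the earlier sketches):
<pre>{@code
KTable<String, String> all = users.outerJoin(
    emails,
    (user, email) -> (user == null ? "n/a" : user) + ":" + (email == null ? "n/a" : email));
}</pre>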

@@ -904,15 +1797,14 @@ public interface KTable { *

 * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
 * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
- * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the
- * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
- * null} to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record that does not find a corresponding record in the other
+ * {@code KTable}'s state, the provided {@link ValueJoiner} will be called with {@code null} for the
+ * other value to compute a value (with arbitrary type) for the result record.
 * The key of the result record is the same as for both joining input records.
 *

 * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
- * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
- * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be
- * deleted).
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+ * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
 *

* Input records with {@code null} key will be dropped and no join computation is performed. *
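As the next hunk shows, the outer join likewise gains a {@code Named} overload; a sketch (processor name assumed):
<pre>{@code
KTable<String, String> all = users.outerJoin(
    emails,
    (user, email) -> (user == null ? "n/a" : user) + ":" + (email == null ? "n/a" : email),
    Named.as("user-email-outer-join"));
}</pre>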

@@ -944,34 +1836,31 @@ public interface KTable { * * * <K1:b> - * <K1:null> + * <K1:ValueJoiner(null,b)> * * * * * <K1:null> * - * + * <K1:null> * * * Both input streams (or to be more precise, their underlying source topics) need to have the same number of * partitions. * - * @param other the other {@code KTable} to be joined with this {@code KTable} - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. - * Cannot be {@code null} - * @param the value type of the other {@code KTable} - * @param the value type of the result {@code KTable} + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of - * left {@code KTable} - * @see #join(KTable, ValueJoiner, Materialized) - * @see #outerJoin(KTable, ValueJoiner, Materialized) + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) */ - KTable leftJoin(final KTable other, - final ValueJoiner joiner, - final Materialized> materialized); + KTable outerJoin(final KTable other, + final ValueJoiner joiner); /** @@ -1044,6 +1933,7 @@ public interface KTable { * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given @@ -1053,7 +1943,94 @@ public interface KTable { * @see #leftJoin(KTable, ValueJoiner) */ KTable outerJoin(final KTable other, - final ValueJoiner joiner); + final ValueJoiner joiner, + final Named named); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join, with the {@link Materialized} instance for configuration of the {@link Serde key serde}, + * {@link Serde the result table's value serde}, and {@link KeyValueStore state store}. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a
+ * matching record in the current (i.e., processing time) internal state of the other {@code KTable}.
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+ * {@code KTable} the result gets updated.
+ *
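One way to observe this symmetric update behavior is a unit test; a sketch using kafka-streams-test-utils (configuration, topic names, and the {@code builder} from the sketches above are assumptions):
<pre>{@code
import java.util.Properties;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

Properties props = new Properties(); // application.id, bootstrap.servers, default serdes (assumed)
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

    // Feeding either side re-evaluates the join against the other side's
    // current state and emits an updated result record.
    driver.pipeInput(factory.create("users", "K1", "A"));       // -> <K1:ValueJoiner(A,null)>
    driver.pipeInput(factory.create("user-emails", "K1", "b")); // -> <K1:ValueJoiner(A,b)>
}
}</pre>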

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the
+ * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * Additionally, for each record that does not find a corresponding record in the other
+ * {@code KTable}'s state, the provided {@link ValueJoiner} will be called with {@code null} for the
+ * other value to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records.
+ *
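A sketch of the materialized outer join added here (store name assumed):
<pre>{@code
KTable<String, String> all = users.outerJoin(
    emails,
    (user, email) -> (user == null ? "n/a" : user) + ":" + (email == null ? "n/a" : email),
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("users-or-emails-store"));
}</pre>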

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+ * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+ * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted).
+ *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example:
+ * <table border='1'>
+ * <tr>
+ * <th>thisKTable</th>
+ * <th>thisState</th>
+ * <th>otherKTable</th>
+ * <th>otherState</th>
+ * <th>result updated record</th>
+ * </tr>
+ * <tr>
+ * <td>&lt;K1:A&gt;</td>
+ * <td>&lt;K1:A&gt;</td>
+ * <td></td>
+ * <td></td>
+ * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td>&lt;K1:A&gt;</td>
+ * <td>&lt;K1:b&gt;</td>
+ * <td>&lt;K1:b&gt;</td>
+ * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+ * </tr>
+ * <tr>
+ * <td>&lt;K1:null&gt;</td>
+ * <td></td>
+ * <td></td>
+ * <td>&lt;K1:b&gt;</td>
+ * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+ * </tr>
+ * <tr>
+ * <td></td>
+ * <td></td>
+ * <td>&lt;K1:null&gt;</td>
+ * <td></td>
+ * <td>&lt;K1:null&gt;</td>
+ * </tr>
+ * </table>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) + */ + KTable outerJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); + /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using @@ -1126,6 +2103,7 @@ public interface KTable { * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param named a {@link Named} config used to name the processor in the topology * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. * Cannot be {@code null} * @param the value type of the other {@code KTable} @@ -1138,6 +2116,7 @@ public interface KTable { */ KTable outerJoin(final KTable other, final ValueJoiner joiner, + final Named named, final Materialized> materialized); /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 666a109bbc0..4bc102a746d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -113,6 +114,7 @@ public class KTableImpl extends AbstractStream implements KTable< } private KTable doFilter(final Predicate predicate, + final Named named, final MaterializedInternal> materializedInternal, final boolean filterNot) { final Serde keySerde; @@ -140,8 +142,7 @@ public class KTableImpl extends AbstractStream implements KTable< queryableStoreName = null; storeBuilder = null; } - - final String name = builder.newProcessorName(FILTER_NAME); + final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); @@ -171,36 +172,64 @@ public class KTableImpl extends AbstractStream implements KTable< @Override public KTable filter(final Predicate predicate) { Objects.requireNonNull(predicate, "predicate can't be null"); - return doFilter(predicate, null, false); + return 
doFilter(predicate, NamedInternal.empty(), null, false); + } + + @Override + public KTable filter(final Predicate predicate, final Named named) { + Objects.requireNonNull(predicate, "predicate can't be null"); + return doFilter(predicate, named, null, false); } @Override public KTable filter(final Predicate predicate, + final Named named, final Materialized> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); - return doFilter(predicate, materializedInternal, false); + return doFilter(predicate, named, materializedInternal, false); + } + + @Override + public KTable filter(final Predicate predicate, + final Materialized> materialized) { + return filter(predicate, NamedInternal.empty(), materialized); } @Override public KTable filterNot(final Predicate predicate) { Objects.requireNonNull(predicate, "predicate can't be null"); - return doFilter(predicate, null, true); + return doFilter(predicate, NamedInternal.empty(), null, true); + } + + @Override + public KTable filterNot(final Predicate predicate, + final Named named) { + Objects.requireNonNull(predicate, "predicate can't be null"); + return doFilter(predicate, named, null, true); + } + + @Override + public KTable filterNot(final Predicate predicate, + final Materialized> materialized) { + return filterNot(predicate, NamedInternal.empty(), materialized); } @Override public KTable filterNot(final Predicate predicate, + final Named named, final Materialized> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); - - return doFilter(predicate, materializedInternal, true); + final NamedInternal renamed = new NamedInternal(named); + return doFilter(predicate, renamed, materializedInternal, true); } private KTable doMapValues(final ValueMapperWithKey mapper, + final Named named, final MaterializedInternal> materializedInternal) { final Serde keySerde; final Serde valueSerde; @@ -225,7 +254,7 @@ public class KTableImpl extends AbstractStream implements KTable< storeBuilder = null; } - final String name = builder.newProcessorName(MAPVALUES_NAME); + final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME); final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName); @@ -260,54 +289,101 @@ public class KTableImpl extends AbstractStream implements KTable< @Override public KTable mapValues(final ValueMapper mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - return doMapValues(withKey(mapper), null); + return doMapValues(withKey(mapper), NamedInternal.empty(), null); + } + + @Override + public KTable mapValues(final ValueMapper mapper, + final Named named) { + Objects.requireNonNull(mapper, "mapper can't be null"); + return doMapValues(withKey(mapper), named, null); } @Override public KTable mapValues(final ValueMapperWithKey mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - return doMapValues(mapper, null); + return doMapValues(mapper, NamedInternal.empty(), null); + } + + @Override + public KTable mapValues(final ValueMapperWithKey mapper, + final Named named) { + Objects.requireNonNull(mapper, "mapper can't be null"); + return doMapValues(mapper, named, null); + } + + 
@Override + public KTable mapValues(final ValueMapper mapper, + final Materialized> materialized) { + return mapValues(mapper, NamedInternal.empty(), materialized); } @Override public KTable mapValues(final ValueMapper mapper, + final Named named, final Materialized> materialized) { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); - return doMapValues(withKey(mapper), materializedInternal); + return doMapValues(withKey(mapper), named, materializedInternal); } @Override public KTable mapValues(final ValueMapperWithKey mapper, final Materialized> materialized) { + return mapValues(mapper, NamedInternal.empty(), materialized); + } + + @Override + public KTable mapValues(final ValueMapperWithKey mapper, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); - return doMapValues(mapper, materializedInternal); + + return doMapValues(mapper, named, materializedInternal); + } + + @Override + public KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, + final String... stateStoreNames) { + return doTransformValues(transformerSupplier, null, NamedInternal.empty(), stateStoreNames); } @Override public KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, + final Named named, final String... stateStoreNames) { - return doTransformValues(transformerSupplier, null, stateStoreNames); + Objects.requireNonNull(named, "processorName can't be null"); + return doTransformValues(transformerSupplier, null, new NamedInternal(named), stateStoreNames); } @Override public KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, final Materialized> materialized, final String... stateStoreNames) { + return transformValues(transformerSupplier, materialized, NamedInternal.empty(), stateStoreNames); + } + + @Override + public KTable transformValues(final ValueTransformerWithKeySupplier transformerSupplier, + final Materialized> materialized, + final Named named, + final String... stateStoreNames) { Objects.requireNonNull(materialized, "materialized can't be null"); + Objects.requireNonNull(named, "named can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); - return doTransformValues(transformerSupplier, materializedInternal, stateStoreNames); + return doTransformValues(transformerSupplier, materializedInternal, new NamedInternal(named), stateStoreNames); } private KTable doTransformValues(final ValueTransformerWithKeySupplier transformerSupplier, final MaterializedInternal> materializedInternal, + final NamedInternal namedInternal, final String... 
stateStoreNames) { Objects.requireNonNull(stateStoreNames, "stateStoreNames"); final Serde keySerde; @@ -331,7 +407,7 @@ public class KTableImpl extends AbstractStream implements KTable< storeBuilder = null; } - final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); + final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME); final KTableProcessorSupplier processorSupplier = new KTableTransformValues<>( this, @@ -364,8 +440,14 @@ public class KTableImpl extends AbstractStream implements KTable< @Override public KStream toStream() { - final String name = builder.newProcessorName(TOSTREAM_NAME); + return toStream(NamedInternal.empty()); + } + + @Override + public KStream toStream(final Named named) { + Objects.requireNonNull(named, "named can't be null"); + final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME); final ProcessorSupplier> kStreamMapValues = new KStreamMapValues<>((key, change) -> change.newValue); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(kStreamMapValues, name) @@ -387,6 +469,12 @@ public class KTableImpl extends AbstractStream implements KTable< return toStream().selectKey(mapper); } + @Override + public KStream toStream(final KeyValueMapper mapper, + final Named named) { + return toStream(named).selectKey(mapper); + } + @Override public KTable suppress(final Suppressed suppressed) { final String name; @@ -407,8 +495,7 @@ public class KTableImpl extends AbstractStream implements KTable< storeName, this ); - - + final ProcessorGraphNode> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), @@ -452,64 +539,112 @@ public class KTableImpl extends AbstractStream implements KTable< @Override public KTable join(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, null, false, false); + return doJoin(other, joiner, NamedInternal.empty(), null, false, false); + } + + @Override + public KTable join(final KTable other, + final ValueJoiner joiner, + final Named named) { + return doJoin(other, joiner, named, null, false, false); + } + + @Override + public KTable join(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized) { + return join(other, joiner, NamedInternal.empty(), materialized); } @Override public KTable join(final KTable other, final ValueJoiner joiner, + final Named named, final Materialized> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, MERGE_NAME); - return doJoin(other, joiner, materializedInternal, false, false); + return doJoin(other, joiner, named, materializedInternal, false, false); } @Override public KTable outerJoin(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, null, true, true); + return outerJoin(other, joiner, NamedInternal.empty()); + } + + @Override + public KTable outerJoin(final KTable other, + final ValueJoiner joiner, + final Named named) { + return doJoin(other, joiner, named, null, true, true); + } + + @Override + public KTable outerJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized) { + return outerJoin(other, joiner, NamedInternal.empty(), materialized); } @Override public KTable outerJoin(final KTable other, final ValueJoiner joiner, + final Named named, final Materialized> 
materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, MERGE_NAME); - return doJoin(other, joiner, materializedInternal, true, true); + return doJoin(other, joiner, named, materializedInternal, true, true); } @Override public KTable leftJoin(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, null, true, false); + return leftJoin(other, joiner, NamedInternal.empty()); + } + + @Override + public KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Named named) { + return doJoin(other, joiner, named, null, true, false); } @Override public KTable leftJoin(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { + return leftJoin(other, joiner, NamedInternal.empty(), materialized); + } + + @Override + public KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Named named, + final Materialized> materialized) { Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, builder, MERGE_NAME); - return doJoin(other, joiner, materializedInternal, true, false); + return doJoin(other, joiner, named, materializedInternal, true, false); } @SuppressWarnings("unchecked") private KTable doJoin(final KTable other, final ValueJoiner joiner, + final Named joinName, final MaterializedInternal> materializedInternal, final boolean leftOuter, final boolean rightOuter) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(joinName, "joinName can't be null"); - final String joinMergeName = builder.newProcessorName(MERGE_NAME); + final NamedInternal renamed = new NamedInternal(joinName); + final String joinMergeName = renamed.orElseGenerateWithPrefix(builder, MERGE_NAME); final Set allSourceNodes = ensureJoinableWith((AbstractStream) other); if (leftOuter) { @@ -533,8 +668,8 @@ public class KTableImpl extends AbstractStream implements KTable< joinOther = new KTableKTableOuterJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); } - final String joinThisName = builder.newProcessorName(JOINTHIS_NAME); - final String joinOtherName = builder.newProcessorName(JOINOTHER_NAME); + final String joinThisName = renamed.suffixWithOrElseGet("-join-this", builder, JOINTHIS_NAME); + final String joinOtherName = renamed.suffixWithOrElseGet("-join-other", builder, JOINOTHER_NAME); final ProcessorParameters> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName); final ProcessorParameters> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName); @@ -605,7 +740,8 @@ public class KTableImpl extends AbstractStream implements KTable< final Grouped grouped) { Objects.requireNonNull(selector, "selector can't be null"); Objects.requireNonNull(grouped, "grouped can't be null"); - final String selectName = builder.newProcessorName(SELECT_NAME); + final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); + final String selectName = new NamedInternal(groupedInternal.name()).orElseGenerateWithPrefix(builder, SELECT_NAME); final KTableProcessorSupplier> selectSupplier = new KTableRepartitionMap<>(this, selector); final ProcessorParameters> processorParameters = new ProcessorParameters<>(selectSupplier, selectName); @@ -616,7 +752,6 @@ public class KTableImpl extends AbstractStream 
implements KTable< builder.addGraphNode(this.streamsGraphNode, groupByMapNode); this.enableSendingOldValues(); - final GroupedInternal groupedInternal = new GroupedInternal<>(grouped); return new KGroupedTableImpl<>( builder, selectName, diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 49477b0fb11..68dd3ac4ac3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -705,6 +705,35 @@ public class StreamsBuilderTest { assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); } + @Test + @SuppressWarnings("unchecked") + public void shouldUseSpecifiedNameForToStream() { + builder.table(STREAM_TOPIC) + .toStream(Named.as("to-stream")); + + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + assertSpecifiedNameForOperation(topology, + "KSTREAM-SOURCE-0000000001", + "KTABLE-SOURCE-0000000002", + "to-stream"); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldUseSpecifiedNameForToStreamWithMapper() { + builder.table(STREAM_TOPIC) + .toStream(KeyValue::pair, Named.as("to-stream")); + + builder.build(); + final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); + assertSpecifiedNameForOperation(topology, + "KSTREAM-SOURCE-0000000001", + "KTABLE-SOURCE-0000000002", + "to-stream", + "KSTREAM-KEY-SELECT-0000000004"); + } + private static void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) { final List processors = topology.processors(); assertEquals("Invalid number of expected processors", expected.length, processors.size()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index e9f688bbf4d..f245fec380a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -351,7 +351,7 @@ public class KTableImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullSelectorOnToStream() { - table.toStream(null); + table.toStream((KeyValueMapper) null); } @Test(expected = NullPointerException.class)