diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 05e4ac967bc..121d0a4cb9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -146,7 +146,9 @@ public interface KGroupedStream { * * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable reduce(final Reducer reducer); @@ -208,7 +210,9 @@ public interface KGroupedStream { * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the reduce function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable reduce(final Reducer reducer, final Materialized> materialized); @@ -251,7 +255,9 @@ public interface KGroupedStream { * @param aggregator an {@link Aggregator} that computes a new aggregate result * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator); @@ -308,7 +314,9 @@ public interface KGroupedStream { * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the - * latest (rolling) aggregate for each key + * latest (rolling) aggregate for each key. If the aggregate function returns {@code null}, it is then interpreted as + * deletion for the key, and future messages of the same key coming from upstream operators + * will be handled as newly initialized value. */ KTable aggregate(final Initializer initializer, final Aggregator aggregator,