From 7429f49780a69d0c5eda8ef0c69a3209aecee11c Mon Sep 17 00:00:00 2001 From: "Michael G. Noll" Date: Fri, 30 Jun 2017 08:59:10 +0100 Subject: [PATCH] MINOR: reduce() javadocs: clarify position of arguments Author: Michael G. Noll Reviewers: Matthias J. Sax , Eno Thereska , Damian Guy Closes #2651 from miguno/trunk-reduce-javadocs --- .../kafka/streams/kstream/KGroupedStream.java | 66 +++++++++++++++++-- .../kafka/streams/kstream/KGroupedTable.java | 14 ++-- 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index e02231afee9..e6faf8ce4bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -409,7 +409,16 @@ public interface KGroupedStream { * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, String)} can be used to compute aggregate functions like sum, min, or max. @@ -461,7 +470,16 @@ public interface KGroupedStream { * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or @@ -509,7 +527,16 @@ public interface KGroupedStream { * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, Windows, String)} can be used to compute aggregate functions like sum, min, or max. @@ -610,7 +637,16 @@ public interface KGroupedStream { * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, Windows, StateStoreSupplier)} can be used to compute aggregate functions like sum, @@ -660,7 +696,16 @@ public interface KGroupedStream { * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, SessionWindows, String)} can be used to compute aggregate functions like sum, min, @@ -749,7 +794,16 @@ public interface KGroupedStream { * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID. *

* The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current - * aggregate and the record's value. + * aggregate (first argument) and the record's value (second argument): + *

{@code
+     * // At the example of a Reducer
+     * new Reducer() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+ *

* If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, SessionWindows, StateStoreSupplier)} can be used to compute aggregate functions like diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 5d019c4a648..bf0df555714 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -163,10 +163,11 @@ public interface KGroupedTable { *

* Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the - * current aggregate and the record's value by adding the new record to the aggregate. + * current aggregate (first argument) and the record's value (second argument) by adding the new record to the + * aggregate. * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable} - * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" - * record from the aggregate. + * and computes a new aggregate using the current aggregate (first argument) and the record's value (second + * argument) by "removing" the "replaced" record from the aggregate. * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum. @@ -290,10 +291,11 @@ public interface KGroupedTable { *

* Each update to the original {@link KTable} results in a two step update of the result {@link KTable}. * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the - * current aggregate and the record's value by adding the new record to the aggregate. + * current aggregate (first argument) and the record's value (second argument) by adding the new record to the + * aggregate. * The specified {@link Reducer substractor} is applied for each "replaced" record of the original {@link KTable} - * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" - * record from the aggregate. + * and computes a new aggregate using the current aggregate (first argument) and the record's value (second + * argument) by "removing" the "replaced" record from the aggregate. * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's * value as-is. * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.