Browse Source

MINOR: Fix `Consumed` to return new object instead of `this` (#14550)

We embrace immutability and thus should return a new object instead of
`this`, similar to other config classed we use in the DSL.

Side JavaDocs cleanup for a bunch of classes.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/14553/head
Matthias J. Sax 1 year ago committed by GitHub
parent
commit
649e2cbc8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
  2. 107
      streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
  3. 38
      streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
  4. 84
      streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
  5. 107
      streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java
  6. 7
      streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java
  7. 7
      streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
  8. 1
      streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
  9. 61
      streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java
  10. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java

19
streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java

@ -24,10 +24,11 @@ package org.apache.kafka.streams.kstream; @@ -24,10 +24,11 @@ package org.apache.kafka.streams.kstream;
* {@code Aggregator} is used in combination with {@link Initializer} that provides an initial aggregation value.
* <p>
* {@code Aggregator} can be used to implement aggregation functions like count.
*
* @param <K> key type
* @param <V> input value type
* @param <VA> aggregate value type
* @param <VAgg> aggregate value type
*
* @see Initializer
* @see KGroupedStream#aggregate(Initializer, Aggregator)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@ -37,15 +38,19 @@ package org.apache.kafka.streams.kstream; @@ -37,15 +38,19 @@ package org.apache.kafka.streams.kstream;
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
* @see Reducer
*/
public interface Aggregator<K, V, VA> {
public interface Aggregator<K, V, VAgg> {
/**
* Compute a new aggregate from the key and value of a record and the current aggregate of the same key.
*
* @param key the key of the record
* @param value the value of the record
* @param aggregate the current aggregate value
* @param key
* the key of the record
* @param value
* the value of the record
* @param aggregate
* the current aggregate value
*
* @return the new aggregate value
*/
VA apply(final K key, final V value, final VA aggregate);
VAgg apply(final K key, final V value, final VAgg aggregate);
}

107
streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java

@ -41,12 +41,19 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -41,12 +41,19 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
this.chainConsumer = chainConsumer;
}
protected Branched(final Branched<K, V> branched) {
this(branched.name, branched.chainFunction, branched.chainConsumer);
}
/**
* Create an instance of {@code Branched} with provided branch name suffix.
*
* @param name the branch name suffix to be used (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @param name
* the branch name suffix to be used (see {@link BranchedKStream} description for details)
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> as(final String name) {
@ -57,16 +64,20 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -57,16 +64,20 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain function.
*
* @param chain A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param <K> key type
* @param <V> value type
* @param chain
* A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain) {
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain
) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(null, chain, null);
}
@ -74,12 +85,15 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -74,12 +85,15 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain consumer.
*
* @param chain A consumer to which the branch will be sent. If a consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param <K> key type
* @param <V> value type
* @param chain
* A consumer to which the branch will be sent. If a consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<KStream<K, V>> chain) {
@ -90,18 +104,24 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -90,18 +104,24 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain function and branch name suffix.
*
* @param chain A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @param chain
* A function that will be applied to the branch. If the provided function returns
* {@code null}, its result is ignored, otherwise it is added to the {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name
* the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withFunction(
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, final String name) {
final Function<? super KStream<K, V>, ? extends KStream<K, V>> chain,
final String name
) {
Objects.requireNonNull(chain, "chain function cannot be null");
return new Branched<>(name, chain, null);
}
@ -109,14 +129,18 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -109,14 +129,18 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
/**
* Create an instance of {@code Branched} with provided chain consumer and branch name suffix.
*
* @param chain A consumer to which the branch will be sent. If a non-null consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
* @param <K> key type
* @param <V> value type
* @param chain
* A consumer to which the branch will be sent. If a non-null consumer is provided,
* the respective branch will not be added to the resulting {@code Map} returned
* by {@link BranchedKStream#defaultBranch()} or {@link BranchedKStream#noDefaultBranch()} (see
* {@link BranchedKStream} description for details).
* @param name
* the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
* (see {@link BranchedKStream} description for details)
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@code Branched}
*/
public static <K, V> Branched<K, V> withConsumer(final Consumer<? super KStream<K, V>> chain,
@ -125,21 +149,14 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> { @@ -125,21 +149,14 @@ public class Branched<K, V> implements NamedOperation<Branched<K, V>> {
return new Branched<>(name, null, chain);
}
/**
* Create an instance of {@code Branched} from an existing instance.
*
* @param branched the instance of {@code Branched} to copy
*/
protected Branched(final Branched<K, V> branched) {
this(branched.name, branched.chainFunction, branched.chainConsumer);
}
/**
* Configure the instance of {@code Branched} with a branch name suffix.
*
* @param name the branch name suffix to be used. If {@code null} a default branch name suffix will be generated (see
* {@link BranchedKStream} description for details)
* @return {@code this}
* @param name
* the branch name suffix to be used. If {@code null} a default branch name suffix will be generated (see
* {@link BranchedKStream} description for details)
*
* @return {@code this} to facilitate method chaining
*/
@Override
public Branched<K, V> withName(final String name) {

38
streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java

@ -78,7 +78,7 @@ import java.util.Map; @@ -78,7 +78,7 @@ import java.util.Map;
*
* <h3>Direct Branch Consuming</h3>
* In many cases we do not need to have a single scope for all the branches, each branch being processed completely
* independently from others. Then we can use 'consuming' lambdas or method references in {@link Branched} parameter:
* independently of others. Then we can use 'consuming' lambdas or method references in {@link Branched} parameter:
*
* <pre> {@code
* source.split()
@ -112,16 +112,19 @@ import java.util.Map; @@ -112,16 +112,19 @@ import java.util.Map;
*
* @param <K> Type of keys
* @param <V> Type of values
*
* @see KStream
*/
public interface BranchedKStream<K, V> {
/**
* Define a branch for records that match the predicate.
*
* @param predicate A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @param predicate
* A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
*
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
@ -129,13 +132,16 @@ public interface BranchedKStream<K, V> { @@ -129,13 +132,16 @@ public interface BranchedKStream<K, V> {
/**
* Define a branch for records that match the predicate.
*
* @param predicate A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @param branched A {@link Branched} parameter, that allows to define a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
* @param predicate
* A {@link Predicate} instance, against which each record will be evaluated.
* If this predicate returns {@code true} for a given record, the record will be
* routed to the current branch and will not be evaluated against the predicates
* for the remaining branches.
* @param branched
* A {@link Branched} parameter, that allows to define a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
*
* @return {@code this} to facilitate method chaining
*/
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);
@ -153,9 +159,11 @@ public interface BranchedKStream<K, V> { @@ -153,9 +159,11 @@ public interface BranchedKStream<K, V> {
* Finalize the construction of branches and defines the default branch for the messages not intercepted
* by other branches. Calling {@code defaultBranch} or {@link #noDefaultBranch()} is optional.
*
* @param branched A {@link Branched} parameter, that allows to define a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
* @param branched
* A {@link Branched} parameter, that allows to define a branch name, an in-place
* branch consumer or branch mapper (see <a href="#examples">code examples</a>
* for {@link BranchedKStream})
*
* @return {@link Map} of named branches. For rules of forming the resulting map, see {@link BranchedKStream}
* <a href="#maprules">description</a>.
*/

84
streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java

@ -36,9 +36,9 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -36,9 +36,9 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore;
* {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
*
* @param <K> Type of keys
* @param <VOut> Type of values after agg
* @param <VAgg> Type of values after agg
*/
public interface CogroupedKStream<K, VOut> {
public interface CogroupedKStream<K, VAgg> {
/**
* Add an already {@link KGroupedStream grouped KStream} to this {@code CogroupedKStream}.
@ -54,13 +54,17 @@ public interface CogroupedKStream<K, VOut> { @@ -54,13 +54,17 @@ public interface CogroupedKStream<K, VOut> {
* using the initial intermediate aggregation result provided via the {@link Initializer} that is passed into
* {@link #aggregate(Initializer)}) and the record's value.
*
* @param groupedStream a group stream
* @param aggregator an {@link Aggregator} that computes a new aggregate result
* @param <VIn> Type of input values
* @param groupedStream
* a group stream
* @param aggregator
* an {@link Aggregator} that computes a new aggregate result
*
* @param <V> Type of input values
*
* @return a {@code CogroupedKStream}
*/
<VIn> CogroupedKStream<K, VOut> cogroup(final KGroupedStream<K, VIn> groupedStream,
final Aggregator<? super K, ? super VIn, VOut> aggregator);
<V> CogroupedKStream<K, VAgg> cogroup(final KGroupedStream<K, V> groupedStream,
final Aggregator<? super K, ? super V, VAgg> aggregator);
/**
* Aggregate the values of records in these streams by the grouped key.
@ -105,12 +109,14 @@ public interface CogroupedKStream<K, VOut> { @@ -105,12 +109,14 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation
* result. Cannot be {@code null}.
* @param initializer
* an {@link Initializer} that computes an initial intermediate aggregation
* result. Cannot be {@code null}.
*
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
KTable<K, VOut> aggregate(final Initializer<VOut> initializer);
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer);
/**
* Aggregate the values of records in these streams by the grouped key.
@ -156,13 +162,15 @@ public interface CogroupedKStream<K, VOut> { @@ -156,13 +162,15 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation
* result. Cannot be {@code null}.
* @param named name the processor. Cannot be {@code null}.
* @param initializer
* an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param named
* name the processor. Cannot be {@code null}.
*
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
final Named named);
/**
@ -208,15 +216,16 @@ public interface CogroupedKStream<K, VOut> { @@ -208,15 +216,16 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation
* result. Cannot be {@code null}.
* @param materialized an instance of {@link Materialized} used to materialize a state store.
* Cannot be {@code null}.
* @param initializer
* an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param materialized
* an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
*
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Aggregate the values of records in these streams by the grouped key.
@ -262,44 +271,53 @@ public interface CogroupedKStream<K, VOut> { @@ -262,44 +271,53 @@ public interface CogroupedKStream<K, VOut> {
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* @param initializer an {@link Initializer} that computes an initial intermediate aggregation
* result. Cannot be {@code null}.
* @param materialized an instance of {@link Materialized} used to materialize a state store.
* Cannot be {@code null}.
* @param named name the processors. Cannot be {@code null}.
* @param initializer
* an {@link Initializer} that computes an initial intermediate aggregation result. Cannot be {@code null}.
* @param materialized
* an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
* @param named
* name the processors. Cannot be {@code null}.
*
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that
* represent the latest (rolling) aggregate for each key
*/
KTable<K, VOut> aggregate(final Initializer<VOut> initializer,
KTable<K, VAgg> aggregate(final Initializer<VAgg> initializer,
final Named named,
final Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);
final Materialized<K, VAgg, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform windowed
* aggregations.
*
* @param windows the specification of the aggregation {@link Windows}
* @param windows
* the specification of the aggregation {@link Windows}
*
* @param <W> the window type
*
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
<W extends Window> TimeWindowedCogroupedKStream<K, VOut> windowedBy(final Windows<W> windows);
<W extends Window> TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final Windows<W> windows);
/**
* Create a new {@link TimeWindowedCogroupedKStream} instance that can be used to perform sliding
* windowed aggregations.
*
* @param windows the specification of the aggregation {@link SlidingWindows}
* @param windows
* the specification of the aggregation {@link SlidingWindows}
*
* @return an instance of {@link TimeWindowedCogroupedKStream}
*/
TimeWindowedCogroupedKStream<K, VOut> windowedBy(final SlidingWindows windows);
TimeWindowedCogroupedKStream<K, VAgg> windowedBy(final SlidingWindows windows);
/**
* Create a new {@link SessionWindowedCogroupedKStream} instance that can be used to perform session
* windowed aggregations.
*
* @param windows the specification of the aggregation {@link SessionWindows}
* @param windows
* the specification of the aggregation {@link SessionWindows}
*
* @return an instance of {@link SessionWindowedCogroupedKStream}
*/
SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows windows);
SessionWindowedCogroupedKStream<K, VAgg> windowedBy(final SessionWindows windows);
}

107
streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java

@ -70,10 +70,6 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -70,10 +70,6 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
this.processorName = processorName;
}
/**
* Create an instance of {@link Consumed} from an existing instance.
* @param consumed the instance of {@link Consumed} to copy
*/
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde,
consumed.valueSerde,
@ -86,12 +82,18 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -86,12 +82,18 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
*
* @param keySerde the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @param timestampExtractor the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @param <K> key type
* @param <V> value type
* @param keySerde
* the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde
* the value serde. If {@code null} the default value serde from config will be used
* @param timestampExtractor
* the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@ -99,16 +101,19 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -99,16 +101,19 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);
}
/**
* Create an instance of {@link Consumed} with key and value {@link Serde}s.
*
* @param keySerde the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @param <K> key type
* @param <V> value type
* @param keySerde
* the key serde. If {@code null} the default key serde from config will be used
* @param valueSerde
* the value serde. If {@code null} the default value serde from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
@ -119,9 +124,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -119,9 +124,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with a {@link TimestampExtractor}.
*
* @param timestampExtractor the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @param <K> key type
* @param <V> value type
* @param timestampExtractor
* the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
@ -131,9 +139,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -131,9 +139,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @param <K> key type
* @param <V> value type
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
@ -143,9 +154,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -143,9 +154,12 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
/**
* Create an instance of {@link Consumed} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @param <K> key type
* @param <V> value type
* @param processorName
* the processor name to be used. If {@code null} a default processor name will be generated
*
* @param <K> key type
* @param <V> value type
*
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
@ -155,57 +169,62 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> { @@ -155,57 +169,62 @@ public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
/**
* Configure the instance of {@link Consumed} with a key {@link Serde}.
*
* @param keySerde the key serde. If {@code null}the default key serde from config will be used
* @return this
* @param keySerde
* the key serde. If {@code null} the default key serde from config will be used
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
this.keySerde = keySerde;
return this;
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a value {@link Serde}.
*
* @param valueSerde the value serde. If {@code null} the default value serde from config will be used
* @return this
* @param valueSerde
* the value serde. If {@code null} the default value serde from config will be used
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
this.valueSerde = valueSerde;
return this;
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a {@link TimestampExtractor}.
*
* @param timestampExtractor the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
* @return this
* @param timestampExtractor
* the timestamp extractor to used. If {@code null} the default timestamp extractor from config will be used
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
this.timestampExtractor = timestampExtractor;
return this;
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a {@link org.apache.kafka.streams.Topology.AutoOffsetReset Topology.AutoOffsetReset}.
*
* @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
* @return this
* @param resetPolicy
* the offset reset policy to be used. If {@code null} the default reset policy from config will be used
*
* @return a new instance of {@link Consumed}
*/
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
this.resetPolicy = resetPolicy;
return this;
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
}
/**
* Configure the instance of {@link Consumed} with a processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
* @param processorName
* the processor name to be used. If {@code null} a default processor name will be generated
*
* @return a new instance of {@link Consumed}
*/
@Override
public Consumed<K, V> withName(final String processorName) {
this.processorName = processorName;
return this;
return new Consumed<K, V>(keySerde, valueSerde, timestampExtractor, resetPolicy, processorName);
}
@Override

7
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java

@ -68,7 +68,8 @@ public interface EmitStrategy { @@ -68,7 +68,8 @@ public interface EmitStrategy {
}
/**
* Returns the strategy type
* Returns the strategy type.
*
* @return Emit strategy type
*/
StrategyType type();
@ -87,7 +88,7 @@ public interface EmitStrategy { @@ -87,7 +88,7 @@ public interface EmitStrategy {
* @see UnlimitedWindows
* @see WindowUpdateStrategy
*
* @return WindowCloseStrategy instance
* @return "window close" {@code EmitStrategy} instance
*/
static EmitStrategy onWindowClose() {
return new WindowCloseStrategy();
@ -103,7 +104,7 @@ public interface EmitStrategy { @@ -103,7 +104,7 @@ public interface EmitStrategy {
* @see UnlimitedWindows
* @see WindowCloseStrategy
*
* @return WindowCloseStrategy instance
* @return "window update" {@code EmitStrategy} instance
*/
static EmitStrategy onWindowUpdate() {
return new WindowUpdateStrategy();

7
streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java

@ -27,6 +27,7 @@ package org.apache.kafka.streams.kstream; @@ -27,6 +27,7 @@ package org.apache.kafka.streams.kstream;
*
* @param <K> key type
* @param <V> value type
*
* @see KStream#foreach(ForeachAction)
*/
public interface ForeachAction<K, V> {
@ -34,8 +35,10 @@ public interface ForeachAction<K, V> { @@ -34,8 +35,10 @@ public interface ForeachAction<K, V> {
/**
* Perform an action for each record of a stream.
*
* @param key the key of the record
* @param value the value of the record
* @param key
* the key of the record
* @param value
* the value of the record
*/
void apply(final K key, final V value);
}

1
streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java

@ -59,6 +59,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -59,6 +59,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
*
* @param <K> Type of primary keys
* @param <V> Type of value changes
*
* @see KTable
* @see StreamsBuilder#globalTable(String)
* @see KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)

61
streams/src/main/java/org/apache/kafka/streams/kstream/Grouped.java

@ -48,8 +48,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -48,8 +48,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided name used as part of the repartition topic if required.
*
* @param name the name used for a repartition topic if required
* @param name
* the name used for a repartition topic if required
*
* @param <K> the key type
* @param <V> the value type
*
* @return a new {@link Grouped} configured with the name
*
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@ -62,8 +68,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -62,8 +68,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided keySerde. If {@code null} the default key serde from config will be used.
*
* @param keySerde the Serde used for serializing the key. If {@code null} the default key serde from config will be used
* @param keySerde
* the Serde used for serializing the key. If {@code null} the default key serde from config will be used
*
* @param <K> the key type
* @param <V> the value type
*
* @return a new {@link Grouped} configured with the keySerde
*
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@ -76,8 +88,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -76,8 +88,14 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
/**
* Create a {@link Grouped} instance with the provided valueSerde. If {@code null} the default value serde from config will be used.
*
* @param valueSerde the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
* @param valueSerde
* the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
*
* @param <K> the key type
* @param <V> the value type
*
* @return a new {@link Grouped} configured with the valueSerde
*
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@ -90,10 +108,18 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -90,10 +108,18 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
* Create a {@link Grouped} instance with the provided name, keySerde, and valueSerde. If the keySerde and/or the valueSerde is
* {@code null} the default value for the respective serde from config will be used.
*
* @param name the name used as part of the repartition topic name if required
* @param keySerde the {@link Serde} used for serializing the key. If {@code null} the default key serde from config will be used
* @param valueSerde the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
* @param name
* the name used as part of the repartition topic name if required
* @param keySerde
* the {@link Serde} used for serializing the key. If {@code null} the default key serde from config will be used
* @param valueSerde
* the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
*
* @param <K> the key type
* @param <V> the value type
*
* @return a new {@link Grouped} configured with the name, keySerde, and valueSerde
*
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@ -109,9 +135,16 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -109,9 +135,16 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
* Create a {@link Grouped} instance with the provided keySerde and valueSerde. If the keySerde and/or the valueSerde is
* {@code null} the default value for the respective serde from config will be used.
*
* @param keySerde the {@link Serde} used for serializing the key. If {@code null} the default key serde from config will be used
* @param valueSerde the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
* @param keySerde
* the {@link Serde} used for serializing the key. If {@code null} the default key serde from config will be used
* @param valueSerde
* the {@link Serde} used for serializing the value. If {@code null} the default value serde from config will be used
*
* @param <K> the key type
* @param <V> the value type
*
* @return a new {@link Grouped} configured with the keySerde, and valueSerde
*
* @see KStream#groupByKey(Grouped)
* @see KStream#groupBy(KeyValueMapper, Grouped)
* @see KTable#groupBy(KeyValueMapper, Grouped)
@ -125,7 +158,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -125,7 +158,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
* Perform the grouping operation with the name for a repartition topic if required. Note
* that Kafka Streams does not always create repartition topics for grouping operations.
*
* @param name the name used for the processor name and as part of the repartition topic name if required
* @param name
* the name used for the processor name and as part of the repartition topic name if required
*
* @return a new {@link Grouped} instance configured with the name
* */
@Override
@ -136,7 +171,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -136,7 +171,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
/**
* Perform the grouping operation using the provided keySerde for serializing the key.
*
* @param keySerde {@link Serde} to use for serializing the key. If {@code null} the default key serde from config will be used
* @param keySerde
* {@link Serde} to use for serializing the key. If {@code null} the default key serde from config will be used
*
* @return a new {@link Grouped} instance configured with the keySerde
*/
public Grouped<K, V> withKeySerde(final Serde<K> keySerde) {
@ -146,7 +183,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> { @@ -146,7 +183,9 @@ public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
/**
* Perform the grouping operation using the provided valueSerde for serializing the value.
*
* @param valueSerde {@link Serde} to use for serializing the value. If {@code null} the default value serde from config will be used
* @param valueSerde
* {@link Serde} to use for serializing the value. If {@code null} the default value serde from config will be used
*
* @return a new {@link Grouped} instance configured with the valueSerde
*/
public Grouped<K, V> withValueSerde(final Serde<V> valueSerde) {

8
streams/src/main/java/org/apache/kafka/streams/kstream/Initializer.java

@ -16,12 +16,12 @@ @@ -16,12 +16,12 @@
*/
package org.apache.kafka.streams.kstream;
/**
* The {@code Initializer} interface for creating an initial value in aggregations.
* {@code Initializer} is used in combination with {@link Aggregator}.
*
* @param <VA> aggregate value type
* @param <VAgg> aggregate value type
*
* @see Aggregator
* @see KGroupedStream#aggregate(Initializer, Aggregator)
* @see KGroupedStream#aggregate(Initializer, Aggregator, Materialized)
@ -30,12 +30,12 @@ package org.apache.kafka.streams.kstream; @@ -30,12 +30,12 @@ package org.apache.kafka.streams.kstream;
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger)
* @see SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized)
*/
public interface Initializer<VA> {
public interface Initializer<VAgg> {
/**
* Return the initial value for an aggregation.
*
* @return the initial value for an aggregation
*/
VA apply();
VAgg apply();
}

Loading…
Cancel
Save