From 2421a69556f0275efc99680c05f0bab27ecafb0c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 1 Nov 2019 14:14:57 -0700 Subject: [PATCH] MINOR: Fix Kafka Streams JavaDocs with regard to new StreamJoined class (#7627) Reviewers: Bruno Cadonna , Bill Bejeck --- .../kafka/streams/kstream/JoinWindows.java | 6 +-- .../apache/kafka/streams/kstream/Joined.java | 3 +- .../apache/kafka/streams/kstream/KStream.java | 49 ++++++++++--------- .../apache/kafka/streams/kstream/Printed.java | 8 +-- .../kafka/streams/kstream/StreamJoined.java | 3 +- .../kafka/streams/kstream/ValueJoiner.java | 6 +-- .../kafka/streams/scala/kstream/KStream.scala | 16 +++--- 7 files changed, 46 insertions(+), 45 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index f7e3585d137..5a180a24fa4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -61,11 +61,11 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU * @see UnlimitedWindows * @see SessionWindows * @see KStream#join(KStream, ValueJoiner, JoinWindows) - * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see TimestampExtractor */ public final class JoinWindows extends Windows { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java index 1343487c9e2..15978373a86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -20,7 +20,8 @@ import org.apache.kafka.common.serialization.Serde; /** * The {@code Joined} class represents optional params that can be passed to - * {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations. + * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and + * {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} operations. */ public class Joined implements NamedOperation> { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index b4e02253bf2..882f0550f6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -2367,8 +2367,8 @@ public interface KStream { /** * Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the - * {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, - * and {@link Serde the other stream's value serde}. + * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value + * serde}, {@link Serde the other stream's value serde}, and used state stores. * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. @@ -2437,8 +2437,8 @@ public interface KStream { * @param streamJoined a {@link StreamJoined} used to configure join stores * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals - * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) - * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) */ KStream join(final KStream otherStream, final ValueJoiner joiner, @@ -2613,8 +2613,8 @@ public interface KStream { /** * Join records of this stream with another {@code KStream}'s records using windowed left equi join using the - * {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, - * and {@link Serde the other stream's value serde}. + * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value + * serde}, {@link Serde the other stream's value serde}, and used state stores. * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will * produce at least one output record (cf. below). * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. @@ -2678,17 +2678,17 @@ public interface KStream { *

* You can retrieve all generated internal topic names via {@link Topology#describe()}. * - * @param the value type of the other stream - * @param the value type of the result stream - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param streamJoined + * @param the value type of the other stream + * @param the value type of the result stream + * @param otherStream the {@code KStream} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows the specification of the {@link JoinWindows} + * @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * this {@code KStream} and within the joining window intervals - * @see #join(KStream, ValueJoiner, JoinWindows, Joined) - * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see #join(KStream, ValueJoiner, JoinWindows, StreamJoined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) */ KStream leftJoin(final KStream otherStream, final ValueJoiner joiner, @@ -2865,8 +2865,8 @@ public interface KStream { /** * Join records of this stream with another {@code KStream}'s records using windowed outer equi join using the - * {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, - * and {@link Serde the other stream's value serde}. + * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value + * serde}, {@link Serde the other stream's value serde}, and used state stores. * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join} or * {@link #leftJoin(KStream, ValueJoiner, JoinWindows) left-join}, all records from both streams will produce at * least one output record (cf. below). @@ -2931,17 +2931,17 @@ public interface KStream { *

* You can retrieve all generated internal topic names via {@link Topology#describe()}. * - * @param the value type of the other stream - * @param the value type of the result stream - * @param otherStream the {@code KStream} to be joined with this stream - * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param windows the specification of the {@link JoinWindows} - * @param streamJoined + * @param the value type of the other stream + * @param the value type of the result stream + * @param otherStream the {@code KStream} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows the specification of the {@link JoinWindows} + * @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * both {@code KStream} and within the joining window intervals - * @see #join(KStream, ValueJoiner, JoinWindows, Joined) - * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see #join(KStream, ValueJoiner, JoinWindows, StreamJoined) + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) */ KStream outerJoin(final KStream otherStream, final ValueJoiner joiner, @@ -3327,6 +3327,7 @@ public interface KStream { final KeyValueMapper keyValueMapper, final ValueJoiner joiner, final Named named); + /** * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java index 53c00c52303..fdcd9cb335f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java @@ -35,12 +35,8 @@ public class Printed implements NamedOperation> { protected final OutputStream outputStream; protected String label; protected String processorName; - protected KeyValueMapper mapper = new KeyValueMapper() { - @Override - public String apply(final K key, final V value) { - return String.format("%s, %s", key, value); - } - }; + protected KeyValueMapper mapper = + (KeyValueMapper) (key, value) -> String.format("%s, %s", key, value); private Printed(final OutputStream outputStream) { this.outputStream = outputStream; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java index f769174321c..06d5fc535f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java @@ -27,8 +27,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier; * @param this value type * @param other value type */ -public class StreamJoined - implements NamedOperation> { +public class StreamJoined implements NamedOperation> { protected final Serde keySerde; protected final Serde valueSerde; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java index 0f0a7472cae..0e573750d2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -27,11 +27,11 @@ package org.apache.kafka.streams.kstream; * @param second value type * @param joined value type * @see KStream#join(KStream, ValueJoiner, JoinWindows) - * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined) * @see KStream#join(KTable, ValueJoiner) * @see KStream#join(KTable, ValueJoiner, Joined) * @see KStream#leftJoin(KTable, ValueJoiner) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 9d19418a0a9..4627875f8f0 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -37,7 +37,8 @@ import org.apache.kafka.streams.scala.ImplicitConversions._ import scala.collection.JavaConverters._ /** - * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object. + * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream KStream]] and delegates method calls to the + * underlying Java object. * * @tparam K Type of keys * @tparam V Type of values @@ -48,8 +49,7 @@ import scala.collection.JavaConverters._ class KStream[K, V](val inner: KStreamJ[K, V]) { /** - * Create a new [[KStream]] that consists all records of this stream which satisfies the given - * predicate + * Create a new [[KStream]] that consists all records of this stream which satisfies the given predicate. * * @param predicate a filter that is applied to each record * @return a [[KStream]] that contains only those records that satisfy the given predicate @@ -60,7 +60,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Create a new [[KStream]] that consists all records of this stream which do not satisfy the given - * predicate + * predicate. * * @param predicate a filter that is applied to each record * @return a [[KStream]] that contains only those records that do not satisfy the given predicate @@ -432,7 +432,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with - * serializers and deserializers supplied by the implicit `Joined` instance. + * serializers and deserializers supplied by the implicit `StreamJoined` instance. * * @param otherStream the [[KStream]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records @@ -466,7 +466,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KStream#join` + * @deprecated since 2.4. Use [[KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)]] instead. */ + @deprecated def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined) @@ -493,7 +495,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { /** * Join records of this stream with another [[KStream]]'s records using windowed left equi join with - * serializers and deserializers supplied by the implicit `Joined` instance. + * serializers and deserializers supplied by the implicit `StreamJoined` instance. * * @param otherStream the [[KStream]] to be joined with this stream * @param joiner a function that computes the join result for a pair of matching records @@ -527,7 +529,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * one for each matched record-pair with the same key * @see `org.apache.kafka.streams.kstream.KStream#leftJoin` + * @deprecated since 2.4. Use [[KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)]] instead. */ + @deprecated def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)