Browse Source

MINOR: Fix Kafka Streams JavaDocs with regard to new StreamJoined class (#7627)

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/7634/head
Matthias J. Sax 5 years ago committed by Bill Bejeck
parent
commit
2421a69556
  1. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
  2. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
  3. 49
      streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
  4. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java
  5. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
  6. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
  7. 16
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

6
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java

@ -61,11 +61,11 @@ import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAU
* @see UnlimitedWindows * @see UnlimitedWindows
* @see SessionWindows * @see SessionWindows
* @see KStream#join(KStream, ValueJoiner, JoinWindows) * @see KStream#join(KStream, ValueJoiner, JoinWindows)
* @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see TimestampExtractor * @see TimestampExtractor
*/ */
public final class JoinWindows extends Windows<Window> { public final class JoinWindows extends Windows<Window> {

3
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java

@ -20,7 +20,8 @@ import org.apache.kafka.common.serialization.Serde;
/** /**
* The {@code Joined} class represents optional params that can be passed to * The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations. * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and
* {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} operations.
*/ */
public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> { public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {

49
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

@ -2367,8 +2367,8 @@ public interface KStream<K, V> {
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the * Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the
* {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* and {@link Serde the other stream's value serde}. * serde}, {@link Serde the other stream's value serde}, and used state stores.
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given
* {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps.
@ -2437,8 +2437,8 @@ public interface KStream<K, V> {
* @param streamJoined a {@link StreamJoined} used to configure join stores * @param streamJoined a {@link StreamJoined} used to configure join stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given * @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals
* @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
*/ */
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
@ -2613,8 +2613,8 @@ public interface KStream<K, V> {
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed left equi join using the * Join records of this stream with another {@code KStream}'s records using windowed left equi join using the
* {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* and {@link Serde the other stream's value serde}. * serde}, {@link Serde the other stream's value serde}, and used state stores.
* In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will
* produce at least one output record (cf. below). * produce at least one output record (cf. below).
* The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
@ -2678,17 +2678,17 @@ public interface KStream<K, V> {
* <p> * <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}. * You can retrieve all generated internal topic names via {@link Topology#describe()}.
* *
* @param <VO> the value type of the other stream * @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream * @param <VR> the value type of the result stream
* @param otherStream the {@code KStream} to be joined with this stream * @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows} * @param windows the specification of the {@link JoinWindows}
* @param streamJoined * @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given * @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* this {@code KStream} and within the joining window intervals * this {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows, Joined) * @see #join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see #outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
*/ */
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
@ -2865,8 +2865,8 @@ public interface KStream<K, V> {
/** /**
* Join records of this stream with another {@code KStream}'s records using windowed outer equi join using the * Join records of this stream with another {@code KStream}'s records using windowed outer equi join using the
* {@link Joined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value serde}, * {@link StreamJoined} instance for configuration of the {@link Serde key serde}, {@link Serde this stream's value
* and {@link Serde the other stream's value serde}. * serde}, {@link Serde the other stream's value serde}, and used state stores.
* In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join} or * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join} or
* {@link #leftJoin(KStream, ValueJoiner, JoinWindows) left-join}, all records from both streams will produce at * {@link #leftJoin(KStream, ValueJoiner, JoinWindows) left-join}, all records from both streams will produce at
* least one output record (cf. below). * least one output record (cf. below).
@ -2931,17 +2931,17 @@ public interface KStream<K, V> {
* <p> * <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}. * You can retrieve all generated internal topic names via {@link Topology#describe()}.
* *
* @param <VO> the value type of the other stream * @param <VO> the value type of the other stream
* @param <VR> the value type of the result stream * @param <VR> the value type of the result stream
* @param otherStream the {@code KStream} to be joined with this stream * @param otherStream the {@code KStream} to be joined with this stream
* @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
* @param windows the specification of the {@link JoinWindows} * @param windows the specification of the {@link JoinWindows}
* @param streamJoined * @param streamJoined a {@link StreamJoined} instance to configure serdes and state stores
* @return a {@code KStream} that contains join-records for each key and values computed by the given * @return a {@code KStream} that contains join-records for each key and values computed by the given
* {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
* both {@code KStream} and within the joining window intervals * both {@code KStream} and within the joining window intervals
* @see #join(KStream, ValueJoiner, JoinWindows, Joined) * @see #join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see #leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
*/ */
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
@ -3327,6 +3327,7 @@ public interface KStream<K, V> {
final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner, final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
final Named named); final Named named);
/** /**
* Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join. * Join records of this stream with {@link GlobalKTable}'s records using non-windowed left equi join.
* In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream * In contrast to {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner) inner-join}, all records from this stream

8
streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java

@ -35,12 +35,8 @@ public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
protected final OutputStream outputStream; protected final OutputStream outputStream;
protected String label; protected String label;
protected String processorName; protected String processorName;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() { protected KeyValueMapper<? super K, ? super V, String> mapper =
@Override (KeyValueMapper<K, V, String>) (key, value) -> String.format("%s, %s", key, value);
public String apply(final K key, final V value) {
return String.format("%s, %s", key, value);
}
};
private Printed(final OutputStream outputStream) { private Printed(final OutputStream outputStream) {
this.outputStream = outputStream; this.outputStream = outputStream;

3
streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java

@ -27,8 +27,7 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
* @param <V1> this value type * @param <V1> this value type
* @param <V2> other value type * @param <V2> other value type
*/ */
public class StreamJoined<K, V1, V2> public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V1, V2>> {
implements NamedOperation<StreamJoined<K, V1, V2>> {
protected final Serde<K> keySerde; protected final Serde<K> keySerde;
protected final Serde<V1> valueSerde; protected final Serde<V1> valueSerde;

6
streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java

@ -27,11 +27,11 @@ package org.apache.kafka.streams.kstream;
* @param <V2> second value type * @param <V2> second value type
* @param <VR> joined value type * @param <VR> joined value type
* @see KStream#join(KStream, ValueJoiner, JoinWindows) * @see KStream#join(KStream, ValueJoiner, JoinWindows)
* @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
* @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#join(KTable, ValueJoiner) * @see KStream#join(KTable, ValueJoiner)
* @see KStream#join(KTable, ValueJoiner, Joined) * @see KStream#join(KTable, ValueJoiner, Joined)
* @see KStream#leftJoin(KTable, ValueJoiner) * @see KStream#leftJoin(KTable, ValueJoiner)

16
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

@ -37,7 +37,8 @@ import org.apache.kafka.streams.scala.ImplicitConversions._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
/** /**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KStream]] and delegates method calls to the underlying Java object. * Wraps the Java class [[org.apache.kafka.streams.kstream.KStream KStream]] and delegates method calls to the
* underlying Java object.
* *
* @tparam K Type of keys * @tparam K Type of keys
* @tparam V Type of values * @tparam V Type of values
@ -48,8 +49,7 @@ import scala.collection.JavaConverters._
class KStream[K, V](val inner: KStreamJ[K, V]) { class KStream[K, V](val inner: KStreamJ[K, V]) {
/** /**
* Create a new [[KStream]] that consists all records of this stream which satisfies the given * Create a new [[KStream]] that consists all records of this stream which satisfies the given predicate.
* predicate
* *
* @param predicate a filter that is applied to each record * @param predicate a filter that is applied to each record
* @return a [[KStream]] that contains only those records that satisfy the given predicate * @return a [[KStream]] that contains only those records that satisfy the given predicate
@ -60,7 +60,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/** /**
* Create a new [[KStream]] that consists all records of this stream which do <em>not</em> satisfy the given * Create a new [[KStream]] that consists all records of this stream which do <em>not</em> satisfy the given
* predicate * predicate.
* *
* @param predicate a filter that is applied to each record * @param predicate a filter that is applied to each record
* @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate * @return a [[KStream]] that contains only those records that do <em>not</em> satisfy the given predicate
@ -432,7 +432,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/** /**
* Join records of this stream with another [[KStream]]'s records using windowed inner equi join with * Join records of this stream with another [[KStream]]'s records using windowed inner equi join with
* serializers and deserializers supplied by the implicit `Joined` instance. * serializers and deserializers supplied by the implicit `StreamJoined` instance.
* *
* @param otherStream the [[KStream]] to be joined with this stream * @param otherStream the [[KStream]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of matching records * @param joiner a function that computes the join result for a pair of matching records
@ -466,7 +466,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
* one for each matched record-pair with the same key * one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KStream#join` * @see `org.apache.kafka.streams.kstream.KStream#join`
* @deprecated since 2.4. Use [[KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)]] instead.
*/ */
@deprecated
def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined) inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
@ -493,7 +495,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
/** /**
* Join records of this stream with another [[KStream]]'s records using windowed left equi join with * Join records of this stream with another [[KStream]]'s records using windowed left equi join with
* serializers and deserializers supplied by the implicit `Joined` instance. * serializers and deserializers supplied by the implicit `StreamJoined` instance.
* *
* @param otherStream the [[KStream]] to be joined with this stream * @param otherStream the [[KStream]] to be joined with this stream
* @param joiner a function that computes the join result for a pair of matching records * @param joiner a function that computes the join result for a pair of matching records
@ -527,7 +529,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`, * @return a [[KStream]] that contains join-records for each key and values computed by the given `joiner`,
* one for each matched record-pair with the same key * one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KStream#leftJoin` * @see `org.apache.kafka.streams.kstream.KStream#leftJoin`
* @deprecated since 2.4. Use [[KStream#leftJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)]] instead.
*/ */
@deprecated
def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] = def leftJoin[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined) inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)

Loading…
Cancel
Save