Browse Source

KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)

- fixed leftJoin -> outerJoin test bug
 - simplified to only use values
 - fixed inner KTable-KTable join
 - fixed left KTable-KTable join
 - fixed outer KTable-KTable join
 - fixed inner, left, and outer left KStream-KStream joins
 - added inner KStream-KTable join
 - fixed left KStream-KTable join

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #1777 from mjsax/kafka-4001-joins
pull/1777/merge
Matthias J. Sax 8 years ago committed by Guozhang Wang
parent
commit
62c0972efc
  1. 44
      streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
  2. 272
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
  3. 27
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
  4. 75
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
  5. 66
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
  6. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
  7. 103
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  8. 21
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
  9. 19
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
  10. 16
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
  11. 24
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
  12. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
  13. 433
      streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
  14. 74
      streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
  15. 98
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
  16. 146
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
  17. 22
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
  18. 93
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
  19. 18
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
  20. 12
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java

44
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -532,6 +532,39 @@ public interface KStream<K, V> { @@ -532,6 +532,39 @@ public interface KStream<K, V> {
ValueJoiner<V, V1, R> joiner,
JoinWindows windows);
/**
* Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
* If a record key or value is {@code null} it will not included in the resulting {@link KStream}
*
* @param table the instance of {@link KTable} joined with this stream
* @param joiner the instance of {@link ValueJoiner}
* @param <V1> the value type of the table
* @param <V2> the value type of the new stream
* @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
* one for each matched record-pair with the same key
*/
<V1, V2> KStream<K, V2> join(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner);
/**
* Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
* If a record key or value is {@code null} it will not included in the resulting {@link KStream}
*
* @param table the instance of {@link KTable} joined with this stream
* @param valueJoiner the instance of {@link ValueJoiner}
* @param keySerde key serdes for materializing this stream.
* If not specified the default serdes defined in the configs will be used
* @param valSerde value serdes for materializing this stream,
* if not specified the default serdes defined in the configs will be used
* @param <V1> the value type of the table
* @param <V2> the value type of the new stream
* @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
* one for each matched record-pair with the same key and within the joining window intervals
*/
<V1, V2> KStream<K, V2> join(KTable<K, V1> table,
ValueJoiner<V, V1, V2> valueJoiner,
Serde<K> keySerde,
Serde<V> valSerde);
/**
* Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
* If a record key is null it will not included in the resulting {@link KStream}
@ -566,6 +599,7 @@ public interface KStream<K, V> { @@ -566,6 +599,7 @@ public interface KStream<K, V> {
ValueJoiner<V, V1, V2> valueJoiner,
Serde<K> keySerde,
Serde<V> valSerde);
/**
* Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
* default serializers and deserializers. If a record key is null it will not included in
@ -592,8 +626,8 @@ public interface KStream<K, V> { @@ -592,8 +626,8 @@ public interface KStream<K, V> {
* @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
*/
<K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
Serde<K1> keySerde,
Serde<V> valSerde);
Serde<K1> keySerde,
Serde<V> valSerde);
/**
* Group the records with the same key into a {@link KGroupedStream} while preserving the

272
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals; @@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
@ -63,6 +64,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -63,6 +64,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
public static final String JOIN_NAME = "KSTREAM-JOIN-";
public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
private static final String MAP_NAME = "KSTREAM-MAP-";
@ -345,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -345,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();
if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
@ -386,78 +389,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -386,78 +389,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <V1, R> KStream<K, R> join(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde) {
final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<V1> otherValueSerde) {
return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false));
}
@Override
public <V1, R> KStream<K, R> join(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows) {
final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows) {
return join(other, joiner, windows, null, null, null, false);
return join(other, joiner, windows, null, null, null);
}
@Override
public <V1, R> KStream<K, R> outerJoin(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde) {
final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<V1> otherValueSerde) {
return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true));
}
@Override
public <V1, R> KStream<K, R> outerJoin(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows) {
final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows) {
return join(other, joiner, windows, null, null, null, true);
return outerJoin(other, joiner, windows, null, null, null);
}
@SuppressWarnings("unchecked")
private <V1, R> KStream<K, R> join(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde,
boolean outer) {
return doJoin(other,
joiner,
windows,
keySerde,
thisValueSerde,
otherValueSerde,
new DefaultJoin(outer));
}
private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValueSerde,
Serde<V1> otherValueSerde,
KStreamImplJoin join) {
private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<V1> otherValueSerde,
final KStreamImplJoin join) {
Objects.requireNonNull(other, "other KStream can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(windows, "windows can't be null");
KStreamImpl<K, V> joinThis = this;
KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
if (joinThis.repartitionRequired) {
joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
@ -531,20 +515,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -531,20 +515,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
@Override
public <V1, R> KStream<K, R> leftJoin(
KStream<K, V1> other,
ValueJoiner<V, V1, R> joiner,
JoinWindows windows,
Serde<K> keySerde,
Serde<V> thisValSerde,
Serde<V1> otherValueSerde) {
final KStream<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValSerde,
final Serde<V1> otherValueSerde) {
return doJoin(other,
joiner,
windows,
keySerde,
thisValSerde,
otherValueSerde,
new LeftJoin());
joiner,
windows,
keySerde,
thisValSerde,
otherValueSerde,
new KStreamImplJoin(true, false));
}
@Override
@ -558,41 +542,60 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -558,41 +542,60 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@SuppressWarnings("unchecked")
@Override
public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
return leftJoin(other, joiner, null, null);
public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
return join(other, joiner, null, null);
}
public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
ValueJoiner<V, V1, R> joiner,
Serde<K> keySerde,
Serde<V> valueSerde) {
Objects.requireNonNull(other, "other KTable can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
@Override
public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
if (repartitionRequired) {
KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
valueSerde, null);
return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde,
valueSerde, null);
return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
} else {
return doStreamTableLeftJoin(other, joiner);
return doStreamTableJoin(other, joiner, false);
}
}
private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other,
final ValueJoiner<V, V1, R> joiner) {
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final boolean leftJoin) {
Objects.requireNonNull(other, "other KTable can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String name = topology.newName(LEFTJOIN_NAME);
final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames());
topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl<K, ?, V1>) other, joiner, leftJoin), this.name);
topology.connectProcessorAndStateStores(name, other.getStoreName());
topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
}
@Override
public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
return leftJoin(other, joiner, null, null);
}
public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
final ValueJoiner<V, V1, R> joiner,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
if (repartitionRequired) {
final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde,
valueSerde, null);
return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
} else {
return doStreamTableJoin(other, joiner, true);
}
}
@Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) {
return groupBy(selector, null, null);
@ -600,8 +603,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -600,8 +603,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector,
Serde<K1> keySerde,
Serde<V> valSerde) {
Serde<K1> keySerde,
Serde<V> valSerde) {
Objects.requireNonNull(selector, "selector can't be null");
String selectName = internalSelectKey(selector);
@ -641,26 +644,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -641,26 +644,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
.build();
}
private interface KStreamImplJoin {
private class KStreamImplJoin {
<K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
KStream<K1, V2> other,
ValueJoiner<V1, V2, R> joiner,
JoinWindows windows,
Serde<K1> keySerde,
Serde<V1> lhsValueSerde,
Serde<V2> otherValueSerde);
}
private class DefaultJoin implements KStreamImplJoin {
private final boolean outer;
private final boolean leftOuter;
private final boolean rightOuter;
DefaultJoin(final boolean outer) {
this.outer = outer;
KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) {
this.leftOuter = leftOuter;
this.rightOuter = rightOuter;
}
@Override
public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
KStream<K1, V2> other,
ValueJoiner<V1, V2, R> joiner,
@ -670,12 +663,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -670,12 +663,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Serde<V2> otherValueSerde) {
String thisWindowStreamName = topology.newName(WINDOWED_NAME);
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
StateStoreSupplier thisWindow =
createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
StateStoreSupplier otherWindow =
createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
@ -688,16 +681,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -688,16 +681,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
windows.before + windows.after + 1,
windows.maintainMs());
KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
windows.before,
windows.after,
joiner,
outer);
KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
windows.after,
windows.before,
reverseJoiner(joiner),
outer);
final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
windows.before,
windows.after,
joiner,
leftOuter);
final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
windows.after,
windows.before,
reverseJoiner(joiner),
rightOuter);
KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
@ -716,39 +709,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @@ -716,39 +709,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
}
private class LeftJoin implements KStreamImplJoin {
@Override
public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
KStream<K1, V2> other,
ValueJoiner<V1, V2, R> joiner,
JoinWindows windows,
Serde<K1> keySerde,
Serde<V1> lhsValueSerde,
Serde<V2> otherValueSerde) {
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
String joinThisName = topology.newName(LEFTJOIN_NAME);
StateStoreSupplier otherWindow =
createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
KStreamJoinWindow<K1, V1>
otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamKStreamJoin<K1, R, V1, V2>
joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
}
}
}

27
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -52,7 +52,6 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { @@ -52,7 +52,6 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private WindowStore<K, V2> otherWindow;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
@ -62,14 +61,21 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { @@ -62,14 +61,21 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@Override
public void process(K key, V1 value) {
if (key == null)
public void process(final K key, final V1 value) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
//
// we also ignore the record if value is null, because in a key-value data model a null-value indicates
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (key == null || value == null) {
return;
}
boolean needOuterJoin = KStreamKStreamJoin.this.outer;
boolean needOuterJoin = outer;
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
while (iter.hasNext()) {
@ -77,8 +83,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { @@ -77,8 +83,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
context().forward(key, joiner.apply(value, iter.next().value));
}
if (needOuterJoin)
if (needOuterJoin) {
context().forward(key, joiner.apply(value, null));
}
}
}
}

75
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoiner<V1, V2, R> joiner;
private final boolean leftJoin;
KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<V1, V2, R> joiner, final boolean leftJoin) {
valueGetterSupplier = table.valueGetterSupplier();
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@Override
public Processor<K, V1> get() {
return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin);
}
private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> {
private final KTableValueGetter<K, V2> valueGetter;
private final boolean leftJoin;
KStreamKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter, final boolean leftJoin) {
this.valueGetter = valueGetter;
this.leftJoin = leftJoin;
}
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueGetter.init(context);
}
@Override
public void process(final K key, final V1 value) {
// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
//
// we also ignore the record if value is null, because in a key-value data model a null-value indicates
// an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
if (key != null && value != null) {
final V2 value2 = valueGetter.get(key);
if (leftJoin || value2 != null) {
context().forward(key, joiner.apply(value, value2));
}
}
}
}
}

66
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java

@ -1,66 +0,0 @@ @@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoiner<V1, V2, R> joiner;
KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) {
this.valueGetterSupplier = table.valueGetterSupplier();
this.joiner = joiner;
}
@Override
public Processor<K, V1> get() {
return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
}
private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> {
private final KTableValueGetter<K, V2> valueGetter;
public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
valueGetter.init(context);
}
@Override
public void process(K key, V1 value) {
// if the key is null, we do not need proceed joining
// the record with the table
if (key != null) {
context().forward(key, joiner.apply(value, valueGetter.get(key)));
}
}
}
}

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java

@ -17,8 +17,8 @@ @@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

103
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -44,6 +44,7 @@ import java.util.Set; @@ -44,6 +44,7 @@ import java.util.Set;
/**
* The implementation class of {@link KTable}.
*
* @param <K> the key type
* @param <S> the source's (parent's) value type
* @param <V> the value type
@ -283,77 +284,55 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @@ -283,77 +284,55 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return toStream().selectKey(mapper);
}
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(JOINTHIS_NAME);
String joinOtherName = topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
);
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
return doJoin(other, joiner, false, false);
}
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(OUTERTHIS_NAME);
String joinOtherName = topology.newName(OUTEROTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
);
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
return doJoin(other, joiner, true, true);
}
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) {
return doJoin(other, joiner, true, false);
}
@SuppressWarnings("unchecked")
@Override
public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, final boolean leftOuter, final boolean rightOuter) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
String joinThisName = topology.newName(LEFTTHIS_NAME);
String joinOtherName = topology.newName(LEFTOTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
if (leftOuter) {
enableSendingOldValues();
}
if (rightOuter) {
((KTableImpl) other).enableSendingOldValues();
}
final String joinThisName = topology.newName(JOINTHIS_NAME);
final String joinOtherName = topology.newName(JOINOTHER_NAME);
final String joinMergeName = topology.newName(MERGE_NAME);
final KTableKTableAbstractJoin<K, R, V, V1> joinThis;
final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
if (!leftOuter) { // inner
joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
} else if (!rightOuter) { // left
joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
} else { // outer
joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
}
KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName),
final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(
new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName),
new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName())
);

21
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -69,23 +69,26 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, @@ -69,23 +69,26 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable join operator should not be null.");
R newValue = null;
R oldValue = null;
V2 value2 = null;
if (change.newValue != null || change.oldValue != null)
value2 = valueGetter.get(key);
final V2 value2 = valueGetter.get(key);
if (value2 == null) {
return;
}
if (change.newValue != null && value2 != null)
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
}
if (sendOldValues && change.oldValue != null && value2 != null)
if (sendOldValues && change.oldValue != null) {
oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));
}

19
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -70,27 +70,28 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @@ -70,27 +70,28 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable left-join operator should not be null.");
R newValue = null;
R oldValue = null;
V2 value2 = null;
if (change.newValue != null || change.oldValue != null)
value2 = valueGetter.get(key);
final V2 value2 = valueGetter.get(key);
if (value2 == null && change.newValue == null && change.oldValue == null) {
return;
}
if (change.newValue != null)
if (change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
}
if (sendOldValues && change.oldValue != null)
oldValue = joiner.apply(change.oldValue, value2);
context().forward(key, new Change<>(newValue, oldValue));
}
}
private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {

16
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java

@ -69,21 +69,25 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @@ -69,21 +69,25 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable outer-join operator should not be null.");
R newValue = null;
R oldValue = null;
V2 value2 = valueGetter.get(key);
if (change.newValue != null || value2 != null)
final V2 value2 = valueGetter.get(key);
if (value2 == null && change.newValue == null && change.oldValue == null) {
return;
}
if (value2 != null || change.newValue != null) {
newValue = joiner.apply(change.newValue, value2);
}
if (sendOldValues) {
if (change.oldValue != null || value2 != null)
oldValue = joiner.apply(change.oldValue, value2);
if (sendOldValues && (value2 != null || change.oldValue != null)) {
oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));

24
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -70,19 +70,23 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @@ -70,19 +70,23 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
* @throws StreamsException if key is null
*/
@Override
public void process(K key, Change<V1> change) {
public void process(final K key, final Change<V1> change) {
// the keys should never be null
if (key == null)
throw new StreamsException("Record key for KTable right-join operator should not be null.");
R newValue = null;
final R newValue;
R oldValue = null;
V2 value2 = valueGetter.get(key);
if (value2 != null) {
newValue = joiner.apply(change.newValue, value2);
if (sendOldValues)
oldValue = joiner.apply(change.oldValue, value2);
final V2 value2 = valueGetter.get(key);
if (value2 == null) {
return;
}
newValue = joiner.apply(change.newValue, value2);
if (sendOldValues) {
oldValue = joiner.apply(change.oldValue, value2);
}
context().forward(key, new Change<>(newValue, oldValue));

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java

@ -48,7 +48,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol @@ -48,7 +48,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
private RecordContext recordContext;
private ProcessorNode currentNode;
@SuppressWarnings("unchecked")
public ProcessorContextImpl(TaskId id,
StreamTask task,
StreamsConfig config,
@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol @@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
return recordContext.timestamp();
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value) {
ProcessorNode previousNode = currentNode;
@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol @@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
}
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value, int childIndex) {
ProcessorNode previousNode = currentNode;
@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol @@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
}
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(K key, V value, String childName) {
for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) {

433
streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java

@ -0,0 +1,433 @@ @@ -0,0 +1,433 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
/**
* Tests all available joins of Kafka Streams DSL.
*/
public class JoinIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
private static ZkUtils zkUtils = null;
private static final String APP_ID = "join-integration-test";
private static final String INPUT_TOPIC_1 = "inputTopicLeft";
private static final String INPUT_TOPIC_2 = "inputTopicRight";
private static final String OUTPUT_TOPIC = "outputTopic";
private final static Properties PRODUCER_CONFIG = new Properties();
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
private final static Properties STREAMS_CONFIG = new Properties();
private KStreamBuilder builder;
private KStream<Long, String> leftStream;
private KStream<Long, String> rightStream;
private KTable<Long, String> leftTable;
private KTable<Long, String> rightTable;
private final List<Input<String>> input = Arrays.asList(
new Input<>(INPUT_TOPIC_1, (String) null),
new Input<>(INPUT_TOPIC_2, (String) null),
new Input<>(INPUT_TOPIC_1, "A"),
new Input<>(INPUT_TOPIC_2, "a"),
new Input<>(INPUT_TOPIC_1, "B"),
new Input<>(INPUT_TOPIC_2, "b"),
new Input<>(INPUT_TOPIC_1, (String) null),
new Input<>(INPUT_TOPIC_2, (String) null),
new Input<>(INPUT_TOPIC_1, "C"),
new Input<>(INPUT_TOPIC_2, "c"),
new Input<>(INPUT_TOPIC_2, (String) null),
new Input<>(INPUT_TOPIC_1, (String) null),
new Input<>(INPUT_TOPIC_2, (String) null),
new Input<>(INPUT_TOPIC_2, "d"),
new Input<>(INPUT_TOPIC_1, "D")
);
private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + "-" + value2;
}
};
private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
}
@AfterClass
public static void release() {
if (zkUtils != null) {
zkUtils.close();
}
}
@Before
public void prepareTopology() throws Exception {
CLUSTER.createTopic(INPUT_TOPIC_1);
CLUSTER.createTopic(INPUT_TOPIC_2);
CLUSTER.createTopic(OUTPUT_TOPIC);
builder = new KStreamBuilder();
leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
leftStream = leftTable.toStream();
rightStream = rightTable.toStream();
}
@After
public void cleanup() throws Exception {
CLUSTER.deleteTopic(INPUT_TOPIC_1);
CLUSTER.deleteTopic(INPUT_TOPIC_2);
CLUSTER.deleteTopic(OUTPUT_TOPIC);
TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds.");
}
private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
if (expectedResult != null) {
final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), Long.MAX_VALUE);
assertThat(result, is(expectedResult));
}
}
/*
* Runs the actual test. Checks the result after each input record to ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
*/
private void runTest(final List<List<String>> expectedResult) throws Exception {
assert expectedResult.size() == input.size();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG);
try {
streams.start();
long ts = System.currentTimeMillis();
final Iterator<List<String>> resultIterator = expectedResult.iterator();
for (final Input<String> singleInput : input) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
checkResult(OUTPUT_TOPIC, resultIterator.next());
}
} finally {
streams.close();
}
}
@Test
public void testInnerKStreamKStream() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testLeftKStreamKStream() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
);
leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testOuterKStreamKStream() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
);
leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testInnerKStreamKTable() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
null,
Collections.singletonList("B-a"),
null,
null,
null,
null,
null,
null,
null,
null,
null,
Collections.singletonList("D-d")
);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testLeftKStreamKTable() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
null,
Collections.singletonList("B-a"),
null,
null,
null,
Collections.singletonList("C-null"),
null,
null,
null,
null,
null,
Collections.singletonList("D-d")
);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testInnerKTableKTable() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList((String) null),
null,
null,
Collections.singletonList("C-c"),
Collections.singletonList((String) null),
null,
null,
null,
Collections.singletonList("D-d")
);
leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testLeftKTableKTable() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList((String) null),
null,
Collections.singletonList("C-null"),
Collections.singletonList("C-c"),
Collections.singletonList("C-null"),
Collections.singletonList((String) null),
null,
null,
Collections.singletonList("D-d")
);
leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
@Test
public void testOuterKTableKTable() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList("null-b"),
Collections.singletonList((String) null),
Collections.singletonList("C-null"),
Collections.singletonList("C-c"),
Collections.singletonList("C-null"),
Collections.singletonList((String) null),
null,
Collections.singletonList("null-d"),
Collections.singletonList("D-d")
);
leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTest(expectedResult);
}
private final class TopicsGotDeletedCondition implements TestCondition {
@Override
public boolean conditionMet() {
final Set<String> allTopics = new HashSet<>();
allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC);
}
}
private final class Input<V> {
String topic;
KeyValue<Long, V> record;
private final long anyUniqueKey = 0L;
Input(final String topic, final V value) {
this.topic = topic;
record = KeyValue.pair(anyUniqueKey, value);
}
}
}

74
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java

@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest { @@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest {
public static Object[] parameters() {
return new Object[][]{
{JoinType.INNER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", null))
},
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", null)
)},
{JoinType.INNER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", null)
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", null)
)},
{JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C3")
// new KeyValue<>("a", "null-A3"),
// new KeyValue<>("b", "null-B3"),
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", "null-C3")
)},
{JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", null)
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", null)
)},
{JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", null)
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", null)
)},
{JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
@ -121,22 +121,22 @@ public class KTableKTableJoinIntegrationTest { @@ -121,22 +121,22 @@ public class KTableKTableJoinIntegrationTest {
new KeyValue<>("c", "null-C3"),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C3")
new KeyValue<>("b", "B1-B2-B3")//,
// new KeyValue<>("c", "null-C3")
)},
{JoinType.OUTER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),
new KeyValue<>("c", "null-C2-C3")
)},
{JoinType.OUTER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", null),
new KeyValue<>("b", null),
new KeyValue<>("c", null),
// new KeyValue<>("a", null),
// new KeyValue<>("b", null),
// new KeyValue<>("c", null),
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
new KeyValue<>("b", "B1-B2-B3"),

98
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -72,10 +72,10 @@ public class KStreamKStreamLeftJoinTest { @@ -72,10 +72,10 @@ public class KStreamKStreamLeftJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KStream<Integer, String> stream1;
KStream<Integer, String> stream2;
KStream<Integer, String> joined;
MockProcessorSupplier<Integer, String> processor;
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
@ -84,7 +84,7 @@ public class KStreamKStreamLeftJoinTest { @@ -84,7 +84,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@ -93,8 +93,10 @@ public class KStreamKStreamLeftJoinTest { @@ -93,8 +93,10 @@ public class KStreamKStreamLeftJoinTest {
driver.setTime(0L);
// push two items to the primary stream. the other window is empty
// w {}
// --> w = {}
// w1 {}
// w2 {}
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = {}
for (int i = 0; i < 2; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
@ -103,38 +105,47 @@ public class KStreamKStreamLeftJoinTest { @@ -103,38 +105,47 @@ public class KStreamKStreamLeftJoinTest {
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items.
// w {}
// --> w = { 0:Y0, 1:Y1 }
// w1 = { 0:X0, 1:X1 }
// w2 {}
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 2; i++) {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1 }
// --> w = { 0:Y0, 1:Y1 }
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
for (int i = 0; i < expectedKeys.length; i++) {
// push three items to the primary stream. this should produce four items.
// w1 = { 0:X0, 1:X1 }
// w2 = { 0:Y0, 1:Y1 }
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
// --> w2 = { 0:Y0, 1:Y1 }
for (int i = 0; i < 3; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
// push all items to the other stream. this should produce no items.
// w = { 0:Y0, 1:Y1 }
// --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
// push all items to the other stream. this should produce 5 items
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
// w2 = { 0:Y0, 1:Y1 }
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult();
processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2");
// push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
// --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
// push all four items to the primary stream. this should produce six items.
// w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
// w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
// --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
// --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
@ -151,10 +162,10 @@ public class KStreamKStreamLeftJoinTest { @@ -151,10 +162,10 @@ public class KStreamKStreamLeftJoinTest {
long time = 0L;
KStream<Integer, String> stream1;
KStream<Integer, String> stream2;
KStream<Integer, String> joined;
MockProcessorSupplier<Integer, String> processor;
final KStream<Integer, String> stream1;
final KStream<Integer, String> stream2;
final KStream<Integer, String> joined;
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
@ -163,7 +174,7 @@ public class KStreamKStreamLeftJoinTest { @@ -163,7 +174,7 @@ public class KStreamKStreamLeftJoinTest {
joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@ -171,8 +182,10 @@ public class KStreamKStreamLeftJoinTest { @@ -171,8 +182,10 @@ public class KStreamKStreamLeftJoinTest {
driver = new KStreamTestDriver(builder, stateDir);
// push two items to the primary stream. the other window is empty. this should produce two items
// w = {}
// --> w = {}
// w1 = {}
// w2 = {}
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = {}
setRecordContext(time, topic1);
for (int i = 0; i < 2; i++) {
@ -182,23 +195,27 @@ public class KStreamKStreamLeftJoinTest { @@ -182,23 +195,27 @@ public class KStreamKStreamLeftJoinTest {
processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce no items.
// w = {}
// --> w = { 0:Y0, 1:Y1 }
// w1 = { 0:X0, 1:X1 }
// w2 = {}
// --> w1 = { 0:X0, 1:X1 }
// --> w2 = { 0:Y0, 1:Y1 }
setRecordContext(time, topic2);
for (int i = 0; i < 2; i++) {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// clear logically
time = 1000L;
setRecordContext(time, topic2);
// push all items to the other stream. this should produce no items.
// w = {}
// --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
// w1 = {}
// w2 = {}
// --> w1 = {}
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
for (int i = 0; i < expectedKeys.length; i++) {
setRecordContext(time + i, topic2);
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
@ -206,8 +223,11 @@ public class KStreamKStreamLeftJoinTest { @@ -206,8 +223,11 @@ public class KStreamKStreamLeftJoinTest {
driver.flushState();
processor.checkAndClearProcessResult();
// gradually expire items in window.
// w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
// gradually expire items in window 2.
// w1 = {}
// w2 = {}
// --> w1 = {}
// --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
time = 1000L + 100L;
setRecordContext(time, topic1);

146
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java

@ -0,0 +1,146 @@ @@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
public class KStreamKTableJoinTest {
final private String topic1 = "topic1";
final private String topic2 = "topic2";
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();
private KStreamTestDriver driver = null;
private File stateDir = null;
@After
public void tearDown() {
if (driver != null) {
driver.close();
}
driver = null;
}
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
}
@Test
public void testJoin() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
final KStream<Integer, String> stream;
final KTable<Integer, String> table;
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
stream = builder.stream(intSerde, stringSerde, topic1);
table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
stream.join(table, MockValueJoiner.STRING_JOINER).process(processor);
final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
driver = new KStreamTestDriver(builder, stateDir);
driver.setTime(0L);
// push two items to the primary stream. the other table is empty
for (int i = 0; i < 2; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
processor.checkAndClearProcessResult();
// push two items to the other stream. this should not produce any item.
for (int i = 0; i < 2; i++) {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all items to the other stream. this should not produce any item
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push two items with null to the other stream as deletes. this should not produce any item.
for (int i = 0; i < 2; i++) {
driver.process(topic2, expectedKeys[i], null);
}
processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
}
}

22
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -228,13 +228,7 @@ public class KStreamWindowAggregateTest { @@ -228,13 +228,7 @@ public class KStreamWindowAggregateTest {
"[A@0]:0+1+1"
);
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult(
"[A@0]:null",
"[B@0]:null",
"[C@0]:null",
"[D@0]:null",
"[A@0]:null"
);
proc3.checkAndClearProcessResult();
setRecordContext(5, topic1);
driver.process(topic1, "A", "1");
@ -260,13 +254,7 @@ public class KStreamWindowAggregateTest { @@ -260,13 +254,7 @@ public class KStreamWindowAggregateTest {
"[C@0]:0+3+3", "[C@5]:0+3"
);
proc2.checkAndClearProcessResult();
proc3.checkAndClearProcessResult(
"[A@0]:null", "[A@5]:null",
"[B@0]:null", "[B@5]:null",
"[D@0]:null", "[D@5]:null",
"[B@0]:null", "[B@5]:null",
"[C@0]:null", "[C@5]:null"
);
proc3.checkAndClearProcessResult();
setRecordContext(0, topic1);
driver.process(topic2, "A", "a");

93
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -74,10 +74,10 @@ public class KTableKTableJoinTest { @@ -74,10 +74,10 @@ public class KTableKTableJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KTable<Integer, String> table1;
KTable<Integer, String> table2;
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> processor;
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
@ -85,17 +85,17 @@ public class KTableKTableJoinTest { @@ -85,17 +85,17 @@ public class KTableKTableJoinTest {
joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
joined.toStream().process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
driver = new KStreamTestDriver(builder, stateDir);
driver.setTime(0L);
KTableValueGetter<Integer, String> getter = getterSupplier.get();
final KTableValueGetter<Integer, String> getter = getterSupplier.get();
getter.init(driver.context());
// push two items to the primary stream. the other table is empty
@ -105,8 +105,7 @@ public class KTableKTableJoinTest { @@ -105,8 +105,7 @@ public class KTableKTableJoinTest {
}
driver.flushState();
processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
@ -116,17 +115,17 @@ public class KTableKTableJoinTest { @@ -116,17 +115,17 @@ public class KTableKTableJoinTest {
driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
@ -134,8 +133,8 @@ public class KTableKTableJoinTest { @@ -134,8 +133,8 @@ public class KTableKTableJoinTest {
}
driver.flushState();
processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
// push all four items to the primary stream. this should produce four items.
@ -155,17 +154,17 @@ public class KTableKTableJoinTest { @@ -155,17 +154,17 @@ public class KTableKTableJoinTest {
driver.flushState();
processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
checkJoinedValues(getter, kv(0, null), kv(1, null));
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
}
@Test
@ -174,10 +173,10 @@ public class KTableKTableJoinTest { @@ -174,10 +173,10 @@ public class KTableKTableJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KTable<Integer, String> table1;
KTable<Integer, String> table2;
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@ -200,7 +199,7 @@ public class KTableKTableJoinTest { @@ -200,7 +199,7 @@ public class KTableKTableJoinTest {
}
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
proc.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
@ -211,21 +210,21 @@ public class KTableKTableJoinTest { @@ -211,21 +210,21 @@ public class KTableKTableJoinTest {
proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@ -243,13 +242,13 @@ public class KTableKTableJoinTest { @@ -243,13 +242,13 @@ public class KTableKTableJoinTest {
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
}
@Test
@ -258,10 +257,10 @@ public class KTableKTableJoinTest { @@ -258,10 +257,10 @@ public class KTableKTableJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KTable<Integer, String> table1;
KTable<Integer, String> table2;
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@ -285,7 +284,7 @@ public class KTableKTableJoinTest { @@ -285,7 +284,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
proc.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
@ -295,20 +294,20 @@ public class KTableKTableJoinTest { @@ -295,20 +294,20 @@ public class KTableKTableJoinTest {
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@ -316,7 +315,7 @@ public class KTableKTableJoinTest { @@ -316,7 +315,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)", "2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item.
@ -326,13 +325,13 @@ public class KTableKTableJoinTest { @@ -326,13 +325,13 @@ public class KTableKTableJoinTest {
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items.
// push all four items to the primary stream. this should produce two items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
driver.flushState();
proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
}
private KeyValue<Integer, String> kv(Integer key, String value) {

18
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -164,10 +164,10 @@ public class KTableKTableLeftJoinTest { @@ -164,10 +164,10 @@ public class KTableKTableLeftJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
KTable<Integer, String> table1;
KTable<Integer, String> table2;
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
final KTable<Integer, String> table1;
final KTable<Integer, String> table2;
final KTable<Integer, String> joined;
final MockProcessorSupplier<Integer, String> proc;
table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@ -179,7 +179,7 @@ public class KTableKTableLeftJoinTest { @@ -179,7 +179,7 @@ public class KTableKTableLeftJoinTest {
driver = new KStreamTestDriver(builder, stateDir);
driver.setTime(0L);
assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());

12
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java

@ -5,9 +5,9 @@ @@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -189,8 +189,8 @@ public class KTableKTableOuterJoinTest { @@ -189,8 +189,8 @@ public class KTableKTableOuterJoinTest {
driver = new KStreamTestDriver(builder, stateDir);
assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
// push two items to the primary stream. the other table is empty

Loading…
Cancel
Save