From 62c0972efc525cc0677bd3fd470bd9fbbd70b004 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 20 Oct 2016 13:06:25 -0700 Subject: [PATCH] KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77) - fixed leftJoin -> outerJoin test bug - simplified to only use values - fixed inner KTable-KTable join - fixed left KTable-KTable join - fixed outer KTable-KTable join - fixed inner, left, and outer left KStream-KStream joins - added inner KStream-KTable join - fixed left KStream-KTable join Author: Matthias J. Sax Reviewers: Damian Guy , Guozhang Wang Closes #1777 from mjsax/kafka-4001-joins --- .../apache/kafka/streams/kstream/KStream.java | 44 +- .../kstream/internals/KStreamImpl.java | 272 +++++------ .../kstream/internals/KStreamKStreamJoin.java | 27 +- .../kstream/internals/KStreamKTableJoin.java | 75 +++ .../internals/KStreamKTableLeftJoin.java | 66 --- .../internals/KStreamWindowAggregate.java | 2 +- .../streams/kstream/internals/KTableImpl.java | 103 ++--- .../kstream/internals/KTableKTableJoin.java | 21 +- .../internals/KTableKTableLeftJoin.java | 19 +- .../internals/KTableKTableOuterJoin.java | 16 +- .../internals/KTableKTableRightJoin.java | 24 +- .../internals/ProcessorContextImpl.java | 4 - .../integration/JoinIntegrationTest.java | 433 ++++++++++++++++++ .../KTableKTableJoinIntegrationTest.java | 74 +-- .../internals/KStreamKStreamLeftJoinTest.java | 98 ++-- .../internals/KStreamKTableJoinTest.java | 146 ++++++ .../internals/KStreamWindowAggregateTest.java | 22 +- .../internals/KTableKTableJoinTest.java | 93 ++-- .../internals/KTableKTableLeftJoinTest.java | 18 +- .../internals/KTableKTableOuterJoinTest.java | 12 +- 20 files changed, 1075 insertions(+), 494 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 032efb5233e..4483e9fd11f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -532,6 +532,39 @@ public interface KStream { ValueJoiner joiner, JoinWindows windows); + /** + * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join. + * If a record key or value is {@code null} it will not included in the resulting {@link KStream} + * + * @param table the instance of {@link KTable} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param the value type of the table + * @param the value type of the new stream + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key + */ + KStream join(KTable table, ValueJoiner joiner); + + /** + * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join. + * If a record key or value is {@code null} it will not included in the resulting {@link KStream} + * + * @param table the instance of {@link KTable} joined with this stream + * @param valueJoiner the instance of {@link ValueJoiner} + * @param keySerde key serdes for materializing this stream. + * If not specified the default serdes defined in the configs will be used + * @param valSerde value serdes for materializing this stream, + * if not specified the default serdes defined in the configs will be used + * @param the value type of the table + * @param the value type of the new stream + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals + */ + KStream join(KTable table, + ValueJoiner valueJoiner, + Serde keySerde, + Serde valSerde); + /** * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join. * If a record key is null it will not included in the resulting {@link KStream} @@ -566,6 +599,7 @@ public interface KStream { ValueJoiner valueJoiner, Serde keySerde, Serde valSerde); + /** * Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and * default serializers and deserializers. If a record key is null it will not included in @@ -592,8 +626,8 @@ public interface KStream { * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream} */ KGroupedStream groupBy(KeyValueMapper selector, - Serde keySerde, - Serde valSerde); + Serde keySerde, + Serde valSerde); /** * Group the records with the same key into a {@link KGroupedStream} while preserving the diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index bb77e963655..b67fca53589 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueTransformerSupplier; -import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.Stores; + import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; @@ -63,6 +64,8 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream keySerializer = keySerde == null ? null : keySerde.serializer(); Serializer valSerializer = valSerde == null ? null : valSerde.serializer(); - + if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer; partitioner = (StreamPartitioner) new WindowedStreamPartitioner(windowedSerializer); @@ -386,78 +389,59 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join( - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde thisValueSerde, - Serde otherValueSerde) { + final KStream other, + final ValueJoiner joiner, + final JoinWindows windows, + final Serde keySerde, + final Serde thisValueSerde, + final Serde otherValueSerde) { - return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false); + return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false)); } @Override public KStream join( - KStream other, - ValueJoiner joiner, - JoinWindows windows) { + final KStream other, + final ValueJoiner joiner, + final JoinWindows windows) { - return join(other, joiner, windows, null, null, null, false); + return join(other, joiner, windows, null, null, null); } @Override public KStream outerJoin( - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde thisValueSerde, - Serde otherValueSerde) { + final KStream other, + final ValueJoiner joiner, + final JoinWindows windows, + final Serde keySerde, + final Serde thisValueSerde, + final Serde otherValueSerde) { - return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true); + return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true)); } @Override public KStream outerJoin( - KStream other, - ValueJoiner joiner, - JoinWindows windows) { + final KStream other, + final ValueJoiner joiner, + final JoinWindows windows) { - return join(other, joiner, windows, null, null, null, true); + return outerJoin(other, joiner, windows, null, null, null); } - @SuppressWarnings("unchecked") - private KStream join( - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde thisValueSerde, - Serde otherValueSerde, - boolean outer) { - - return doJoin(other, - joiner, - windows, - keySerde, - thisValueSerde, - otherValueSerde, - new DefaultJoin(outer)); - } - - private KStream doJoin(KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde thisValueSerde, - Serde otherValueSerde, - KStreamImplJoin join) { + private KStream doJoin(final KStream other, + final ValueJoiner joiner, + final JoinWindows windows, + final Serde keySerde, + final Serde thisValueSerde, + final Serde otherValueSerde, + final KStreamImplJoin join) { Objects.requireNonNull(other, "other KStream can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(windows, "windows can't be null"); KStreamImpl joinThis = this; - KStreamImpl joinOther = (KStreamImpl) other; + KStreamImpl joinOther = (KStreamImpl) other; if (joinThis.repartitionRequired) { joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null); @@ -531,20 +515,20 @@ public class KStreamImpl extends AbstractStream implements KStream KStream leftJoin( - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde thisValSerde, - Serde otherValueSerde) { + final KStream other, + final ValueJoiner joiner, + final JoinWindows windows, + final Serde keySerde, + final Serde thisValSerde, + final Serde otherValueSerde) { return doJoin(other, - joiner, - windows, - keySerde, - thisValSerde, - otherValueSerde, - new LeftJoin()); + joiner, + windows, + keySerde, + thisValSerde, + otherValueSerde, + new KStreamImplJoin(true, false)); } @Override @@ -558,41 +542,60 @@ public class KStreamImpl extends AbstractStream implements KStream KStream leftJoin(KTable other, ValueJoiner joiner) { - return leftJoin(other, joiner, null, null); + public KStream join(final KTable other, final ValueJoiner joiner) { + return join(other, joiner, null, null); } - public KStream leftJoin(KTable other, - ValueJoiner joiner, - Serde keySerde, - Serde valueSerde) { - Objects.requireNonNull(other, "other KTable can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - + @Override + public KStream join(final KTable other, + final ValueJoiner joiner, + final Serde keySerde, + final Serde valueSerde) { if (repartitionRequired) { - KStreamImpl thisStreamRepartitioned = this.repartitionForJoin(keySerde, - valueSerde, null); - return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner); + final KStreamImpl thisStreamRepartitioned = repartitionForJoin(keySerde, + valueSerde, null); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false); } else { - return doStreamTableLeftJoin(other, joiner); + return doStreamTableJoin(other, joiner, false); } - } - private KStream doStreamTableLeftJoin(final KTable other, - final ValueJoiner joiner) { - Set allSourceNodes = ensureJoinableWith((AbstractStream) other); + private KStream doStreamTableJoin(final KTable other, + final ValueJoiner joiner, + final boolean leftJoin) { + Objects.requireNonNull(other, "other KTable can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + + final Set allSourceNodes = ensureJoinableWith((AbstractStream) other); - String name = topology.newName(LEFTJOIN_NAME); + final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); - topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl) other, joiner), this.name); - topology.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames()); + topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl) other, joiner, leftJoin), this.name); + topology.connectProcessorAndStateStores(name, other.getStoreName()); topology.connectProcessors(this.name, ((KTableImpl) other).name); return new KStreamImpl<>(topology, name, allSourceNodes, false); } + @Override + public KStream leftJoin(final KTable other, final ValueJoiner joiner) { + return leftJoin(other, joiner, null, null); + } + + public KStream leftJoin(final KTable other, + final ValueJoiner joiner, + final Serde keySerde, + final Serde valueSerde) { + if (repartitionRequired) { + final KStreamImpl thisStreamRepartitioned = this.repartitionForJoin(keySerde, + valueSerde, null); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true); + } else { + return doStreamTableJoin(other, joiner, true); + } + } + @Override public KGroupedStream groupBy(KeyValueMapper selector) { return groupBy(selector, null, null); @@ -600,8 +603,8 @@ public class KStreamImpl extends AbstractStream implements KStream KGroupedStream groupBy(KeyValueMapper selector, - Serde keySerde, - Serde valSerde) { + Serde keySerde, + Serde valSerde) { Objects.requireNonNull(selector, "selector can't be null"); String selectName = internalSelectKey(selector); @@ -641,26 +644,16 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(KStream lhs, - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde lhsValueSerde, - Serde otherValueSerde); - } - - private class DefaultJoin implements KStreamImplJoin { - - private final boolean outer; + private final boolean leftOuter; + private final boolean rightOuter; - DefaultJoin(final boolean outer) { - this.outer = outer; + KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) { + this.leftOuter = leftOuter; + this.rightOuter = rightOuter; } - @Override public KStream join(KStream lhs, KStream other, ValueJoiner joiner, @@ -670,12 +663,12 @@ public class KStreamImpl extends AbstractStream implements KStream otherValueSerde) { String thisWindowStreamName = topology.newName(WINDOWED_NAME); String otherWindowStreamName = topology.newName(WINDOWED_NAME); - String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME); - String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME); + String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME); + String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME); String joinMergeName = topology.newName(MERGE_NAME); StateStoreSupplier thisWindow = - createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store"); + createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store"); StateStoreSupplier otherWindow = createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store"); @@ -688,16 +681,16 @@ public class KStreamImpl extends AbstractStream implements KStream joinThis = new KStreamKStreamJoin<>(otherWindow.name(), - windows.before, - windows.after, - joiner, - outer); - KStreamKStreamJoin joinOther = new KStreamKStreamJoin<>(thisWindow.name(), - windows.after, - windows.before, - reverseJoiner(joiner), - outer); + final KStreamKStreamJoin joinThis = new KStreamKStreamJoin<>(otherWindow.name(), + windows.before, + windows.after, + joiner, + leftOuter); + final KStreamKStreamJoin joinOther = new KStreamKStreamJoin<>(thisWindow.name(), + windows.after, + windows.before, + reverseJoiner(joiner), + rightOuter); KStreamPassThrough joinMerge = new KStreamPassThrough<>(); @@ -716,39 +709,4 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(KStream lhs, - KStream other, - ValueJoiner joiner, - JoinWindows windows, - Serde keySerde, - Serde lhsValueSerde, - Serde otherValueSerde) { - String otherWindowStreamName = topology.newName(WINDOWED_NAME); - String joinThisName = topology.newName(LEFTJOIN_NAME); - - StateStoreSupplier otherWindow = - createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store"); - - KStreamJoinWindow - otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs()); - KStreamKStreamJoin - joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true); - - - - topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name); - topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name); - topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName); - - Set allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes); - allSourceNodes.addAll(((KStreamImpl) other).sourceNodes); - return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false); - } - } - - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index edde0090832..41547b9d3a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -52,7 +52,6 @@ class KStreamKStreamJoin implements ProcessorSupplier { private WindowStore otherWindow; - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); @@ -62,14 +61,21 @@ class KStreamKStreamJoin implements ProcessorSupplier { @Override - public void process(K key, V1 value) { - if (key == null) + public void process(final K key, final V1 value) { + // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record + // + // we also ignore the record if value is null, because in a key-value data model a null-value indicates + // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics + // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- + // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored + if (key == null || value == null) { return; + } - boolean needOuterJoin = KStreamKStreamJoin.this.outer; + boolean needOuterJoin = outer; - long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); - long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); + final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); + final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); try (WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { @@ -77,8 +83,9 @@ class KStreamKStreamJoin implements ProcessorSupplier { context().forward(key, joiner.apply(value, iter.next().value)); } - if (needOuterJoin) + if (needOuterJoin) { context().forward(key, joiner.apply(value, null)); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java new file mode 100644 index 00000000000..1027b96d4f6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +class KStreamKTableJoin implements ProcessorSupplier { + + private final KTableValueGetterSupplier valueGetterSupplier; + private final ValueJoiner joiner; + private final boolean leftJoin; + + KStreamKTableJoin(final KTableImpl table, final ValueJoiner joiner, final boolean leftJoin) { + valueGetterSupplier = table.valueGetterSupplier(); + this.joiner = joiner; + this.leftJoin = leftJoin; + } + + @Override + public Processor get() { + return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin); + } + + private class KStreamKTableJoinProcessor extends AbstractProcessor { + + private final KTableValueGetter valueGetter; + private final boolean leftJoin; + + KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, final boolean leftJoin) { + this.valueGetter = valueGetter; + this.leftJoin = leftJoin; + } + + @Override + public void init(final ProcessorContext context) { + super.init(context); + valueGetter.init(context); + } + + @Override + public void process(final K key, final V1 value) { + // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record + // + // we also ignore the record if value is null, because in a key-value data model a null-value indicates + // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics + // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- + // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored + if (key != null && value != null) { + final V2 value2 = valueGetter.get(key); + if (leftJoin || value2 != null) { + context().forward(key, joiner.apply(value, value2)); + } + } + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java deleted file mode 100644 index 92b9b59f945..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamKTableLeftJoin implements ProcessorSupplier { - - private final KTableValueGetterSupplier valueGetterSupplier; - private final ValueJoiner joiner; - - KStreamKTableLeftJoin(KTableImpl table, ValueJoiner joiner) { - this.valueGetterSupplier = table.valueGetterSupplier(); - this.joiner = joiner; - } - - @Override - public Processor get() { - return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get()); - } - - private class KStreamKTableLeftJoinProcessor extends AbstractProcessor { - - private final KTableValueGetter valueGetter; - - public KStreamKTableLeftJoinProcessor(KTableValueGetter valueGetter) { - this.valueGetter = valueGetter; - } - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - super.init(context); - valueGetter.init(context); - } - - @Override - public void process(K key, V1 value) { - // if the key is null, we do not need proceed joining - // the record with the table - if (key != null) { - context().forward(key, joiner.apply(value, valueGetter.get(key))); - } - } - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 718e52bda11..55b09166522 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 6423cffabf2..683dc00a064 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,6 +44,7 @@ import java.util.Set; /** * The implementation class of {@link KTable}. + * * @param the key type * @param the source's (parent's) value type * @param the value type @@ -283,77 +284,55 @@ public class KTableImpl extends AbstractStream implements KTable KTable join(KTable other, ValueJoiner joiner) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - - Set allSourceNodes = ensureJoinableWith((AbstractStream) other); - - String joinThisName = topology.newName(JOINTHIS_NAME); - String joinOtherName = topology.newName(JOINOTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); - - KTableKTableJoin joinThis = new KTableKTableJoin<>(this, (KTableImpl) other, joiner); - KTableKTableJoin joinOther = new KTableKTableJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), - new KTableImpl(topology, joinOtherName, joinOther, ((KTableImpl) other).sourceNodes, other.getStoreName()) - ); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); - topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); - - return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); + public KTable join(final KTable other, final ValueJoiner joiner) { + return doJoin(other, joiner, false, false); } - @SuppressWarnings("unchecked") @Override - public KTable outerJoin(KTable other, ValueJoiner joiner) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - - Set allSourceNodes = ensureJoinableWith((AbstractStream) other); - - String joinThisName = topology.newName(OUTERTHIS_NAME); - String joinOtherName = topology.newName(OUTEROTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); - - KTableKTableOuterJoin joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl) other, joiner); - KTableKTableOuterJoin joinOther = new KTableKTableOuterJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), - new KTableImpl(topology, joinOtherName, joinOther, ((KTableImpl) other).sourceNodes, other.getStoreName()) - ); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); - topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); + public KTable outerJoin(final KTable other, final ValueJoiner joiner) { + return doJoin(other, joiner, true, true); + } - return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); + @Override + public KTable leftJoin(final KTable other, final ValueJoiner joiner) { + return doJoin(other, joiner, true, false); } @SuppressWarnings("unchecked") - @Override - public KTable leftJoin(KTable other, ValueJoiner joiner) { + private KTable doJoin(final KTable other, ValueJoiner joiner, final boolean leftOuter, final boolean rightOuter) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - Set allSourceNodes = ensureJoinableWith((AbstractStream) other); - String joinThisName = topology.newName(LEFTTHIS_NAME); - String joinOtherName = topology.newName(LEFTOTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); + final Set allSourceNodes = ensureJoinableWith((AbstractStream) other); + + if (leftOuter) { + enableSendingOldValues(); + } + if (rightOuter) { + ((KTableImpl) other).enableSendingOldValues(); + } + + final String joinThisName = topology.newName(JOINTHIS_NAME); + final String joinOtherName = topology.newName(JOINOTHER_NAME); + final String joinMergeName = topology.newName(MERGE_NAME); + + final KTableKTableAbstractJoin joinThis; + final KTableKTableAbstractJoin joinOther; + + if (!leftOuter) { // inner + joinThis = new KTableKTableJoin<>(this, (KTableImpl) other, joiner); + joinOther = new KTableKTableJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); + } else if (!rightOuter) { // left + joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl) other, joiner); + joinOther = new KTableKTableRightJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); + } else { // outer + joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl) other, joiner); + joinOther = new KTableKTableOuterJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); + } - KTableKTableLeftJoin joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl) other, joiner); - KTableKTableRightJoin joinOther = new KTableKTableRightJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), + final KTableKTableJoinMerger joinMerge = new KTableKTableJoinMerger<>( + new KTableImpl(topology, joinThisName, joinThis, sourceNodes, storeName), new KTableImpl(topology, joinOtherName, joinOther, ((KTableImpl) other).sourceNodes, other.getStoreName()) ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index cbd626df442..49f671573a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,23 +69,26 @@ class KTableKTableJoin extends KTableKTableAbstractJoin change) { + public void process(final K key, final Change change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = null; - if (change.newValue != null || change.oldValue != null) - value2 = valueGetter.get(key); + final V2 value2 = valueGetter.get(key); + if (value2 == null) { + return; + } - if (change.newValue != null && value2 != null) + if (change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } - if (sendOldValues && change.oldValue != null && value2 != null) + if (sendOldValues && change.oldValue != null) { oldValue = joiner.apply(change.oldValue, value2); + } context().forward(key, new Change<>(newValue, oldValue)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 4bee38cb064..5f5cad67845 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -70,27 +70,28 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin change) { + public void process(final K key, final Change change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable left-join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = null; - if (change.newValue != null || change.oldValue != null) - value2 = valueGetter.get(key); + final V2 value2 = valueGetter.get(key); + if (value2 == null && change.newValue == null && change.oldValue == null) { + return; + } - if (change.newValue != null) + if (change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } if (sendOldValues && change.oldValue != null) oldValue = joiner.apply(change.oldValue, value2); context().forward(key, new Change<>(newValue, oldValue)); } - } private class KTableKTableLeftJoinValueGetter implements KTableValueGetter { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index ad7dbde37c3..2bfd8a5611d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -69,21 +69,25 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin change) { + public void process(final K key, final Change change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable outer-join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = valueGetter.get(key); - if (change.newValue != null || value2 != null) + final V2 value2 = valueGetter.get(key); + if (value2 == null && change.newValue == null && change.oldValue == null) { + return; + } + + if (value2 != null || change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } - if (sendOldValues) { - if (change.oldValue != null || value2 != null) - oldValue = joiner.apply(change.oldValue, value2); + if (sendOldValues && (value2 != null || change.oldValue != null)) { + oldValue = joiner.apply(change.oldValue, value2); } context().forward(key, new Change<>(newValue, oldValue)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index 80aadaa1427..8aeadcc3a5d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -70,19 +70,23 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin change) { + public void process(final K key, final Change change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable right-join operator should not be null."); - R newValue = null; + final R newValue; R oldValue = null; - V2 value2 = valueGetter.get(key); - if (value2 != null) { - newValue = joiner.apply(change.newValue, value2); - if (sendOldValues) - oldValue = joiner.apply(change.oldValue, value2); + final V2 value2 = valueGetter.get(key); + if (value2 == null) { + return; + } + + newValue = joiner.apply(change.newValue, value2); + + if (sendOldValues) { + oldValue = joiner.apply(change.oldValue, value2); } context().forward(key, new Change<>(newValue, oldValue)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 195e5a46003..11ca30ec26f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -48,7 +48,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol private RecordContext recordContext; private ProcessorNode currentNode; - @SuppressWarnings("unchecked") public ProcessorContextImpl(TaskId id, StreamTask task, StreamsConfig config, @@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol return recordContext.timestamp(); } - @SuppressWarnings("unchecked") @Override public void forward(K key, V value) { ProcessorNode previousNode = currentNode; @@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol } } - @SuppressWarnings("unchecked") @Override public void forward(K key, V value, int childIndex) { ProcessorNode previousNode = currentNode; @@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol } } - @SuppressWarnings("unchecked") @Override public void forward(K key, V value, String childName) { for (ProcessorNode child : (List>) currentNode.children()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java new file mode 100644 index 00000000000..0f705884b2b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests all available joins of Kafka Streams DSL. + */ +public class JoinIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static ZkUtils zkUtils = null; + + private static final String APP_ID = "join-integration-test"; + private static final String INPUT_TOPIC_1 = "inputTopicLeft"; + private static final String INPUT_TOPIC_2 = "inputTopicRight"; + private static final String OUTPUT_TOPIC = "outputTopic"; + + private final static Properties PRODUCER_CONFIG = new Properties(); + private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); + private final static Properties STREAMS_CONFIG = new Properties(); + + private KStreamBuilder builder; + private KStream leftStream; + private KStream rightStream; + private KTable leftTable; + private KTable rightTable; + + private final List> input = Arrays.asList( + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, "A"), + new Input<>(INPUT_TOPIC_2, "a"), + new Input<>(INPUT_TOPIC_1, "B"), + new Input<>(INPUT_TOPIC_2, "b"), + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, "C"), + new Input<>(INPUT_TOPIC_2, "c"), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_2, "d"), + new Input<>(INPUT_TOPIC_1, "D") + ); + + private final ValueJoiner valueJoiner = new ValueJoiner() { + @Override + public String apply(final String value1, final String value2) { + return value1 + "-" + value2; + } + }; + + private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition(); + + @BeforeClass + public static void setupConfigsAndUtils() throws Exception { + PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); + PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + + zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + } + + @AfterClass + public static void release() { + if (zkUtils != null) { + zkUtils.close(); + } + } + + @Before + public void prepareTopology() throws Exception { + CLUSTER.createTopic(INPUT_TOPIC_1); + CLUSTER.createTopic(INPUT_TOPIC_2); + CLUSTER.createTopic(OUTPUT_TOPIC); + + builder = new KStreamBuilder(); + leftTable = builder.table(INPUT_TOPIC_1, "leftTable"); + rightTable = builder.table(INPUT_TOPIC_2, "rightTable"); + leftStream = leftTable.toStream(); + rightStream = rightTable.toStream(); + } + + @After + public void cleanup() throws Exception { + CLUSTER.deleteTopic(INPUT_TOPIC_1); + CLUSTER.deleteTopic(INPUT_TOPIC_2); + CLUSTER.deleteTopic(OUTPUT_TOPIC); + + TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds."); + } + + private void checkResult(final String outputTopic, final List expectedResult) throws Exception { + if (expectedResult != null) { + final List result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), Long.MAX_VALUE); + assertThat(result, is(expectedResult)); + } + } + + /* + * Runs the actual test. Checks the result after each input record to ensure fixed processing order. + * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry + */ + private void runTest(final List> expectedResult) throws Exception { + assert expectedResult.size() == input.size(); + + IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); + final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG); + try { + streams.start(); + + long ts = System.currentTimeMillis(); + + final Iterator> resultIterator = expectedResult.iterator(); + for (final Input singleInput : input) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts); + checkResult(OUTPUT_TOPIC, resultIterator.next()); + } + } finally { + streams.close(); + } + } + + @Test + public void testInnerKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream"); + + final List> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testOuterKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testInnerKStreamKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable"); + + final List> expectedResult = Arrays.asList( + null, + null, + null, + null, + Collections.singletonList("B-a"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKStreamKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + null, + Collections.singletonList("B-a"), + null, + null, + null, + Collections.singletonList("C-null"), + null, + null, + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testInnerKTableKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable"); + + final List> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("C-c"), + Collections.singletonList((String) null), + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKTableKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testOuterKTableKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList("null-b"), + Collections.singletonList((String) null), + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + Collections.singletonList("null-d"), + Collections.singletonList("D-d") + ); + + leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + private final class TopicsGotDeletedCondition implements TestCondition { + @Override + public boolean conditionMet() { + final Set allTopics = new HashSet<>(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC); + } + } + + private final class Input { + String topic; + KeyValue record; + + private final long anyUniqueKey = 0L; + + Input(final String topic, final V value) { + this.topic = topic; + record = KeyValue.pair(anyUniqueKey, value); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 85e2cf72b46..2cd38594f3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest { public static Object[] parameters() { return new Object[][]{ {JoinType.INNER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null)) - }, +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) + )}, {JoinType.INNER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.INNER, JoinType.OUTER, Arrays.asList( new KeyValue<>("a", "null-A3"), new KeyValue<>("b", "null-B3"), new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C3") +// new KeyValue<>("a", "null-A3"), +// new KeyValue<>("b", "null-B3"), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", "null-C3") )}, {JoinType.LEFT, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.LEFT, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.LEFT, JoinType.OUTER, Arrays.asList( new KeyValue<>("a", "null-A3"), @@ -121,22 +121,22 @@ public class KTableKTableJoinIntegrationTest { new KeyValue<>("c", "null-C3"), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C3") + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", "null-C3") )}, {JoinType.OUTER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), new KeyValue<>("b", "B1-B2-B3"), new KeyValue<>("c", "null-C2-C3") )}, {JoinType.OUTER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), new KeyValue<>("b", "B1-B2-B3"), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 80512d9adb2..58090fd8deb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -72,10 +72,10 @@ public class KStreamKStreamLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KStream stream1; - KStream stream2; - KStream joined; - MockProcessorSupplier processor; + final KStream stream1; + final KStream stream2; + final KStream joined; + final MockProcessorSupplier processor; processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); @@ -84,7 +84,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); - Collection> copartitionGroups = builder.copartitionGroups(); + final Collection> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -93,8 +93,10 @@ public class KStreamKStreamLeftJoinTest { driver.setTime(0L); // push two items to the primary stream. the other window is empty - // w {} - // --> w = {} + // w1 {} + // w2 {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = {} for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); @@ -103,38 +105,47 @@ public class KStreamKStreamLeftJoinTest { processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce two items. - // w {} - // --> w = { 0:Y0, 1:Y1 } + // w1 = { 0:X0, 1:X1 } + // w2 {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = { 0:Y0, 1:Y1 } for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); - // push all four items to the primary stream. this should produce four items. - // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1 } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - for (int i = 0; i < expectedKeys.length; i++) { + // push three items to the primary stream. this should produce four items. + // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // --> w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 3; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null"); - // push all items to the other stream. this should produce no items. - // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // push all items to the other stream. this should produce 5 items + // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2"); - // push all four items to the primary stream. this should produce four items. - // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 - // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // push all four items to the primary stream. this should produce six items. + // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); @@ -151,10 +162,10 @@ public class KStreamKStreamLeftJoinTest { long time = 0L; - KStream stream1; - KStream stream2; - KStream joined; - MockProcessorSupplier processor; + final KStream stream1; + final KStream stream2; + final KStream joined; + final MockProcessorSupplier processor; processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); @@ -163,7 +174,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); - Collection> copartitionGroups = builder.copartitionGroups(); + final Collection> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -171,8 +182,10 @@ public class KStreamKStreamLeftJoinTest { driver = new KStreamTestDriver(builder, stateDir); // push two items to the primary stream. the other window is empty. this should produce two items - // w = {} - // --> w = {} + // w1 = {} + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = {} setRecordContext(time, topic1); for (int i = 0; i < 2; i++) { @@ -182,23 +195,27 @@ public class KStreamKStreamLeftJoinTest { processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce no items. - // w = {} - // --> w = { 0:Y0, 1:Y1 } + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = { 0:Y0, 1:Y1 } setRecordContext(time, topic2); for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // clear logically time = 1000L; setRecordContext(time, topic2); // push all items to the other stream. this should produce no items. - // w = {} - // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + // w1 = {} + // w2 = {} + // --> w1 = {} + // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } for (int i = 0; i < expectedKeys.length; i++) { setRecordContext(time + i, topic2); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); @@ -206,8 +223,11 @@ public class KStreamKStreamLeftJoinTest { driver.flushState(); processor.checkAndClearProcessResult(); - // gradually expire items in window. - // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + // gradually expire items in window 2. + // w1 = {} + // w2 = {} + // --> w1 = {} + // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } time = 1000L + 100L; setRecordContext(time, topic1); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java new file mode 100644 index 00000000000..742e852d134 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class KStreamKTableJoinTest { + + final private String topic1 = "topic1"; + final private String topic2 = "topic2"; + + final private Serde intSerde = Serdes.Integer(); + final private Serde stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } + + @Test + public void testJoin() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + final KStream stream; + final KTable table; + final MockProcessorSupplier processor; + + processor = new MockProcessorSupplier<>(); + stream = builder.stream(intSerde, stringSerde, topic1); + table = builder.table(intSerde, stringSerde, topic2, "anyStoreName"); + stream.join(table, MockValueJoiner.STRING_JOINER).process(processor); + + final Collection> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); + + // push two items to the primary stream. the other table is empty + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push two items to the other stream. this should not produce any item. + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce two items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + + // push all items to the other stream. this should not produce any item + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce four items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + // push two items with null to the other stream as deletes. this should not produce any item. + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], null); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce two items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3"); + } + + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 37aac0c5218..2175dd53261 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -228,13 +228,7 @@ public class KStreamWindowAggregateTest { "[A@0]:0+1+1" ); proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", - "[B@0]:null", - "[C@0]:null", - "[D@0]:null", - "[A@0]:null" - ); + proc3.checkAndClearProcessResult(); setRecordContext(5, topic1); driver.process(topic1, "A", "1"); @@ -260,13 +254,7 @@ public class KStreamWindowAggregateTest { "[C@0]:0+3+3", "[C@5]:0+3" ); proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", "[A@5]:null", - "[B@0]:null", "[B@5]:null", - "[D@0]:null", "[D@5]:null", - "[B@0]:null", "[B@5]:null", - "[C@0]:null", "[C@5]:null" - ); + proc3.checkAndClearProcessResult(); setRecordContext(0, topic1); driver.process(topic2, "A", "a"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index ba106680175..1bd8600ccf6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -74,10 +74,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable table1; - KTable table2; - KTable joined; - MockProcessorSupplier processor; + final KTable table1; + final KTable table2; + final KTable joined; + final MockProcessorSupplier processor; processor = new MockProcessorSupplier<>(); table1 = builder.table(intSerde, stringSerde, topic1, storeName1); @@ -85,17 +85,17 @@ public class KTableKTableJoinTest { joined = table1.join(table2, MockValueJoiner.STRING_JOINER); joined.toStream().process(processor); - Collection> copartitionGroups = builder.copartitionGroups(); + final Collection> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - KTableValueGetterSupplier getterSupplier = ((KTableImpl) joined).valueGetterSupplier(); + final KTableValueGetterSupplier getterSupplier = ((KTableImpl) joined).valueGetterSupplier(); driver = new KStreamTestDriver(builder, stateDir); driver.setTime(0L); - KTableValueGetter getter = getterSupplier.get(); + final KTableValueGetter getter = getterSupplier.get(); getter.init(driver.context()); // push two items to the primary stream. the other table is empty @@ -105,8 +105,7 @@ public class KTableKTableJoinTest { } driver.flushState(); - processor.checkAndClearProcessResult("0:null", "1:null"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null)); + processor.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -116,17 +115,17 @@ public class KTableKTableJoinTest { driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); + checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1")); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); + checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1")); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { @@ -134,8 +133,8 @@ public class KTableKTableJoinTest { } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); + processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); // push all four items to the primary stream. this should produce four items. @@ -155,17 +154,17 @@ public class KTableKTableJoinTest { driver.flushState(); processor.checkAndClearProcessResult("0:null", "1:null"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3")); + checkJoinedValues(getter, kv(0, null), kv(1, null)); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); + processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3"); + checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } @Test @@ -174,10 +173,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable table1; - KTable table2; - KTable joined; - MockProcessorSupplier proc; + final KTable table1; + final KTable table2; + final KTable joined; + final MockProcessorSupplier proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -200,7 +199,7 @@ public class KTableKTableJoinTest { } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -211,21 +210,21 @@ public class KTableKTableJoinTest { proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -243,13 +242,13 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } @Test @@ -258,10 +257,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable table1; - KTable table2; - KTable joined; - MockProcessorSupplier proc; + final KTable table1; + final KTable table2; + final KTable joined; + final MockProcessorSupplier proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -285,7 +284,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -295,20 +294,20 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -316,7 +315,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)", "2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -326,13 +325,13 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } private KeyValue kv(Integer key, String value) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 5f846785e7b..816979a2f59 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -164,10 +164,10 @@ public class KTableKTableLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable table1; - KTable table2; - KTable joined; - MockProcessorSupplier proc; + final KTable table1; + final KTable table2; + final KTable joined; + final MockProcessorSupplier proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -179,7 +179,7 @@ public class KTableKTableLeftJoinTest { driver = new KStreamTestDriver(builder, stateDir); driver.setTime(0L); - assertFalse(((KTableImpl) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); assertFalse(((KTableImpl) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl) joined).sendingOldValueEnabled()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index a6249bc2ece..8d1c70a35ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -189,8 +189,8 @@ public class KTableKTableOuterJoinTest { driver = new KStreamTestDriver(builder, stateDir); - assertFalse(((KTableImpl) table1).sendingOldValueEnabled()); - assertFalse(((KTableImpl) table2).sendingOldValueEnabled()); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl) joined).sendingOldValueEnabled()); // push two items to the primary stream. the other table is empty