diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 0cf56ef54a8..a746d31289d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -756,8 +756,8 @@ public class KTableImpl extends AbstractStream implements KTable joinOther; if (!leftOuter) { // inner - joinThis = new KTableKTableJoin<>(this, (KTableImpl) other, joiner); - joinOther = new KTableKTableJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); + joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl) other, joiner); + joinOther = new KTableKTableInnerJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); } else if (!rightOuter) { // left joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl) other, joiner); joinOther = new KTableKTableRightJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java index d36920ad7f0..f2de67f4385 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java @@ -16,8 +16,9 @@ */ package org.apache.kafka.streams.kstream.internals; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; public abstract class KTableKTableAbstractJoinValueGetterSupplier implements KTableValueGetterSupplier { final KTableValueGetterSupplier valueGetterSupplier1; @@ -33,7 +34,7 @@ public abstract class KTableKTableAbstractJoinValueGetterSupplier public String[] storeNames() { final String[] storeNames1 = valueGetterSupplier1.storeNames(); final String[] storeNames2 = valueGetterSupplier2.storeNames(); - final ArrayList stores = new ArrayList<>(storeNames1.length + storeNames2.length); + final Set stores = new HashSet<>(storeNames1.length + storeNames2.length); Collections.addAll(stores, storeNames1); Collections.addAll(stores, storeNames2); return stores.toArray(new String[stores.size()]); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java similarity index 59% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java index c424f4fca6a..e1701754e7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableJoin extends KTableKTableAbstractJoin { +class KTableKTableInnerJoin extends KTableKTableAbstractJoin { private final KeyValueMapper keyValueMapper = new KeyValueMapper() { @Override @@ -31,7 +31,7 @@ class KTableKTableJoin extends KTableKTableAbstractJoin table1, KTableImpl table2, ValueJoiner joiner) { + KTableKTableInnerJoin(KTableImpl table1, KTableImpl table2, ValueJoiner joiner) { super(table1, table2, joiner); } @@ -42,20 +42,17 @@ class KTableKTableJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } public KTableValueGetter get() { - return new KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(), - valueGetterSupplier2.get(), - joiner, - keyValueMapper); + return new KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } } @@ -63,7 +60,7 @@ class KTableKTableJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -100,4 +97,38 @@ class KTableKTableJoin extends KTableKTableAbstractJoin { + + private final KTableValueGetter valueGetter1; + private final KTableValueGetter valueGetter2; + + KTableKTableInnerJoinValueGetter(final KTableValueGetter valueGetter1, + final KTableValueGetter valueGetter2) { + this.valueGetter1 = valueGetter1; + this.valueGetter2 = valueGetter2; + } + + @Override + public void init(final ProcessorContext context) { + valueGetter1.init(context); + valueGetter2.init(context); + } + + @Override + public R get(final K key) { + final V1 value1 = valueGetter1.get(key); + + if (value1 != null) { + V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1)); + + if (value2 != null) { + return joiner.apply(value1, value2); + } else { + return null; + } + } else { + return null; + } + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index d27b8bd0249..6400750a776 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + class KTableKTableJoinMerger implements KTableProcessorSupplier { private final KTableImpl parent1; @@ -56,21 +60,12 @@ class KTableKTableJoinMerger implements KTableProcessorSupplier { @Override public String[] storeNames() { - // we need to allow the downstream processor to be able to access both ends of the joining table's value getters final String[] storeNames1 = parent1.valueGetterSupplier().storeNames(); final String[] storeNames2 = parent2.valueGetterSupplier().storeNames(); - - final String[] stores = new String[storeNames1.length + storeNames2.length]; - int i = 0; - for (final String storeName : storeNames1) { - stores[i] = storeName; - i++; - } - for (final String storeName : storeNames2) { - stores[i] = storeName; - i++; - } - return stores; + final Set stores = new HashSet<>(storeNames1.length + storeNames2.length); + Collections.addAll(stores, storeNames1); + Collections.addAll(stores, storeNames2); + return stores.toArray(new String[stores.size()]); } }; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java deleted file mode 100644 index c8c3eb72d48..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.processor.ProcessorContext; - -class KTableKTableJoinValueGetter implements KTableValueGetter { - - private final KTableValueGetter valueGetter1; - private final KTableValueGetter valueGetter2; - private final ValueJoiner joiner; - private final KeyValueMapper keyValueMapper; - - public KTableKTableJoinValueGetter(final KTableValueGetter valueGetter1, - final KTableValueGetter valueGetter2, - final ValueJoiner joiner, - final KeyValueMapper keyValueMapper) { - this.valueGetter1 = valueGetter1; - this.valueGetter2 = valueGetter2; - this.joiner = joiner; - this.keyValueMapper = keyValueMapper; - } - - @Override - public void init(ProcessorContext context) { - valueGetter1.init(context); - valueGetter2.init(context); - } - - @Override - public R get(K1 key) { - R newValue = null; - V1 value1 = valueGetter1.get(key); - - if (value1 != null) { - V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1)); - - if (value2 != null) { - newValue = joiner.apply(value1, value2); - } - } - - return newValue; - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 33aef0238e5..bb3e652a61c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -34,12 +34,12 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -53,7 +53,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableLeftJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableLeftJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableLeftJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableLeftJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index d2e1d79ab70..e7c170e0b3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -34,12 +34,12 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -52,7 +52,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableOuterJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableOuterJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableOuterJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableOuterJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index f4c840ba702..c540cf94fc2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -35,12 +35,12 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -53,7 +53,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableRightJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableRightJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableRightJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableRightJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 09d4aa03aa6..b890e2f991d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class KTableKTableJoinTest { +public class KTableKTableInnerJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2";