From fdc14dacedc3d3497dcc222b44f539e6355b45a9 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 29 Jan 2018 17:48:53 -0800 Subject: [PATCH] MINOR: Code refacotring in KTable-KTable Join (#4486) 1. Rename KTableKTableJoin to KTableKTableInnerJoin. Also removed abstract from other joins. 2. Merge KTableKTableJoinValueGetter.java into KTableKTableInnerJoin. 3. Use set instead of arrays in the stores function, to avoid duplicate stores to be connected to processors. Reviewers: Bill Bejeck , Matthias J. Sax --- .../streams/kstream/internals/KTableImpl.java | 4 +- ...KTableAbstractJoinValueGetterSupplier.java | 5 +- ...leJoin.java => KTableKTableInnerJoin.java} | 51 ++++++++++++--- .../internals/KTableKTableJoinMerger.java | 21 +++---- .../KTableKTableJoinValueGetter.java | 62 ------------------- .../internals/KTableKTableLeftJoin.java | 10 +-- .../internals/KTableKTableOuterJoin.java | 10 +-- .../internals/KTableKTableRightJoin.java | 10 +-- ...st.java => KTableKTableInnerJoinTest.java} | 2 +- 9 files changed, 70 insertions(+), 105 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/kstream/internals/{KTableKTableJoin.java => KTableKTableInnerJoin.java} (59%) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java rename streams/src/test/java/org/apache/kafka/streams/kstream/internals/{KTableKTableJoinTest.java => KTableKTableInnerJoinTest.java} (99%) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 0cf56ef54a8..a746d31289d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -756,8 +756,8 @@ public class KTableImpl extends AbstractStream implements KTable joinOther; if (!leftOuter) { // inner - joinThis = new KTableKTableJoin<>(this, (KTableImpl) other, joiner); - joinOther = new KTableKTableJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); + joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl) other, joiner); + joinOther = new KTableKTableInnerJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); } else if (!rightOuter) { // left joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl) other, joiner); joinOther = new KTableKTableRightJoin<>((KTableImpl) other, this, reverseJoiner(joiner)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java index d36920ad7f0..f2de67f4385 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java @@ -16,8 +16,9 @@ */ package org.apache.kafka.streams.kstream.internals; -import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; public abstract class KTableKTableAbstractJoinValueGetterSupplier implements KTableValueGetterSupplier { final KTableValueGetterSupplier valueGetterSupplier1; @@ -33,7 +34,7 @@ public abstract class KTableKTableAbstractJoinValueGetterSupplier public String[] storeNames() { final String[] storeNames1 = valueGetterSupplier1.storeNames(); final String[] storeNames2 = valueGetterSupplier2.storeNames(); - final ArrayList stores = new ArrayList<>(storeNames1.length + storeNames2.length); + final Set stores = new HashSet<>(storeNames1.length + storeNames2.length); Collections.addAll(stores, storeNames1); Collections.addAll(stores, storeNames2); return stores.toArray(new String[stores.size()]); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java similarity index 59% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java index c424f4fca6a..e1701754e7a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -class KTableKTableJoin extends KTableKTableAbstractJoin { +class KTableKTableInnerJoin extends KTableKTableAbstractJoin { private final KeyValueMapper keyValueMapper = new KeyValueMapper() { @Override @@ -31,7 +31,7 @@ class KTableKTableJoin extends KTableKTableAbstractJoin table1, KTableImpl table2, ValueJoiner joiner) { + KTableKTableInnerJoin(KTableImpl table1, KTableImpl table2, ValueJoiner joiner) { super(table1, table2, joiner); } @@ -42,20 +42,17 @@ class KTableKTableJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } public KTableValueGetter get() { - return new KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(), - valueGetterSupplier2.get(), - joiner, - keyValueMapper); + return new KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get()); } } @@ -63,7 +60,7 @@ class KTableKTableJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -100,4 +97,38 @@ class KTableKTableJoin extends KTableKTableAbstractJoin { + + private final KTableValueGetter valueGetter1; + private final KTableValueGetter valueGetter2; + + KTableKTableInnerJoinValueGetter(final KTableValueGetter valueGetter1, + final KTableValueGetter valueGetter2) { + this.valueGetter1 = valueGetter1; + this.valueGetter2 = valueGetter2; + } + + @Override + public void init(final ProcessorContext context) { + valueGetter1.init(context); + valueGetter2.init(context); + } + + @Override + public R get(final K key) { + final V1 value1 = valueGetter1.get(key); + + if (value1 != null) { + V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1)); + + if (value2 != null) { + return joiner.apply(value1, value2); + } else { + return null; + } + } else { + return null; + } + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index d27b8bd0249..6400750a776 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + class KTableKTableJoinMerger implements KTableProcessorSupplier { private final KTableImpl parent1; @@ -56,21 +60,12 @@ class KTableKTableJoinMerger implements KTableProcessorSupplier { @Override public String[] storeNames() { - // we need to allow the downstream processor to be able to access both ends of the joining table's value getters final String[] storeNames1 = parent1.valueGetterSupplier().storeNames(); final String[] storeNames2 = parent2.valueGetterSupplier().storeNames(); - - final String[] stores = new String[storeNames1.length + storeNames2.length]; - int i = 0; - for (final String storeName : storeNames1) { - stores[i] = storeName; - i++; - } - for (final String storeName : storeNames2) { - stores[i] = storeName; - i++; - } - return stores; + final Set stores = new HashSet<>(storeNames1.length + storeNames2.length); + Collections.addAll(stores, storeNames1); + Collections.addAll(stores, storeNames2); + return stores.toArray(new String[stores.size()]); } }; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java deleted file mode 100644 index c8c3eb72d48..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.processor.ProcessorContext; - -class KTableKTableJoinValueGetter implements KTableValueGetter { - - private final KTableValueGetter valueGetter1; - private final KTableValueGetter valueGetter2; - private final ValueJoiner joiner; - private final KeyValueMapper keyValueMapper; - - public KTableKTableJoinValueGetter(final KTableValueGetter valueGetter1, - final KTableValueGetter valueGetter2, - final ValueJoiner joiner, - final KeyValueMapper keyValueMapper) { - this.valueGetter1 = valueGetter1; - this.valueGetter2 = valueGetter2; - this.joiner = joiner; - this.keyValueMapper = keyValueMapper; - } - - @Override - public void init(ProcessorContext context) { - valueGetter1.init(context); - valueGetter2.init(context); - } - - @Override - public R get(K1 key) { - R newValue = null; - V1 value1 = valueGetter1.get(key); - - if (value1 != null) { - V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1)); - - if (value2 != null) { - newValue = joiner.apply(value1, value2); - } - } - - return newValue; - } - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 33aef0238e5..bb3e652a61c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -34,12 +34,12 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -53,7 +53,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableLeftJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableLeftJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableLeftJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableLeftJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableLeftJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index d2e1d79ab70..e7c170e0b3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -34,12 +34,12 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -52,7 +52,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableOuterJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableOuterJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableOuterJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableOuterJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableOuterJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index f4c840ba702..c540cf94fc2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -35,12 +35,12 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin view() { - return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { + private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier { - public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { + KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier valueGetterSupplier1, KTableValueGetterSupplier valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } @@ -53,7 +53,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin valueGetter; - public KTableKTableRightJoinProcessor(KTableValueGetter valueGetter) { + KTableKTableRightJoinProcessor(KTableValueGetter valueGetter) { this.valueGetter = valueGetter; } @@ -94,7 +94,7 @@ class KTableKTableRightJoin extends KTableKTableAbstractJoin valueGetter1; private final KTableValueGetter valueGetter2; - public KTableKTableRightJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { + KTableKTableRightJoinValueGetter(KTableValueGetter valueGetter1, KTableValueGetter valueGetter2) { this.valueGetter1 = valueGetter1; this.valueGetter2 = valueGetter2; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java similarity index 99% rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 09d4aa03aa6..b890e2f991d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class KTableKTableJoinTest { +public class KTableKTableInnerJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2";