Browse Source

MINOR: Code refacotring in KTable-KTable Join (#4486)

1. Rename KTableKTableJoin to KTableKTableInnerJoin. Also removed abstract from other joins.
2. Merge KTableKTableJoinValueGetter.java into KTableKTableInnerJoin.
3. Use set instead of arrays in the stores function, to avoid duplicate stores to be connected to processors.

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/4487/head
Guozhang Wang 7 years ago committed by GitHub
parent
commit
fdc14daced
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
  3. 51
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
  4. 21
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
  5. 62
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
  6. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
  7. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
  8. 10
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
  9. 2
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java

4
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -756,8 +756,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final KTableKTableAbstractJoin<K, R, V1, V> joinOther; final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
if (!leftOuter) { // inner if (!leftOuter) { // inner
joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));
} else if (!rightOuter) { // left } else if (!rightOuter) { // left
joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner);
joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner));

5
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java

@ -16,8 +16,9 @@
*/ */
package org.apache.kafka.streams.kstream.internals; package org.apache.kafka.streams.kstream.internals;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> { public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> {
final KTableValueGetterSupplier<K, V1> valueGetterSupplier1; final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
@ -33,7 +34,7 @@ public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2>
public String[] storeNames() { public String[] storeNames() {
final String[] storeNames1 = valueGetterSupplier1.storeNames(); final String[] storeNames1 = valueGetterSupplier1.storeNames();
final String[] storeNames2 = valueGetterSupplier2.storeNames(); final String[] storeNames2 = valueGetterSupplier2.storeNames();
final ArrayList<String> stores = new ArrayList<>(storeNames1.length + storeNames2.length); final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
Collections.addAll(stores, storeNames1); Collections.addAll(stores, storeNames1);
Collections.addAll(stores, storeNames2); Collections.addAll(stores, storeNames2);
return stores.toArray(new String[stores.size()]); return stores.toArray(new String[stores.size()]);

51
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java → streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java

@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> { class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() { private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
@Override @Override
@ -31,7 +31,7 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
} }
}; };
KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) { KTableKTableInnerJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
super(table1, table2, joiner); super(table1, table2, joiner);
} }
@ -42,20 +42,17 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
@Override @Override
public KTableValueGetterSupplier<K, R> view() { public KTableValueGetterSupplier<K, R> view() {
return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); return new KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
} }
private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { private class KTableKTableInnerJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2); super(valueGetterSupplier1, valueGetterSupplier2);
} }
public KTableValueGetter<K, R> get() { public KTableValueGetter<K, R> get() {
return new KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(), return new KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
valueGetterSupplier2.get(),
joiner,
keyValueMapper);
} }
} }
@ -63,7 +60,7 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
private final KTableValueGetter<K, V2> valueGetter; private final KTableValueGetter<K, V2> valueGetter;
public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) { KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter; this.valueGetter = valueGetter;
} }
@ -100,4 +97,38 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
} }
} }
private class KTableKTableInnerJoinValueGetter implements KTableValueGetter<K, R> {
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
KTableKTableInnerJoinValueGetter(final KTableValueGetter<K, V1> valueGetter1,
final KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2;
}
@Override
public void init(final ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public R get(final K key) {
final V1 value1 = valueGetter1.get(key);
if (value1 != null) {
V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
if (value2 != null) {
return joiner.apply(value1, value2);
} else {
return null;
}
} else {
return null;
}
}
}
} }

21
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java

@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
private final KTableImpl<K, ?, V> parent1; private final KTableImpl<K, ?, V> parent1;
@ -56,21 +60,12 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> {
@Override @Override
public String[] storeNames() { public String[] storeNames() {
// we need to allow the downstream processor to be able to access both ends of the joining table's value getters
final String[] storeNames1 = parent1.valueGetterSupplier().storeNames(); final String[] storeNames1 = parent1.valueGetterSupplier().storeNames();
final String[] storeNames2 = parent2.valueGetterSupplier().storeNames(); final String[] storeNames2 = parent2.valueGetterSupplier().storeNames();
final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length);
final String[] stores = new String[storeNames1.length + storeNames2.length]; Collections.addAll(stores, storeNames1);
int i = 0; Collections.addAll(stores, storeNames2);
for (final String storeName : storeNames1) { return stores.toArray(new String[stores.size()]);
stores[i] = storeName;
i++;
}
for (final String storeName : storeNames2) {
stores[i] = storeName;
i++;
}
return stores;
} }
}; };
} }

62
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
class KTableKTableJoinValueGetter<K1, V1, K2, V2, R> implements KTableValueGetter<K1, R> {
private final KTableValueGetter<K1, V1> valueGetter1;
private final KTableValueGetter<K2, V2> valueGetter2;
private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
private final KeyValueMapper<K1, V1, K2> keyValueMapper;
public KTableKTableJoinValueGetter(final KTableValueGetter<K1, V1> valueGetter1,
final KTableValueGetter<K2, V2> valueGetter2,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
final KeyValueMapper<K1, V1, K2> keyValueMapper) {
this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2;
this.joiner = joiner;
this.keyValueMapper = keyValueMapper;
}
@Override
public void init(ProcessorContext context) {
valueGetter1.init(context);
valueGetter2.init(context);
}
@Override
public R get(K1 key) {
R newValue = null;
V1 value1 = valueGetter1.get(key);
if (value1 != null) {
V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
if (value2 != null) {
newValue = joiner.apply(value1, value2);
}
}
return newValue;
}
}

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java

@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override @Override
public KTableValueGetterSupplier<K, R> view() { public KTableValueGetterSupplier<K, R> view() {
return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
} }
private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2); super(valueGetterSupplier1, valueGetterSupplier2);
} }
@ -53,7 +53,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter; private final KTableValueGetter<K, V2> valueGetter;
public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) { KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter; this.valueGetter = valueGetter;
} }
@ -94,7 +94,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2; private final KTableValueGetter<K, V2> valueGetter2;
public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) { KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1; this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2; this.valueGetter2 = valueGetter2;
} }

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java

@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override @Override
public KTableValueGetterSupplier<K, R> view() { public KTableValueGetterSupplier<K, R> view() {
return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
} }
private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { private class KTableKTableOuterJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2); super(valueGetterSupplier1, valueGetterSupplier2);
} }
@ -52,7 +52,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter; private final KTableValueGetter<K, V2> valueGetter;
public KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) { KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter; this.valueGetter = valueGetter;
} }
@ -94,7 +94,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2; private final KTableValueGetter<K, V2> valueGetter2;
public KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) { KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1; this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2; this.valueGetter2 = valueGetter2;
} }

10
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java

@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override @Override
public KTableValueGetterSupplier<K, R> view() { public KTableValueGetterSupplier<K, R> view() {
return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
} }
private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { private class KTableKTableRightJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2); super(valueGetterSupplier1, valueGetterSupplier2);
} }
@ -53,7 +53,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter; private final KTableValueGetter<K, V2> valueGetter;
public KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) { KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter; this.valueGetter = valueGetter;
} }
@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1; private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2; private final KTableValueGetter<K, V2> valueGetter2;
public KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) { KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1; this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2; this.valueGetter2 = valueGetter2;
} }

2
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java → streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java

@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class KTableKTableJoinTest { public class KTableKTableInnerJoinTest {
final private String topic1 = "topic1"; final private String topic1 = "topic1";
final private String topic2 = "topic2"; final private String topic2 = "topic2";
Loading…
Cancel
Save