Browse Source

MINOR: Small refactorings on KTable joins (#5540)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/5555/head
Joan Goyeau 6 years ago committed by Guozhang Wang
parent
commit
a289865266
  1. 6
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

6
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@ -359,8 +359,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(other, "other can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(materialized, "materialized can't be null"); Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
@ -378,6 +376,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
@ -394,8 +393,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized); final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
return doJoin(other, joiner, materializedInternal, true, false); return doJoin(other, joiner, materializedInternal, true, false);
} }
@ -410,7 +411,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName(); final String internalQueryableName = materializedInternal == null ? null : materializedInternal.storeName();
final String joinMergeName = builder.newProcessorName(MERGE_NAME); final String joinMergeName = builder.newProcessorName(MERGE_NAME);
return buildJoin( return buildJoin(
(AbstractStream<K>) other, (AbstractStream<K>) other,
joiner, joiner,

Loading…
Cancel
Save