From a289865266618d9736fe49d11edfbc2b146f5148 Mon Sep 17 00:00:00 2001 From: Joan Goyeau Date: Tue, 21 Aug 2018 22:41:59 +0100 Subject: [PATCH] MINOR: Small refactorings on KTable joins (#5540) Reviewers: Guozhang Wang --- .../apache/kafka/streams/kstream/internals/KTableImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index bbec96c7100..352e42d3918 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -359,8 +359,6 @@ public class KTableImpl extends AbstractStream implements KTable KTable join(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); @@ -378,6 +376,7 @@ public class KTableImpl extends AbstractStream implements KTable KTable outerJoin(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { + Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); @@ -394,8 +393,10 @@ public class KTableImpl extends AbstractStream implements KTable KTable leftJoin(final KTable other, final ValueJoiner joiner, final Materialized> materialized) { + Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME); + return doJoin(other, joiner, materializedInternal, true, false); } @@ -410,7 +411,6 @@ public class KTableImpl extends AbstractStream implements KTable) other, joiner,