diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 032efb5233e..4483e9fd11f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -532,6 +532,39 @@ public interface KStream {
ValueJoiner joiner,
JoinWindows windows);
+ /**
+ * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+ * If a record key or value is {@code null} it will not included in the resulting {@link KStream}
+ *
+ * @param table the instance of {@link KTable} joined with this stream
+ * @param joiner the instance of {@link ValueJoiner}
+ * @param the value type of the table
+ * @param the value type of the new stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key
+ */
+ KStream join(KTable table, ValueJoiner joiner);
+
+ /**
+ * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join.
+ * If a record key or value is {@code null} it will not included in the resulting {@link KStream}
+ *
+ * @param table the instance of {@link KTable} joined with this stream
+ * @param valueJoiner the instance of {@link ValueJoiner}
+ * @param keySerde key serdes for materializing this stream.
+ * If not specified the default serdes defined in the configs will be used
+ * @param valSerde value serdes for materializing this stream,
+ * if not specified the default serdes defined in the configs will be used
+ * @param the value type of the table
+ * @param the value type of the new stream
+ * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner},
+ * one for each matched record-pair with the same key and within the joining window intervals
+ */
+ KStream join(KTable table,
+ ValueJoiner valueJoiner,
+ Serde keySerde,
+ Serde valSerde);
+
/**
* Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join.
* If a record key is null it will not included in the resulting {@link KStream}
@@ -566,6 +599,7 @@ public interface KStream {
ValueJoiner valueJoiner,
Serde keySerde,
Serde valSerde);
+
/**
* Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and
* default serializers and deserializers. If a record key is null it will not included in
@@ -592,8 +626,8 @@ public interface KStream {
* @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream}
*/
KGroupedStream groupBy(KeyValueMapper selector,
- Serde keySerde,
- Serde valSerde);
+ Serde keySerde,
+ Serde valSerde);
/**
* Group the records with the same key into a {@link KGroupedStream} while preserving the
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bb77e963655..b67fca53589 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
-import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
+
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
@@ -63,6 +64,8 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream keySerializer = keySerde == null ? null : keySerde.serializer();
Serializer valSerializer = valSerde == null ? null : valSerde.serializer();
-
+
if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer;
partitioner = (StreamPartitioner) new WindowedStreamPartitioner(windowedSerializer);
@@ -386,78 +389,59 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde thisValueSerde,
- Serde otherValueSerde) {
+ final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows,
+ final Serde keySerde,
+ final Serde thisValueSerde,
+ final Serde otherValueSerde) {
- return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false);
+ return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false));
}
@Override
public KStream join(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows) {
+ final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows) {
- return join(other, joiner, windows, null, null, null, false);
+ return join(other, joiner, windows, null, null, null);
}
@Override
public KStream outerJoin(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde thisValueSerde,
- Serde otherValueSerde) {
+ final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows,
+ final Serde keySerde,
+ final Serde thisValueSerde,
+ final Serde otherValueSerde) {
- return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true);
+ return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true));
}
@Override
public KStream outerJoin(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows) {
+ final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows) {
- return join(other, joiner, windows, null, null, null, true);
+ return outerJoin(other, joiner, windows, null, null, null);
}
- @SuppressWarnings("unchecked")
- private KStream join(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde thisValueSerde,
- Serde otherValueSerde,
- boolean outer) {
-
- return doJoin(other,
- joiner,
- windows,
- keySerde,
- thisValueSerde,
- otherValueSerde,
- new DefaultJoin(outer));
- }
-
- private KStream doJoin(KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde thisValueSerde,
- Serde otherValueSerde,
- KStreamImplJoin join) {
+ private KStream doJoin(final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows,
+ final Serde keySerde,
+ final Serde thisValueSerde,
+ final Serde otherValueSerde,
+ final KStreamImplJoin join) {
Objects.requireNonNull(other, "other KStream can't be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(windows, "windows can't be null");
KStreamImpl joinThis = this;
- KStreamImpl joinOther = (KStreamImpl) other;
+ KStreamImpl joinOther = (KStreamImpl) other;
if (joinThis.repartitionRequired) {
joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null);
@@ -531,20 +515,20 @@ public class KStreamImpl extends AbstractStream implements KStream KStream leftJoin(
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde thisValSerde,
- Serde otherValueSerde) {
+ final KStream other,
+ final ValueJoiner joiner,
+ final JoinWindows windows,
+ final Serde keySerde,
+ final Serde thisValSerde,
+ final Serde otherValueSerde) {
return doJoin(other,
- joiner,
- windows,
- keySerde,
- thisValSerde,
- otherValueSerde,
- new LeftJoin());
+ joiner,
+ windows,
+ keySerde,
+ thisValSerde,
+ otherValueSerde,
+ new KStreamImplJoin(true, false));
}
@Override
@@ -558,41 +542,60 @@ public class KStreamImpl extends AbstractStream implements KStream KStream leftJoin(KTable other, ValueJoiner joiner) {
- return leftJoin(other, joiner, null, null);
+ public KStream join(final KTable other, final ValueJoiner joiner) {
+ return join(other, joiner, null, null);
}
- public KStream leftJoin(KTable other,
- ValueJoiner joiner,
- Serde keySerde,
- Serde valueSerde) {
- Objects.requireNonNull(other, "other KTable can't be null");
- Objects.requireNonNull(joiner, "joiner can't be null");
-
+ @Override
+ public KStream join(final KTable other,
+ final ValueJoiner joiner,
+ final Serde keySerde,
+ final Serde valueSerde) {
if (repartitionRequired) {
- KStreamImpl thisStreamRepartitioned = this.repartitionForJoin(keySerde,
- valueSerde, null);
- return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner);
+ final KStreamImpl thisStreamRepartitioned = repartitionForJoin(keySerde,
+ valueSerde, null);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false);
} else {
- return doStreamTableLeftJoin(other, joiner);
+ return doStreamTableJoin(other, joiner, false);
}
-
}
- private KStream doStreamTableLeftJoin(final KTable other,
- final ValueJoiner joiner) {
- Set allSourceNodes = ensureJoinableWith((AbstractStream) other);
+ private KStream doStreamTableJoin(final KTable other,
+ final ValueJoiner joiner,
+ final boolean leftJoin) {
+ Objects.requireNonNull(other, "other KTable can't be null");
+ Objects.requireNonNull(joiner, "joiner can't be null");
+
+ final Set allSourceNodes = ensureJoinableWith((AbstractStream) other);
- String name = topology.newName(LEFTJOIN_NAME);
+ final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
- topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl) other, joiner), this.name);
- topology.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl) other, joiner, leftJoin), this.name);
+ topology.connectProcessorAndStateStores(name, other.getStoreName());
topology.connectProcessors(this.name, ((KTableImpl) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
}
+ @Override
+ public KStream leftJoin(final KTable other, final ValueJoiner joiner) {
+ return leftJoin(other, joiner, null, null);
+ }
+
+ public KStream leftJoin(final KTable other,
+ final ValueJoiner joiner,
+ final Serde keySerde,
+ final Serde valueSerde) {
+ if (repartitionRequired) {
+ final KStreamImpl thisStreamRepartitioned = this.repartitionForJoin(keySerde,
+ valueSerde, null);
+ return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true);
+ } else {
+ return doStreamTableJoin(other, joiner, true);
+ }
+ }
+
@Override
public KGroupedStream groupBy(KeyValueMapper selector) {
return groupBy(selector, null, null);
@@ -600,8 +603,8 @@ public class KStreamImpl extends AbstractStream implements KStream KGroupedStream groupBy(KeyValueMapper selector,
- Serde keySerde,
- Serde valSerde) {
+ Serde keySerde,
+ Serde valSerde) {
Objects.requireNonNull(selector, "selector can't be null");
String selectName = internalSelectKey(selector);
@@ -641,26 +644,16 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(KStream lhs,
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde lhsValueSerde,
- Serde otherValueSerde);
- }
-
- private class DefaultJoin implements KStreamImplJoin {
-
- private final boolean outer;
+ private final boolean leftOuter;
+ private final boolean rightOuter;
- DefaultJoin(final boolean outer) {
- this.outer = outer;
+ KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) {
+ this.leftOuter = leftOuter;
+ this.rightOuter = rightOuter;
}
- @Override
public KStream join(KStream lhs,
KStream other,
ValueJoiner joiner,
@@ -670,12 +663,12 @@ public class KStreamImpl extends AbstractStream implements KStream otherValueSerde) {
String thisWindowStreamName = topology.newName(WINDOWED_NAME);
String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
- String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
+ String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME);
+ String joinOtherName = leftOuter ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(MERGE_NAME);
StateStoreSupplier thisWindow =
- createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
+ createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
StateStoreSupplier otherWindow =
createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
@@ -688,16 +681,16 @@ public class KStreamImpl extends AbstractStream implements KStream joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
- windows.before,
- windows.after,
- joiner,
- outer);
- KStreamKStreamJoin joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
- windows.after,
- windows.before,
- reverseJoiner(joiner),
- outer);
+ final KStreamKStreamJoin joinThis = new KStreamKStreamJoin<>(otherWindow.name(),
+ windows.before,
+ windows.after,
+ joiner,
+ leftOuter);
+ final KStreamKStreamJoin joinOther = new KStreamKStreamJoin<>(thisWindow.name(),
+ windows.after,
+ windows.before,
+ reverseJoiner(joiner),
+ rightOuter);
KStreamPassThrough joinMerge = new KStreamPassThrough<>();
@@ -716,39 +709,4 @@ public class KStreamImpl extends AbstractStream implements KStream KStream join(KStream lhs,
- KStream other,
- ValueJoiner joiner,
- JoinWindows windows,
- Serde keySerde,
- Serde lhsValueSerde,
- Serde otherValueSerde) {
- String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = topology.newName(LEFTJOIN_NAME);
-
- StateStoreSupplier otherWindow =
- createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
-
- KStreamJoinWindow
- otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
- KStreamKStreamJoin
- joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
-
-
-
- topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
- topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
- topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName);
-
- Set allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
- allSourceNodes.addAll(((KStreamImpl) other).sourceNodes);
- return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false);
- }
- }
-
-
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index edde0090832..41547b9d3a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,7 +52,6 @@ class KStreamKStreamJoin implements ProcessorSupplier {
private WindowStore otherWindow;
- @SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
super.init(context);
@@ -62,14 +61,21 @@ class KStreamKStreamJoin implements ProcessorSupplier {
@Override
- public void process(K key, V1 value) {
- if (key == null)
+ public void process(final K key, final V1 value) {
+ // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+ //
+ // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+ // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
+ // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+ // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+ if (key == null || value == null) {
return;
+ }
- boolean needOuterJoin = KStreamKStreamJoin.this.outer;
+ boolean needOuterJoin = outer;
- long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
- long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
+ final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
+ final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
try (WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) {
while (iter.hasNext()) {
@@ -77,8 +83,9 @@ class KStreamKStreamJoin implements ProcessorSupplier {
context().forward(key, joiner.apply(value, iter.next().value));
}
- if (needOuterJoin)
+ if (needOuterJoin) {
context().forward(key, joiner.apply(value, null));
+ }
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
new file mode 100644
index 00000000000..1027b96d4f6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamKTableJoin implements ProcessorSupplier {
+
+ private final KTableValueGetterSupplier valueGetterSupplier;
+ private final ValueJoiner joiner;
+ private final boolean leftJoin;
+
+ KStreamKTableJoin(final KTableImpl table, final ValueJoiner joiner, final boolean leftJoin) {
+ valueGetterSupplier = table.valueGetterSupplier();
+ this.joiner = joiner;
+ this.leftJoin = leftJoin;
+ }
+
+ @Override
+ public Processor get() {
+ return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin);
+ }
+
+ private class KStreamKTableJoinProcessor extends AbstractProcessor {
+
+ private final KTableValueGetter valueGetter;
+ private final boolean leftJoin;
+
+ KStreamKTableJoinProcessor(final KTableValueGetter valueGetter, final boolean leftJoin) {
+ this.valueGetter = valueGetter;
+ this.leftJoin = leftJoin;
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ valueGetter.init(context);
+ }
+
+ @Override
+ public void process(final K key, final V1 value) {
+ // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
+ //
+ // we also ignore the record if value is null, because in a key-value data model a null-value indicates
+ // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
+ // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
+ // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
+ if (key != null && value != null) {
+ final V2 value2 = valueGetter.get(key);
+ if (leftJoin || value2 != null) {
+ context().forward(key, joiner.apply(value, value2));
+ }
+ }
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
deleted file mode 100644
index 92b9b59f945..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamKTableLeftJoin implements ProcessorSupplier {
-
- private final KTableValueGetterSupplier valueGetterSupplier;
- private final ValueJoiner joiner;
-
- KStreamKTableLeftJoin(KTableImpl table, ValueJoiner joiner) {
- this.valueGetterSupplier = table.valueGetterSupplier();
- this.joiner = joiner;
- }
-
- @Override
- public Processor get() {
- return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get());
- }
-
- private class KStreamKTableLeftJoinProcessor extends AbstractProcessor {
-
- private final KTableValueGetter valueGetter;
-
- public KStreamKTableLeftJoinProcessor(KTableValueGetter valueGetter) {
- this.valueGetter = valueGetter;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(ProcessorContext context) {
- super.init(context);
- valueGetter.init(context);
- }
-
- @Override
- public void process(K key, V1 value) {
- // if the key is null, we do not need proceed joining
- // the record with the table
- if (key != null) {
- context().forward(key, joiner.apply(value, valueGetter.get(key)));
- }
- }
- }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 718e52bda11..55b09166522 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6423cffabf2..683dc00a064 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -44,6 +44,7 @@ import java.util.Set;
/**
* The implementation class of {@link KTable}.
+ *
* @param the key type
* @param the source's (parent's) value type
* @param the value type
@@ -283,77 +284,55 @@ public class KTableImpl