Browse Source

KAFKA-3902: Optimize KTable.filter in Streams DSL to avoid forwarding if both old and new values are null

The contribution is my original work, and I license the work to the project under the project's open source license.

Contributors: Guozhang Wang, Phil Derome
guozhangwang

Added checkEmpty to validate that the processor does nothing, and added an inhibit check to filter to fix the issue.

Author: Philippe Derome <phderome@gmail.com>
Author: Phil Derome <phderome@gmail.com>
Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1556 from phderome/DEROME-3902
pull/1556/merge
Philippe Derome 8 years ago committed by Guozhang Wang
parent
commit
2098529b44
  1. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
  2. 39
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
  3. 6
      streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java

3
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java

@@ -77,6 +77,9 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
V newValue = computeValue(key, change.newValue);
V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
if (sendOldValues && oldValue == null && newValue == null)
return; // unnecessary to forward here.
context().forward(key, new Change<>(newValue, oldValue));
}

39
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java

@@ -24,6 +24,8 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -254,7 +256,7 @@ public class KTableFilterTest {
driver.process(topic1, "C", 1);
proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
proc2.checkEmptyAndClearProcessResult();
driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2);
@@ -271,7 +273,40 @@ public class KTableFilterTest {
driver.process(topic1, "B", null);
proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
proc2.checkAndClearProcessResult("B:(null<-2)");
}
/**
 * Feeds three records that the filter rejects and checks that the processor
 * attached to the source table sees all of them, while the processor attached
 * to the table produced by filter -> groupBy -> reduce receives nothing.
 */
@Test
public void testSkipNullOnMaterialization() throws IOException {
    // enableSendingOldValues is deliberately not set explicitly here; a stateful
    // operator further downstream is left to trigger it instead.
    final KStreamBuilder builder = new KStreamBuilder();
    final String inputTopic = "topic1";

    final KTableImpl<String, String, String> sourceTable =
        (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, inputTopic);

    // Passes only values equal to "accept", ignoring case.
    final Predicate<String, String> acceptOnly = new Predicate<String, String>() {
        @Override
        public boolean test(String key, String value) {
            return value.equalsIgnoreCase("accept");
        }
    };

    final KTableImpl<String, String, String> reducedTable =
        (KTableImpl<String, String, String>) sourceTable
            .filter(acceptOnly)
            .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");

    final MockProcessorSupplier<String, String> sourceCapture = new MockProcessorSupplier<>();
    final MockProcessorSupplier<String, String> reducedCapture = new MockProcessorSupplier<>();
    builder.addProcessor("proc1", sourceCapture, sourceTable.name);
    builder.addProcessor("proc2", reducedCapture, reducedTable.name);

    driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);

    // Every record carries a value the predicate rejects.
    for (final String key : new String[] {"A", "B", "C"}) {
        driver.process(inputTopic, key, "reject");
    }

    sourceCapture.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
    reducedCapture.checkEmptyAndClearProcessResult();
}
}

6
streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java

@@ -82,6 +82,12 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
processed.clear();
}
/**
 * Asserts that {@code processed} currently holds no entries, then clears it.
 */
public void checkEmptyAndClearProcessResult() {
    final int recordedOutputs = processed.size();
    assertEquals("the number of outputs:", 0, recordedOutputs);
    processed.clear();
}
public void checkAndClearPunctuateResult(long... expected) {
assertEquals("the number of outputs:", expected.length, punctuated.size());

Loading…
Cancel
Save