Browse Source

KAFKA-8389: Remove redundant bookkeeping from MockProcessor (#6761)

Remove processedKeys / processedValues / processedWithTimestamps as they are covered with processed already.

Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>
pull/6833/head
Guozhang Wang 6 years ago committed by GitHub
parent
commit
a4c0b1841a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
  2. 32
      streams/src/test/java/org/apache/kafka/test/MockProcessor.java

28
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java

@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.TopologyException;
@ -43,6 +42,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.StreamsTestUtils;
@ -575,19 +575,19 @@ public class KGroupedStreamImplTest {
driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L)); driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L));
driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L)); driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L));
} }
assertThat(supplier.theCapturedProcessor().processedWithTimestamps, equalTo(Arrays.asList( assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L), MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L), MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L), MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L), MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L), MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L), MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L), MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L), MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L), MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L), MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L), MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L) MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L)
))); )));
} }

32
streams/src/test/java/org/apache/kafka/test/MockProcessor.java

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.test; package org.apache.kafka.test;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
@ -30,13 +29,9 @@ import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@SuppressWarnings("WeakerAccess")
public class MockProcessor<K, V> extends AbstractProcessor<K, V> { public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<String> processed = new ArrayList<>(); public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<K> processedKeys = new ArrayList<>();
public final ArrayList<V> processedValues = new ArrayList<>();
public final ArrayList<KeyValueTimestamp> processedWithTimestamps = new ArrayList<>();
public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>(); public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>(); public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
@ -81,19 +76,13 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
@Override @Override
public void process(final K key, final V value) { public void process(final K key, final V value) {
processedKeys.add(key);
processedValues.add(value);
processedWithTimestamps.add(new KeyValueTimestamp<>(key, value, context().timestamp()));
if (value != null) { if (value != null) {
lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp())); lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
} else { } else {
lastValueAndTimestampPerKey.remove(key); lastValueAndTimestampPerKey.remove(key);
} }
processed.add(
(key == null ? "null" : key) + processed.add(makeRecord(key, value, context().timestamp()));
":" + (value == null ? "null" : value) +
" (ts: " + context().timestamp() + ")"
);
if (commitRequested) { if (commitRequested) {
context().commit(); context().commit();
@ -101,24 +90,19 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
} }
} }
public void checkAndClearProcessResult(final String... expected) { public static String makeRecord(final Object key, final Object value, final long timestamp) {
assertEquals("the number of outputs:" + processed, expected.length, processed.size()); return (key == null ? "null" : key) +
for (int i = 0; i < expected.length; i++) { ":" + (value == null ? "null" : value) +
assertEquals("output[" + i + "]:", expected[i], processed.get(i)); " (ts: " + timestamp + ")";
}
processed.clear();
processedWithTimestamps.clear();
} }
public void checkAndClearProcessResult(final KeyValueTimestamp... expected) { public void checkAndClearProcessResult(final String... expected) {
assertEquals("the number of outputs:" + processed, expected.length, processed.size()); assertEquals("the number of outputs:" + processed, expected.length, processed.size());
for (int i = 0; i < expected.length; i++) { for (int i = 0; i < expected.length; i++) {
assertEquals("output[" + i + "]:", expected[i], processedWithTimestamps.get(i)); assertEquals("output[" + i + "]:", expected[i], processed.get(i));
} }
processed.clear(); processed.clear();
processedWithTimestamps.clear();
} }
public void requestCommit() { public void requestCommit() {

Loading…
Cancel
Save