diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index 0c850e711b2..b19fd55bfad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.TopologyException; @@ -43,6 +42,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; @@ -575,19 +575,19 @@ public class KGroupedStreamImplTest { driver.pipeInput(recordFactory.create(TOPIC, "2", "B", 500L)); driver.pipeInput(recordFactory.create(TOPIC, "3", "B", 100L)); } - assertThat(supplier.theCapturedProcessor().processedWithTimestamps, equalTo(Arrays.asList( - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L), - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L), - new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L), - new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L), - new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L) + assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList( + MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L), + MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L), + MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L), + MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L), + MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L), + MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L), + MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L), + MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L), + MockProcessor.makeRecord(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L), + MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L), + MockProcessor.makeRecord(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L), + MockProcessor.makeRecord(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L) ))); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java index 0c7a01575f7..f1e05975401 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.test; -import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; @@ -30,13 +29,9 @@ import java.util.Map; import static org.junit.Assert.assertEquals; -@SuppressWarnings("WeakerAccess") public class MockProcessor extends AbstractProcessor { public final ArrayList processed = new ArrayList<>(); - public final ArrayList processedKeys = new ArrayList<>(); - public final ArrayList processedValues = new ArrayList<>(); - public final ArrayList processedWithTimestamps = new ArrayList<>(); public final Map> lastValueAndTimestampPerKey = new HashMap<>(); public final ArrayList punctuatedStreamTime = new ArrayList<>(); @@ -81,19 +76,13 @@ public class MockProcessor extends AbstractProcessor { @Override public void process(final K key, final V value) { - processedKeys.add(key); - processedValues.add(value); - processedWithTimestamps.add(new KeyValueTimestamp<>(key, value, context().timestamp())); if (value != null) { lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp())); } else { lastValueAndTimestampPerKey.remove(key); } - processed.add( - (key == null ? "null" : key) + - ":" + (value == null ? "null" : value) + - " (ts: " + context().timestamp() + ")" - ); + + processed.add(makeRecord(key, value, context().timestamp())); if (commitRequested) { context().commit(); @@ -101,24 +90,19 @@ public class MockProcessor extends AbstractProcessor { } } - public void checkAndClearProcessResult(final String... expected) { - assertEquals("the number of outputs:" + processed, expected.length, processed.size()); - for (int i = 0; i < expected.length; i++) { - assertEquals("output[" + i + "]:", expected[i], processed.get(i)); - } - - processed.clear(); - processedWithTimestamps.clear(); + public static String makeRecord(final Object key, final Object value, final long timestamp) { + return (key == null ? "null" : key) + + ":" + (value == null ? "null" : value) + + " (ts: " + timestamp + ")"; } - public void checkAndClearProcessResult(final KeyValueTimestamp... expected) { + public void checkAndClearProcessResult(final String... expected) { assertEquals("the number of outputs:" + processed, expected.length, processed.size()); for (int i = 0; i < expected.length; i++) { - assertEquals("output[" + i + "]:", expected[i], processedWithTimestamps.get(i)); + assertEquals("output[" + i + "]:", expected[i], processed.get(i)); } processed.clear(); - processedWithTimestamps.clear(); } public void requestCommit() {