diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index aa2397170f6..4ae2f76698b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -17,25 +17,32 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.StreamsTestUtils;
 
 import org.junit.Test;
 
+import java.util.Properties;
+
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowReduceTest {
+
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+
     @Test
     public void shouldLogAndMeterOnNullKey() {
-        final KStreamTestDriver driver = new KStreamTestDriver();
         final StreamsBuilder builder = new StreamsBuilder();
         builder
@@ -49,14 +56,14 @@ public class KStreamWindowReduceTest {
                 }
             });
 
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-
-        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
-        driver.process("TOPIC", null, "asdf");
-        driver.flushState();
-        LogCaptureAppender.unregister(appender);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+            driver.pipeInput(recordFactory.create("TOPIC", null, "asdf"));
+            LogCaptureAppender.unregister(appender);
 
-        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
-        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]"));
+            assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+            assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 14552d6b325..081c6a069aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -17,40 +17,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 
 public class KTableMapKeysTest {
-    final private Serde<String> stringSerde = new Serdes.StringSerde();
-    final private Serde<Integer> integerSerde = new Serdes.IntegerSerde();
-    private File stateDir = null;
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
-
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testMapKeysConvertingToStream() {
@@ -58,7 +48,7 @@ public class KTableMapKeysTest {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde));
+        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(Serdes.Integer(), Serdes.String()));
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");
@@ -82,11 +72,11 @@ public class KTableMapKeysTest {
 
         convertedStream.process(supplier);
 
-        driver.setUp(builder, stateDir);
-        for (int i = 0; i < originalKeys.length; i++) {
-            driver.process(topic1, originalKeys[i], values[i]);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            for (int i = 0; i < originalKeys.length; i++) {
+                driver.pipeInput(recordFactory.create(topic1, originalKeys[i], values[i]));
+            }
         }
-        driver.flushState();
 
         assertEquals(3, supplier.theCapturedProcessor().processed.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 08fa65c2ad9..825edb3eb37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -18,10 +18,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -31,20 +33,19 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -53,9 +54,9 @@ public class SessionWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
 
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
     private final Merger<String, String> sessionMerger = new Merger<String, String>() {
         @Override
         public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -83,7 +84,9 @@ public class SessionWindowedKStreamImplTest {
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L));
@@ -101,7 +104,9 @@ public class SessionWindowedKStreamImplTest {
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3"));
@@ -121,42 +126,45 @@ public class SessionWindowedKStreamImplTest {
                 results.put(key, value);
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("0+0+1+2"));
         assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store"));
-        processData();
-        final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
-            KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, Long> store = driver.getSessionStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         stream.reduce(MockReducer.STRING_ADDER,
                       Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced"));
-        processData();
-        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = driver.getSessionStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
 
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
-            KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
    public void shouldMaterializeAggregated() {
         stream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class SessionWindowedKStreamImplTest {
                          sessionMerger,
                          Materialized.<String, String, SessionStore<Bytes, byte[]>>as("aggregated").withValueSerde(Serdes.String()));
 
-        processData();
-        final SessionStore<String, String> sessionStore = (SessionStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
-            KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
-            KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final SessionStore<String, String> sessionStore = driver.getSessionStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(sessionStore.fetch("1", "2"));
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -243,16 +253,11 @@ public class SessionWindowedKStreamImplTest {
         stream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(600);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600));
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 610e52f2ed6..7b885b23bf2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -18,32 +18,33 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -52,9 +53,8 @@ public class TimeWindowedKStreamImplTest {
 
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
-
-    @Rule
-    public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+    private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
     @Before
@@ -76,7 +76,9 @@ public class TimeWindowedKStreamImplTest {
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L));
@@ -95,7 +97,9 @@ public class TimeWindowedKStreamImplTest {
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3"));
@@ -115,29 +119,32 @@ public class TimeWindowedKStreamImplTest {
                 results.put(key, value);
             }
         });
 
-        processData();
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+        }
         assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2"));
         assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1"));
         assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3"));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeCount() {
         windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store")
                                      .withKeySerde(Serdes.String())
                                      .withValueSerde(Serdes.Long()));
-        processData();
-        final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store");
-        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store");
+            final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeReduced() {
         windowedStream.reduce(MockReducer.STRING_ADDER,
@@ -145,17 +152,18 @@ public class TimeWindowedKStreamImplTest {
                               .withKeySerde(Serdes.String())
                              .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = driver.getWindowStore("reduced");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
 
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1"))));
+        }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldMaterializeAggregated() {
         windowedStream.aggregate(MockInitializer.STRING_INIT,
@@ -164,13 +172,15 @@ public class TimeWindowedKStreamImplTest {
                                  .withKeySerde(Serdes.String())
                                  .withValueSerde(Serdes.String()));
 
-        processData();
-        final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated");
-        final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
-        assertThat(data, equalTo(Arrays.asList(
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
-            KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
-            KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+            processData(driver);
+            final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated");
+            final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000));
+            assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"),
+                KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"),
+                KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1"))));
+        }
     }
 
     @Test(expected = NullPointerException.class)
@@ -227,16 +237,11 @@ public class TimeWindowedKStreamImplTest {
         windowedStream.count(null);
     }
 
-    private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), 0);
-        driver.setTime(10);
-        driver.process(TOPIC, "1", "1");
-        driver.setTime(15);
-        driver.process(TOPIC, "1", "2");
-        driver.setTime(500);
-        driver.process(TOPIC, "1", "3");
-        driver.process(TOPIC, "2", "1");
-        driver.flushState();
+    private void processData(final TopologyTestDriver driver) {
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L));
+        driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L));
+        driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L));
     }
 
 }
\ No newline at end of file
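The common pattern all four migrated tests converge on can be seen in isolation in the stand-alone sketch below (not part of the patch): build the Topology, open a TopologyTestDriver in try-with-resources, pipe records through a ConsumerRecordFactory, and read the output. The class name, topic names, and the hand-written Properties (used here in place of the internal StreamsTestUtils.topologyTestConfig helper the tests rely on) are illustrative assumptions, not names from this change.

// Hypothetical, self-contained illustration of the kafka-streams-test-utils flow
// these tests migrate to; all identifiers below are made up for the example.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class TopologyTestDriverExample {

    public static void main(final String[] args) {
        // A trivial topology: uppercase every value from "input" and forward it to "output".
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output", Produced.with(Serdes.String(), Serdes.String()));

        // Minimal config; the tests in this patch get an equivalent Properties object
        // from the internal StreamsTestUtils.topologyTestConfig helper instead.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final ConsumerRecordFactory<String, String> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        // The driver is AutoCloseable, so try-with-resources replaces the
        // setUp()/flushState() lifecycle of the old KStreamTestDriver rule.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
            driver.pipeInput(recordFactory.create("input", "key", "value", 10L));
            OutputVerifier.compareKeyValue(
                driver.readOutput("output", new StringDeserializer(), new StringDeserializer()),
                "key", "VALUE");
        }
    }
}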