|
|
|
@ -18,32 +18,33 @@
@@ -18,32 +18,33 @@
|
|
|
|
|
package org.apache.kafka.streams.kstream.internals; |
|
|
|
|
|
|
|
|
|
import org.apache.kafka.common.serialization.Serdes; |
|
|
|
|
import org.apache.kafka.common.serialization.StringSerializer; |
|
|
|
|
import org.apache.kafka.common.utils.Bytes; |
|
|
|
|
import org.apache.kafka.streams.kstream.Consumed; |
|
|
|
|
import org.apache.kafka.streams.KeyValue; |
|
|
|
|
import org.apache.kafka.streams.StreamsBuilder; |
|
|
|
|
import org.apache.kafka.streams.TopologyTestDriver; |
|
|
|
|
import org.apache.kafka.streams.kstream.Consumed; |
|
|
|
|
import org.apache.kafka.streams.kstream.ForeachAction; |
|
|
|
|
import org.apache.kafka.streams.kstream.KStream; |
|
|
|
|
import org.apache.kafka.streams.kstream.Materialized; |
|
|
|
|
import org.apache.kafka.streams.kstream.Serialized; |
|
|
|
|
import org.apache.kafka.streams.kstream.TimeWindowedKStream; |
|
|
|
|
import org.apache.kafka.streams.kstream.TimeWindows; |
|
|
|
|
import org.apache.kafka.streams.kstream.Windowed; |
|
|
|
|
import org.apache.kafka.streams.kstream.TimeWindowedKStream; |
|
|
|
|
import org.apache.kafka.streams.state.WindowStore; |
|
|
|
|
import org.apache.kafka.test.KStreamTestDriver; |
|
|
|
|
import org.apache.kafka.streams.test.ConsumerRecordFactory; |
|
|
|
|
import org.apache.kafka.test.MockAggregator; |
|
|
|
|
import org.apache.kafka.test.MockInitializer; |
|
|
|
|
import org.apache.kafka.test.MockReducer; |
|
|
|
|
import org.apache.kafka.test.StreamsTestUtils; |
|
|
|
|
import org.apache.kafka.test.TestUtils; |
|
|
|
|
import org.junit.Before; |
|
|
|
|
import org.junit.Rule; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
|
|
|
|
|
import java.util.Arrays; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Map; |
|
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
|
|
import static org.hamcrest.CoreMatchers.equalTo; |
|
|
|
|
import static org.hamcrest.MatcherAssert.assertThat; |
|
|
|
@ -52,9 +53,8 @@ public class TimeWindowedKStreamImplTest {
@@ -52,9 +53,8 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
|
|
|
|
|
private static final String TOPIC = "input"; |
|
|
|
|
private final StreamsBuilder builder = new StreamsBuilder(); |
|
|
|
|
|
|
|
|
|
@Rule |
|
|
|
|
public final KStreamTestDriver driver = new KStreamTestDriver(); |
|
|
|
|
private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); |
|
|
|
|
private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String()); |
|
|
|
|
private TimeWindowedKStream<String, String> windowedStream; |
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
@ -76,7 +76,9 @@ public class TimeWindowedKStreamImplTest {
@@ -76,7 +76,9 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
processData(); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
} |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo(2L)); |
|
|
|
|
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo(1L)); |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo(1L)); |
|
|
|
@ -95,7 +97,9 @@ public class TimeWindowedKStreamImplTest {
@@ -95,7 +97,9 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
processData(); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
} |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("1+2")); |
|
|
|
|
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("1")); |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("3")); |
|
|
|
@ -115,29 +119,32 @@ public class TimeWindowedKStreamImplTest {
@@ -115,29 +119,32 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
results.put(key, value); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
processData(); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
} |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(0, 500))), equalTo("0+1+2")); |
|
|
|
|
assertThat(results.get(new Windowed<>("2", new TimeWindow(500, 1000))), equalTo("0+1")); |
|
|
|
|
assertThat(results.get(new Windowed<>("1", new TimeWindow(500, 1000))), equalTo("0+3")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
@Test |
|
|
|
|
public void shouldMaterializeCount() { |
|
|
|
|
windowedStream.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("count-store") |
|
|
|
|
.withKeySerde(Serdes.String()) |
|
|
|
|
.withValueSerde(Serdes.Long())); |
|
|
|
|
|
|
|
|
|
processData(); |
|
|
|
|
final WindowStore<String, Long> windowStore = (WindowStore<String, Long>) driver.allStateStores().get("count-store"); |
|
|
|
|
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L)))); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
final WindowStore<String, Long> windowStore = driver.getWindowStore("count-store"); |
|
|
|
|
final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
|
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 1L)))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
@Test |
|
|
|
|
public void shouldMaterializeReduced() { |
|
|
|
|
windowedStream.reduce(MockReducer.STRING_ADDER, |
|
|
|
@ -145,17 +152,18 @@ public class TimeWindowedKStreamImplTest {
@@ -145,17 +152,18 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
.withKeySerde(Serdes.String()) |
|
|
|
|
.withValueSerde(Serdes.String())); |
|
|
|
|
|
|
|
|
|
processData(); |
|
|
|
|
final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("reduced"); |
|
|
|
|
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
final WindowStore<String, String> windowStore = driver.getWindowStore("reduced"); |
|
|
|
|
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
|
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1")))); |
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "1+2"), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "3"), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "1")))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
@Test |
|
|
|
|
public void shouldMaterializeAggregated() { |
|
|
|
|
windowedStream.aggregate(MockInitializer.STRING_INIT, |
|
|
|
@ -164,13 +172,15 @@ public class TimeWindowedKStreamImplTest {
@@ -164,13 +172,15 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
.withKeySerde(Serdes.String()) |
|
|
|
|
.withValueSerde(Serdes.String())); |
|
|
|
|
|
|
|
|
|
processData(); |
|
|
|
|
final WindowStore<String, String> windowStore = (WindowStore<String, String>) driver.allStateStores().get("aggregated"); |
|
|
|
|
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1")))); |
|
|
|
|
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { |
|
|
|
|
processData(driver); |
|
|
|
|
final WindowStore<String, String> windowStore = driver.getWindowStore("aggregated"); |
|
|
|
|
final List<KeyValue<Windowed<String>, String>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", 0, 1000)); |
|
|
|
|
assertThat(data, equalTo(Arrays.asList( |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), "0+1+2"), |
|
|
|
|
KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), "0+3"), |
|
|
|
|
KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), "0+1")))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test(expected = NullPointerException.class) |
|
|
|
@ -227,16 +237,11 @@ public class TimeWindowedKStreamImplTest {
@@ -227,16 +237,11 @@ public class TimeWindowedKStreamImplTest {
|
|
|
|
|
windowedStream.count(null); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void processData() { |
|
|
|
|
driver.setUp(builder, TestUtils.tempDirectory(), 0); |
|
|
|
|
driver.setTime(10); |
|
|
|
|
driver.process(TOPIC, "1", "1"); |
|
|
|
|
driver.setTime(15); |
|
|
|
|
driver.process(TOPIC, "1", "2"); |
|
|
|
|
driver.setTime(500); |
|
|
|
|
driver.process(TOPIC, "1", "3"); |
|
|
|
|
driver.process(TOPIC, "2", "1"); |
|
|
|
|
driver.flushState(); |
|
|
|
|
private void processData(final TopologyTestDriver driver) { |
|
|
|
|
driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10L)); |
|
|
|
|
driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15L)); |
|
|
|
|
driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 500L)); |
|
|
|
|
driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 500L)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |