From 77e6e8ec054608a30626271b4952b63294a93c3b Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 30 May 2019 09:46:12 -0700 Subject: [PATCH] KAFKA-6455: Update integration tests to verify result timestamps (#6751) Reviewers: Bill Bejeck , John Roesler --- .../kstream/internals/KTableReduce.java | 3 +- .../AbstractJoinIntegrationTest.java | 68 ++- .../AbstractResetIntegrationTest.java | 36 +- .../FineGrainedAutoResetIntegrationTest.java | 7 +- .../GlobalKTableIntegrationTest.java | 113 +++-- ...StreamAggregationDedupIntegrationTest.java | 63 ++- .../KStreamAggregationIntegrationTest.java | 227 +++++---- .../StreamStreamJoinIntegrationTest.java | 349 +++++++++---- .../StreamTableJoinIntegrationTest.java | 72 +-- .../TableTableJoinIntegrationTest.java | 479 +++++++++++------- .../utils/IntegrationTestUtils.java | 91 +++- .../kafka/test/MockProcessorSupplier.java | 6 +- 12 files changed, 922 insertions(+), 592 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index 1b1a2bf3420..171912fd23a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -80,7 +80,7 @@ public class KTableReduce implements KTableProcessorSupplier { final ValueAndTimestamp oldAggAndTimestamp = store.get(key); final V oldAgg = getValueOrNull(oldAggAndTimestamp); final V intermediateAgg; - long newTimestamp = context().timestamp(); + long newTimestamp; // first try to remove the old value if (value.oldValue != null && oldAgg != null) { @@ -88,6 +88,7 @@ public class KTableReduce implements KTableProcessorSupplier { newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp()); } else { intermediateAgg = oldAgg; + newTimestamp = context().timestamp(); } // then try to add the new value diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 7fa108d59a6..daea1d732ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -35,6 +36,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -86,7 
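The KTableReduce change above is the production-side rule that the updated integration tests assert: when the old aggregate value is subtracted from the result, the new aggregate keeps the newer of the incoming record's timestamp and the stored aggregate's timestamp; otherwise the incoming record's timestamp is used as-is. A minimal sketch of that rule as it appears in the branch shown (hypothetical helper, not the actual Streams code):

```java
// Sketch only: recordTimestamp is the timestamp of the update being processed,
// oldAggTimestamp the timestamp stored with the previous aggregate (null if none),
// and oldValueSubtracted whether the previous value contributed to the new result.
static long resultTimestamp(final long recordTimestamp,
                            final Long oldAggTimestamp,
                            final boolean oldValueSubtracted) {
    if (oldValueSubtracted && oldAggTimestamp != null) {
        // the result still depends on the old aggregate, so keep the newer timestamp
        return Math.max(recordTimestamp, oldAggTimestamp);
    }
    // otherwise the current record alone determines the result timestamp
    return recordTimestamp;
}
```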
+89,7 @@ public abstract class AbstractJoinIntegrationTest { static final String INPUT_TOPIC_RIGHT = "inputTopicRight"; static final String INPUT_TOPIC_LEFT = "inputTopicLeft"; static final String OUTPUT_TOPIC = "outputTopic"; - private final long anyUniqueKey = 0L; + static final long ANY_UNIQUE_KEY = 0L; private final static Properties PRODUCER_CONFIG = new Properties(); private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); @@ -163,13 +166,13 @@ public abstract class AbstractJoinIntegrationTest { CLUSTER.deleteAllTopicsAndWait(120000); } - private void checkResult(final String outputTopic, final List expectedResult) throws InterruptedException { - final List result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L); - assertThat(result, is(expectedResult)); + private void checkResult(final String outputTopic, final List> expectedResult) throws InterruptedException { + IntegrationTestUtils.verifyKeyValueTimestamps(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult); } - private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException { - final List result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L); + private void checkResult(final String outputTopic, final KeyValueTimestamp expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException { + final List> result = + IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L); assertThat(result.get(result.size() - 1), is(expectedFinalResult)); } @@ -177,7 +180,7 @@ public abstract class AbstractJoinIntegrationTest { * Runs the actual test. Checks the result after each input record to ensure fixed processing order. * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry */ - void runTest(final List> expectedResult) throws Exception { + void runTest(final List>> expectedResult) throws Exception { runTest(expectedResult, null); } @@ -186,28 +189,34 @@ public abstract class AbstractJoinIntegrationTest { * Runs the actual test. Checks the result after each input record to ensure fixed processing order. 
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry */ - void runTest(final List> expectedResult, final String storeName) throws Exception { + void runTest(final List>> expectedResult, final String storeName) throws Exception { assert expectedResult.size() == input.size(); IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); - String expectedFinalResult = null; + KeyValueTimestamp expectedFinalResult = null; try { streams.start(); - long ts = System.currentTimeMillis(); + final long firstTimestamp = System.currentTimeMillis(); + long ts = firstTimestamp; - final Iterator> resultIterator = expectedResult.iterator(); + final Iterator>> resultIterator = expectedResult.iterator(); for (final Input singleInput : input) { producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get(); - final List expected = resultIterator.next(); + final List> expected = resultIterator.next(); if (expected != null) { - checkResult(OUTPUT_TOPIC, expected); - expectedFinalResult = expected.get(expected.size() - 1); + final List> updatedExpected = new LinkedList<>(); + for (final KeyValueTimestamp record : expected) { + updatedExpected.add(new KeyValueTimestamp<>(record.key(), record.value(), firstTimestamp + record.timestamp())); + } + + checkResult(OUTPUT_TOPIC, updatedExpected); + expectedFinalResult = updatedExpected.get(expected.size() - 1); } } @@ -222,21 +231,22 @@ public abstract class AbstractJoinIntegrationTest { /* * Runs the actual test. Checks the final result only after expected number of records have been consumed. */ - void runTest(final String expectedFinalResult) throws Exception { + void runTest(final KeyValueTimestamp expectedFinalResult) throws Exception { runTest(expectedFinalResult, null); } /* * Runs the actual test. Checks the final result only after expected number of records have been consumed. 
*/ - void runTest(final String expectedFinalResult, final String storeName) throws Exception { + void runTest(final KeyValueTimestamp expectedFinalResult, final String storeName) throws Exception { IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); try { streams.start(); - long ts = System.currentTimeMillis(); + final long firstTimestamp = System.currentTimeMillis(); + long ts = firstTimestamp; for (final Input singleInput : input) { producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get(); @@ -244,10 +254,15 @@ public abstract class AbstractJoinIntegrationTest { TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result."); - checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected); + final KeyValueTimestamp updatedExpectedFinalResult = + new KeyValueTimestamp<>( + expectedFinalResult.key(), + expectedFinalResult.value(), + firstTimestamp + expectedFinalResult.timestamp()); + checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected); if (storeName != null) { - checkQueryableStore(storeName, expectedFinalResult); + checkQueryableStore(storeName, updatedExpectedFinalResult); } } finally { streams.close(); @@ -257,15 +272,16 @@ public abstract class AbstractJoinIntegrationTest { /* * Checks the embedded queryable state store snapshot */ - private void checkQueryableStore(final String queryableName, final String expectedFinalResult) { - final ReadOnlyKeyValueStore store = streams.store(queryableName, QueryableStoreTypes.keyValueStore()); + private void checkQueryableStore(final String queryableName, final KeyValueTimestamp expectedFinalResult) { + final ReadOnlyKeyValueStore> store = streams.store(queryableName, QueryableStoreTypes.timestampedKeyValueStore()); - final KeyValueIterator all = store.all(); - final KeyValue onlyEntry = all.next(); + final KeyValueIterator> all = store.all(); + final KeyValue> onlyEntry = all.next(); try { - assertThat(onlyEntry.key, is(anyUniqueKey)); - assertThat(onlyEntry.value, is(expectedFinalResult)); + assertThat(onlyEntry.key, is(expectedFinalResult.key())); + assertThat(onlyEntry.value.value(), is(expectedFinalResult.value())); + assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp())); assertThat(all.hasNext(), is(false)); } finally { all.close(); @@ -278,7 +294,7 @@ public abstract class AbstractJoinIntegrationTest { Input(final String topic, final V value) { this.topic = topic; - record = KeyValue.pair(anyUniqueKey, value); + record = KeyValue.pair(ANY_UNIQUE_KEY, value); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index c9ae1bbcb85..000f2993c73 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.integration; -import java.time.Duration; +import kafka.tools.StreamsResetter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupDescription; @@ -39,10 +39,8 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; 
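The reworked checkQueryableStore above opens the queryable join store as a timestamped key-value store so that the stored timestamp can be asserted next to the value. A hedged sketch of that lookup pattern; the method name and the streams/storeName/expected parameters are placeholders:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Sketch: read one entry from a timestamped store and check both value and timestamp.
static void assertTimestampedEntry(final KafkaStreams streams,
                                   final String storeName,
                                   final Long key,
                                   final String expectedValue,
                                   final long expectedTimestamp) {
    final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store =
        streams.store(storeName, QueryableStoreTypes.timestampedKeyValueStore());
    final ValueAndTimestamp<String> entry = store.get(key);
    if (entry == null || !expectedValue.equals(entry.value()) || entry.timestamp() != expectedTimestamp) {
        throw new AssertionError("unexpected entry for key " + key + ": " + entry);
    }
}
```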
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -56,6 +54,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.text.SimpleDateFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; @@ -65,8 +64,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import kafka.tools.StreamsResetter; - import static java.time.Duration.ofMillis; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -223,7 +220,7 @@ public abstract class AbstractResetIntegrationTest { } } - void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception { + void shouldNotAllowToResetWhileStreamsIsRunning() { appID = testId + "-not-reset-during-runtime"; final String[] parameters = new String[] { "--application-id", appID, @@ -390,7 +387,6 @@ public abstract class AbstractResetIntegrationTest { final File resetFile = File.createTempFile("reset", ".csv"); try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { writer.write(INPUT_TOPIC + ",0,1"); - writer.close(); } streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); @@ -434,7 +430,6 @@ public abstract class AbstractResetIntegrationTest { final File resetFile = File.createTempFile("reset", ".csv"); try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { writer.write(INPUT_TOPIC + ",0,1"); - writer.close(); } streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); @@ -482,7 +477,6 @@ public abstract class AbstractResetIntegrationTest { final File resetFile = File.createTempFile("reset", ".csv"); try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) { writer.write(INPUT_TOPIC + ",0,1"); - writer.close(); } streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); @@ -514,12 +508,7 @@ public abstract class AbstractResetIntegrationTest { final KStream input = builder.stream(INPUT_TOPIC); // use map to trigger internal re-partitioning before groupByKey - input.map(new KeyValueMapper>() { - @Override - public KeyValue apply(final Long key, final String value) { - return new KeyValue<>(key, value); - } - }) + input.map(KeyValue::new) .groupByKey() .count() .toStream() @@ -530,12 +519,7 @@ public abstract class AbstractResetIntegrationTest { .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10))) .count() .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - @Override - public KeyValue apply(final Windowed key, final Long value) { - return new KeyValue<>(key.window().start() + key.window().end(), value); - } - }) + .map((key, value) -> new KeyValue<>(key.window().start() + key.window().end(), value)) .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long())); return builder.build(); @@ -547,12 +531,8 @@ public abstract class AbstractResetIntegrationTest { final KStream input = builder.stream(INPUT_TOPIC); // use map to trigger internal re-partitioning before groupByKey - 
input.map(new KeyValueMapper>() { - @Override - public KeyValue apply(final Long key, final String value) { - return new KeyValue<>(key, key); - } - }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); + input.map((key, value) -> new KeyValue<>(key, key)) + .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); return builder.build(); } @@ -590,7 +570,7 @@ public abstract class AbstractResetIntegrationTest { parameterList.add(resetScenarioArg); } - final String[] parameters = parameterList.toArray(new String[parameterList.size()]); + final String[] parameters = parameterList.toArray(new String[0]); final Properties cleanUpConfig = new Properties(); cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index 87d6a169309..45ea71cf9f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration; +import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -55,8 +56,6 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; -import kafka.utils.MockTime; - import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -191,8 +190,8 @@ public class FineGrainedAutoResetIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); - final KStream pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.EARLIEST)); - final KStream pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.LATEST)); + final KStream pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.EARLIEST)); + final KStream pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.LATEST)); final KStream namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ)); pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index d3e0d245693..0a9148d61fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -39,7 +38,9 @@ import 
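The reset-test topologies above also replace their anonymous KeyValueMapper classes with lambdas and method references; a small before/after sketch on a hypothetical <Long, String> stream ("input-topic" is a placeholder):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, String> input = builder.stream("input-topic");

// before: anonymous inner class spelling out the mapper type
input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
    @Override
    public KeyValue<Long, Long> apply(final Long key, final String value) {
        return new KeyValue<>(key, key);
    }
});

// after: equivalent lambda, as used in the patch
input.map((key, value) -> new KeyValue<>(key, key));
```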
org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -68,7 +69,6 @@ public class GlobalKTableIntegrationTest { private final KeyValueMapper keyMapper = (key, value) -> value; private final ValueJoiner joiner = (value1, value2) -> value1 + "+" + value2; private final String globalStore = "globalStore"; - private final Map results = new HashMap<>(); private StreamsBuilder builder; private Properties streamsConfiguration; private KafkaStreams kafkaStreams; @@ -76,7 +76,7 @@ public class GlobalKTableIntegrationTest { private String streamTopic; private GlobalKTable globalTable; private KStream stream; - private ForeachAction foreachAction; + private MockProcessorSupplier supplier; @Before public void before() throws Exception { @@ -96,7 +96,7 @@ public class GlobalKTableIntegrationTest { .withValueSerde(Serdes.String())); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); stream = builder.stream(streamTopic, stringLongConsumed); - foreachAction = results::put; + supplier = new MockProcessorSupplier<>(); } @After @@ -110,24 +110,34 @@ public class GlobalKTableIntegrationTest { @Test public void shouldKStreamGlobalKTableLeftJoin() throws Exception { final KStream streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner); - streamTableJoin.foreach(foreachAction); + streamTableJoin.process(supplier); produceInitialGlobalTableValues(); startStreams(); + long firstTimestamp = mockTime.milliseconds(); produceTopicValues(streamTopic); - final Map expected = new HashMap<>(); - expected.put("a", "1+A"); - expected.put("b", "2+B"); - expected.put("c", "3+C"); - expected.put("d", "4+D"); - expected.put("e", "5+null"); + final Map> expected = new HashMap<>(); + expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp)); + expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L)); + expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L)); + expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L)); + expected.put("e", ValueAndTimestamp.make("5+null", firstTimestamp + 4L)); TestUtils.waitForCondition( - () -> results.equals(expected), + () -> { + if (supplier.capturedProcessorsCount() < 2) { + return false; + } + final Map> result = new HashMap<>(); + result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey); + result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey); + return result.equals(expected); + }, 30000L, "waiting for initial values"); + firstTimestamp = mockTime.milliseconds(); produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = @@ -138,16 +148,29 @@ public class GlobalKTableIntegrationTest { 30000, "waiting for data in replicated store"); + final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = + kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore()); + assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L))); + + firstTimestamp = mockTime.milliseconds(); produceTopicValues(streamTopic); - expected.put("a", "1+F"); - expected.put("b", "2+G"); - expected.put("c", 
"3+H"); - expected.put("d", "4+I"); - expected.put("e", "5+J"); + expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp)); + expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L)); + expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L)); + expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L)); + expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L)); TestUtils.waitForCondition( - () -> results.equals(expected), + () -> { + if (supplier.capturedProcessorsCount() < 2) { + return false; + } + final Map> result = new HashMap<>(); + result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey); + result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey); + return result.equals(expected); + }, 30000L, "waiting for final values"); } @@ -155,23 +178,33 @@ public class GlobalKTableIntegrationTest { @Test public void shouldKStreamGlobalKTableJoin() throws Exception { final KStream streamTableJoin = stream.join(globalTable, keyMapper, joiner); - streamTableJoin.foreach(foreachAction); + streamTableJoin.process(supplier); produceInitialGlobalTableValues(); startStreams(); + long firstTimestamp = mockTime.milliseconds(); produceTopicValues(streamTopic); - final Map expected = new HashMap<>(); - expected.put("a", "1+A"); - expected.put("b", "2+B"); - expected.put("c", "3+C"); - expected.put("d", "4+D"); + final Map> expected = new HashMap<>(); + expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp)); + expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L)); + expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L)); + expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L)); TestUtils.waitForCondition( - () -> results.equals(expected), + () -> { + if (supplier.capturedProcessorsCount() < 2) { + return false; + } + final Map> result = new HashMap<>(); + result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey); + result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey); + return result.equals(expected); + }, 30000L, "waiting for initial values"); + firstTimestamp = mockTime.milliseconds(); produceGlobalTableValues(); final ReadOnlyKeyValueStore replicatedStore = @@ -182,16 +215,29 @@ public class GlobalKTableIntegrationTest { 30000, "waiting for data in replicated store"); + final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = + kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore()); + assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L))); + + firstTimestamp = mockTime.milliseconds(); produceTopicValues(streamTopic); - expected.put("a", "1+F"); - expected.put("b", "2+G"); - expected.put("c", "3+H"); - expected.put("d", "4+I"); - expected.put("e", "5+J"); + expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp)); + expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L)); + expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L)); + expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L)); + expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L)); TestUtils.waitForCondition( - () -> results.equals(expected), + () -> { + if (supplier.capturedProcessorsCount() < 2) { + return false; + } + final Map> result = new HashMap<>(); + result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey); + 
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey); + return result.equals(expected); + }, 30000L, "waiting for final values"); } @@ -209,11 +255,16 @@ public class GlobalKTableIntegrationTest { startStreams(); ReadOnlyKeyValueStore store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); assertThat(store.approximateNumEntries(), equalTo(4L)); + ReadOnlyKeyValueStore> timestampedStore = + kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore()); + assertThat(timestampedStore.approximateNumEntries(), equalTo(4L)); kafkaStreams.close(); startStreams(); store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore()); assertThat(store.approximateNumEntries(), equalTo(4L)); + timestampedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore()); + assertThat(timestampedStore.approximateNumEntries(), equalTo(4L)); } private void createTopics() throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 14346cf195e..61f6356853a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -66,8 +67,7 @@ public class KStreamAggregationDedupIntegrationTest { private static final long COMMIT_INTERVAL_MS = 300L; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private final MockTime mockTime = CLUSTER.time; private static volatile AtomicInteger testNo = new AtomicInteger(0); @@ -119,17 +119,18 @@ public class KStreamAggregationDedupIntegrationTest { startStreams(); - produceMessages(System.currentTimeMillis()); + final long timestamp = System.currentTimeMillis(); + produceMessages(timestamp); validateReceivedMessages( new StringDeserializer(), new StringDeserializer(), Arrays.asList( - KeyValue.pair("A", "A:A"), - KeyValue.pair("B", "B:B"), - KeyValue.pair("C", "C:C"), - KeyValue.pair("D", "D:D"), - KeyValue.pair("E", "E:E"))); + new KeyValueTimestamp<>("A", "A:A", timestamp), + new KeyValueTimestamp<>("B", "B:B", timestamp), + new KeyValueTimestamp<>("C", "C:C", timestamp), + new KeyValueTimestamp<>("D", "D:D", timestamp), + new KeyValueTimestamp<>("E", "E:E", timestamp))); } @Test @@ -155,16 +156,16 @@ public class KStreamAggregationDedupIntegrationTest { new StringDeserializer(), new StringDeserializer(), Arrays.asList( - new KeyValue<>("A@" + firstBatchWindow, "A"), - new KeyValue<>("A@" + secondBatchWindow, "A:A"), - new KeyValue<>("B@" + firstBatchWindow, "B"), - new KeyValue<>("B@" + secondBatchWindow, "B:B"), - new KeyValue<>("C@" + firstBatchWindow, "C"), - new KeyValue<>("C@" + secondBatchWindow, "C:C"), - new KeyValue<>("D@" + 
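The GlobalKTable tests above swap the foreach-into-a-map capture for a MockProcessorSupplier (a Streams test-internal helper), so the last value and timestamp per key can be verified. A sketch of the polling condition with the stripped generics restored as an assumption (String keys, String join results); supplier and expected come from the surrounding test:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.TestUtils;

// Wait until the captured processors (the test expects two of them) have together
// produced the expected value-and-timestamp for every key.
TestUtils.waitForCondition(
    () -> {
        if (supplier.capturedProcessorsCount() < 2) {
            return false;
        }
        final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
        result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
        result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
        return result.equals(expected);
    },
    30000L,
    "waiting for join results");
```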
firstBatchWindow, "D"), - new KeyValue<>("D@" + secondBatchWindow, "D:D"), - new KeyValue<>("E@" + firstBatchWindow, "E"), - new KeyValue<>("E@" + secondBatchWindow, "E:E") + new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp), + new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp), + new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp), + new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp), + new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp), + new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp), + new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp), + new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp), + new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp), + new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp) ) ); } @@ -189,11 +190,11 @@ public class KStreamAggregationDedupIntegrationTest { new StringDeserializer(), new LongDeserializer(), Arrays.asList( - KeyValue.pair("1@" + window, 2L), - KeyValue.pair("2@" + window, 2L), - KeyValue.pair("3@" + window, 2L), - KeyValue.pair("4@" + window, 2L), - KeyValue.pair("5@" + window, 2L) + new KeyValueTimestamp<>("1@" + window, 2L, timestamp), + new KeyValueTimestamp<>("2@" + window, 2L, timestamp), + new KeyValueTimestamp<>("3@" + window, 2L, timestamp), + new KeyValueTimestamp<>("4@" + window, 2L, timestamp), + new KeyValueTimestamp<>("5@" + window, 2L, timestamp) ) ); } @@ -232,20 +233,16 @@ public class KStreamAggregationDedupIntegrationTest { private void validateReceivedMessages(final Deserializer keyDeserializer, final Deserializer valueDeserializer, - final List> expectedRecords) + final List> expectedRecords) throws InterruptedException { final Properties consumerProperties = new Properties(); - consumerProperties - .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + - testNo); + consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - keyDeserializer.getClass().getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - valueDeserializer.getClass().getName()); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived( consumerProperties, outputTopic, expectedRecords); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 2f06af740c0..dcf72a501ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -29,6 
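The dedup test's validateReceivedMessages above now passes KeyValueTimestamp expectations to IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived, which is assumed to behave like the older waitUntilFinalKeyValueRecordsReceived but to compare record timestamps as well. A hedged usage sketch (bootstrap servers, group id, topic and the concrete expectations are placeholders; it runs inside a test method that declares throws InterruptedException):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

final Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "dedup-verification");
consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

final long timestamp = System.currentTimeMillis();
final List<KeyValueTimestamp<String, String>> expected = Arrays.asList(
    new KeyValueTimestamp<>("A", "A:A", timestamp),
    new KeyValueTimestamp<>("B", "B:B", timestamp));

// waits until the latest record per key on the output topic matches key, value and timestamp
IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
    consumerConfig, "outputTopic", expected);
```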
+29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -98,8 +99,7 @@ public class KStreamAggregationIntegrationTest { private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = - new EmbeddedKafkaCluster(NUM_BROKERS); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static volatile AtomicInteger testNo = new AtomicInteger(0); private final MockTime mockTime = CLUSTER.time; @@ -122,8 +122,7 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration = new Properties(); final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration - .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); @@ -160,30 +159,35 @@ public class KStreamAggregationIntegrationTest { produceMessages(mockTime.milliseconds()); - final List> results = receiveMessages( + final List> results = receiveMessages( new StringDeserializer(), new StringDeserializer(), 10); results.sort(KStreamAggregationIntegrationTest::compare); - assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), - KeyValue.pair("A", "A:A"), - KeyValue.pair("B", "B"), - KeyValue.pair("B", "B:B"), - KeyValue.pair("C", "C"), - KeyValue.pair("C", "C:C"), - KeyValue.pair("D", "D"), - KeyValue.pair("D", "D:D"), - KeyValue.pair("E", "E"), - KeyValue.pair("E", "E:E")))); + assertThat(results, is(Arrays.asList( + new KeyValueTimestamp("A", "A", mockTime.milliseconds()), + new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()), + new KeyValueTimestamp("B", "B", mockTime.milliseconds()), + new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()), + new KeyValueTimestamp("C", "C", mockTime.milliseconds()), + new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()), + new KeyValueTimestamp("D", "D", mockTime.milliseconds()), + new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()), + new KeyValueTimestamp("E", "E", mockTime.milliseconds()), + new KeyValueTimestamp("E", "E:E", mockTime.milliseconds())))); } - private static int compare(final KeyValue o1, - final KeyValue o2) { - final int keyComparison = o1.key.compareTo(o2.key); + private static int compare(final KeyValueTimestamp o1, + final KeyValueTimestamp o2) { + final int keyComparison = o1.key().compareTo(o2.key()); if (keyComparison == 0) { - return o1.value.compareTo(o2.value); + final int valueComparison = o1.value().compareTo(o2.value()); + if (valueComparison == 0) { + return Long.compare(o1.timestamp(), o2.timestamp()); + } + return valueComparison; } return keyComparison; } @@ -206,7 +210,7 @@ public class KStreamAggregationIntegrationTest { startStreams(); - final List, String>> windowedOutput = receiveMessages( + 
final List, String>> windowedOutput = receiveMessages( new TimeWindowedDeserializer<>(), new StringDeserializer(), String.class, @@ -218,44 +222,45 @@ public class KStreamAggregationIntegrationTest { new StringDeserializer(), String.class, 15, - false); + true); - final Comparator, String>> - comparator = - Comparator.comparing((KeyValue, String> o) -> o.key.key()).thenComparing(o -> o.value); + final Comparator, String>> comparator = + Comparator.comparing((KeyValueTimestamp, String> o) -> o.key().key()) + .thenComparing(KeyValueTimestamp::value); windowedOutput.sort(comparator); final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long secondBatchWindow = secondBatchTimestamp / 500 * 500; - final List, String>> expectResult = Arrays.asList( - new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"), - new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"), - new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"), - new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"), - new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"), - new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"), - new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"), - new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"), - new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"), - new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"), - new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"), - new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"), - new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"), - new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"), - new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E") + final List, String>> expectResult = Arrays.asList( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B", firstBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C", firstBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D", firstBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", 
new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E", firstBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E", secondBatchTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E", secondBatchTimestamp) ); assertThat(windowedOutput, is(expectResult)); final Set expectResultString = new HashSet<>(expectResult.size()); - for (final KeyValue, String> eachRecord: expectResult) { - expectResultString.add(eachRecord.toString()); + for (final KeyValueTimestamp, String> eachRecord: expectResult) { + expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + + eachRecord.key() + ", " + eachRecord.value()); } // check every message is contained in the expect result final String[] allRecords = resultFromConsoleConsumer.split("\n"); for (final String record: allRecords) { - assertTrue(expectResultString.contains("KeyValue(" + record + ")")); + assertTrue(expectResultString.contains(record)); } } @@ -273,7 +278,7 @@ public class KStreamAggregationIntegrationTest { produceMessages(mockTime.milliseconds()); - final List> results = receiveMessages( + final List> results = receiveMessages( new StringDeserializer(), new IntegerDeserializer(), 10); @@ -281,16 +286,16 @@ public class KStreamAggregationIntegrationTest { results.sort(KStreamAggregationIntegrationTest::compare); assertThat(results, is(Arrays.asList( - KeyValue.pair("A", 1), - KeyValue.pair("A", 2), - KeyValue.pair("B", 1), - KeyValue.pair("B", 2), - KeyValue.pair("C", 1), - KeyValue.pair("C", 2), - KeyValue.pair("D", 1), - KeyValue.pair("D", 2), - KeyValue.pair("E", 1), - KeyValue.pair("E", 2) + new KeyValueTimestamp("A", 1, mockTime.milliseconds()), + new KeyValueTimestamp("A", 2, mockTime.milliseconds()), + new KeyValueTimestamp("B", 1, mockTime.milliseconds()), + new KeyValueTimestamp("B", 2, mockTime.milliseconds()), + new KeyValueTimestamp("C", 1, mockTime.milliseconds()), + new KeyValueTimestamp("C", 2, mockTime.milliseconds()), + new KeyValueTimestamp("D", 1, mockTime.milliseconds()), + new KeyValueTimestamp("D", 2, mockTime.milliseconds()), + new KeyValueTimestamp("E", 1, mockTime.milliseconds()), + new KeyValueTimestamp("E", 2, mockTime.milliseconds()) ))); } @@ -315,7 +320,7 @@ public class KStreamAggregationIntegrationTest { startStreams(); - final List, KeyValue>> windowedMessages = receiveMessagesWithTimestamp( + final List, Integer>> windowedMessages = receiveMessagesWithTimestamp( new TimeWindowedDeserializer<>(), new IntegerDeserializer(), String.class, @@ -329,37 +334,36 @@ public class KStreamAggregationIntegrationTest { 15, true); - final Comparator, KeyValue>> - comparator = - Comparator.comparing((KeyValue, KeyValue> o) -> o.key.key()).thenComparingInt(o -> o.value.key); - + final Comparator, Integer>> comparator = + Comparator.comparing((KeyValueTimestamp, Integer> o) -> o.key().key()) + .thenComparingInt(KeyValueTimestamp::value); windowedMessages.sort(comparator); final long firstWindow = firstTimestamp / 500 * 500; final long secondWindow = secondTimestamp / 500 * 500; - final List, KeyValue>> expectResult = Arrays.asList( - new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, 
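The console-consumer check above renders each expected record as "CreateTime:<timestamp>, <key>, <value>", matching what the test's console consumer prints when it is configured to show record timestamps, so plain string containment can be asserted. A sketch of that comparison; expectResult and resultFromConsoleConsumer come from the surrounding test:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.kstream.Windowed;

final Set<String> expectResultString = new HashSet<>(expectResult.size());
for (final KeyValueTimestamp<Windowed<String>, String> record : expectResult) {
    expectResultString.add("CreateTime:" + record.timestamp() + ", " + record.key() + ", " + record.value());
}

// every line the console consumer printed must be one of the expected renderings
for (final String line : resultFromConsoleConsumer.split("\n")) {
    if (!expectResultString.contains(line)) {
        throw new AssertionError("unexpected console-consumer line: " + line);
    }
}
```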
firstTimestamp)), - new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)), - new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)), - new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)), - new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)), - new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)), - new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)), - new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)), - new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)), - new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)), - new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)), - new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)), - new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)), - new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)), - new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp))); + final List, Integer>> expectResult = Arrays.asList( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp), + new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp)); assertThat(windowedMessages, is(expectResult)); final Set expectResultString = new 
HashSet<>(expectResult.size()); - for (final KeyValue, KeyValue> eachRecord: expectResult) { - expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key); + for (final KeyValueTimestamp, Integer> eachRecord: expectResult) { + expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value()); } // check every message is contained in the expect result @@ -375,23 +379,23 @@ public class KStreamAggregationIntegrationTest { produceMessages(mockTime.milliseconds()); - final List> results = receiveMessages( + final List> results = receiveMessages( new StringDeserializer(), new LongDeserializer(), 10); results.sort(KStreamAggregationIntegrationTest::compare); assertThat(results, is(Arrays.asList( - KeyValue.pair("A", 1L), - KeyValue.pair("A", 2L), - KeyValue.pair("B", 1L), - KeyValue.pair("B", 2L), - KeyValue.pair("C", 1L), - KeyValue.pair("C", 2L), - KeyValue.pair("D", 1L), - KeyValue.pair("D", 2L), - KeyValue.pair("E", 1L), - KeyValue.pair("E", 2L) + new KeyValueTimestamp("A", 1L, mockTime.milliseconds()), + new KeyValueTimestamp("A", 2L, mockTime.milliseconds()), + new KeyValueTimestamp("B", 1L, mockTime.milliseconds()), + new KeyValueTimestamp("B", 2L, mockTime.milliseconds()), + new KeyValueTimestamp("C", 1L, mockTime.milliseconds()), + new KeyValueTimestamp("C", 2L, mockTime.milliseconds()), + new KeyValueTimestamp("D", 1L, mockTime.milliseconds()), + new KeyValueTimestamp("D", 2L, mockTime.milliseconds()), + new KeyValueTimestamp("E", 1L, mockTime.milliseconds()), + new KeyValueTimestamp("E", 2L, mockTime.milliseconds()) ))); } @@ -430,7 +434,7 @@ public class KStreamAggregationIntegrationTest { startStreams(); - final List> results = receiveMessages( + final List> results = receiveMessages( new StringDeserializer(), new LongDeserializer(), 10); @@ -438,18 +442,17 @@ public class KStreamAggregationIntegrationTest { final long window = timestamp / 500 * 500; assertThat(results, is(Arrays.asList( - KeyValue.pair("1@" + window, 1L), - KeyValue.pair("1@" + window, 2L), - KeyValue.pair("2@" + window, 1L), - KeyValue.pair("2@" + window, 2L), - KeyValue.pair("3@" + window, 1L), - KeyValue.pair("3@" + window, 2L), - KeyValue.pair("4@" + window, 1L), - KeyValue.pair("4@" + window, 2L), - KeyValue.pair("5@" + window, 1L), - KeyValue.pair("5@" + window, 2L) + new KeyValueTimestamp("1@" + window, 1L, timestamp), + new KeyValueTimestamp("1@" + window, 2L, timestamp), + new KeyValueTimestamp("2@" + window, 1L, timestamp), + new KeyValueTimestamp("2@" + window, 2L, timestamp), + new KeyValueTimestamp("3@" + window, 1L, timestamp), + new KeyValueTimestamp("3@" + window, 2L, timestamp), + new KeyValueTimestamp("4@" + window, 1L, timestamp), + new KeyValueTimestamp("4@" + window, 2L, timestamp), + new KeyValueTimestamp("5@" + window, 1L, timestamp), + new KeyValueTimestamp("5@" + window, 2L, timestamp) ))); - } @Test @@ -796,17 +799,17 @@ public class KStreamAggregationIntegrationTest { kafkaStreams.start(); } - private List> receiveMessages(final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final int numMessages) + private List> receiveMessages(final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final int numMessages) throws InterruptedException { return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages); } - private List> receiveMessages(final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - 
final Class innerClass, - final int numMessages) throws InterruptedException { + private List> receiveMessages(final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Class innerClass, + final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); @@ -817,17 +820,17 @@ public class KStreamAggregationIntegrationTest { consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()); } - return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived( consumerProperties, outputTopic, numMessages, 60 * 1000); } - private List>> receiveMessagesWithTimestamp(final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final Class innerClass, - final int numMessages) throws InterruptedException { + private List> receiveMessagesWithTimestamp(final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final Class innerClass, + final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index 646185ebb93..4be14c22a98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.JoinWindows; @@ -62,22 +63,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testInner() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); - final List> expectedResult = Arrays.asList( + final List>> expectedResult = Arrays.asList( null, null, null, - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), null, null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), null, null, null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + Arrays.asList( + new 
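The receiveMessages helpers above now return KeyValueTimestamp records via IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived, so the aggregation tests can assert timestamps alongside keys and values. A reduced sketch for plain String records; bootstrap servers, group id and the method name are placeholders:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

static List<KeyValueTimestamp<String, String>> receiveStringMessages(final String topic,
                                                                     final int numMessages) throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-verification");
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // blocks until at least numMessages records (with their timestamps) were read from the topic
    return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
        consumerProperties, topic, numMessages, 60 * 1000L);
}
```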
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); @@ -89,27 +104,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testInnerRepartitioned() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned"); - final List> expectedResult = Arrays.asList( - null, - null, - null, - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); - leftStream.map(MockMapper.noOpKeyValueMapper()) - .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) - .selectKey(MockMapper.selectKeyKeyValueMapper()), + leftStream.map(MockMapper.noOpKeyValueMapper()) + .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) + .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); runTest(expectedResult); @@ -119,22 +148,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testLeft() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); - final List> expectedResult = Arrays.asList( + final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), null, null, - Arrays.asList("C-a", "C-b"), - 
Arrays.asList("A-c", "B-c", "C-c"), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), null, null, null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); @@ -146,27 +189,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testLeftRepartitioned() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned"); - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); - leftStream.map(MockMapper.noOpKeyValueMapper()) - .leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) - .selectKey(MockMapper.selectKeyKeyValueMapper()), + leftStream.map(MockMapper.noOpKeyValueMapper()) + .leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) + .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); runTest(expectedResult); @@ -176,22 +233,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testOuter() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); - final List> expectedResult = Arrays.asList( + final List>> expectedResult = Arrays.asList( 
null, null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), null, null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), null, null, null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); @@ -203,27 +274,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testOuterRepartitioned() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); - leftStream.map(MockMapper.noOpKeyValueMapper()) - .outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) - .selectKey(MockMapper.selectKeyKeyValueMapper()), + 
leftStream.map(MockMapper.noOpKeyValueMapper()) + .outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) + .selectKey(MockMapper.selectKeyKeyValueMapper()), valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC); runTest(expectedResult); @@ -233,26 +318,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest public void testMultiInner() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner"); - final List> expectedResult = Arrays.asList( - null, - null, - null, - Collections.singletonList("A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"), - null, - null, - Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"), - Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c", - "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"), - null, - null, - null, - Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c", - "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d", - "A-d-d", "B-d-d", "C-d-d"), - Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a", - "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L), + new 
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index ca78d025add..772c91d7b36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.streams.KafkaStreamsWrapper; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; @@ -34,8 +35,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertTrue; /** * Tests all available joins of Kafka Streams DSL. 
@@ -82,30 +82,30 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); streams.close(); - assertEquals(listener.createdToRevokedSeen(), true); - assertEquals(listener.revokedToPendingShutdownSeen(), true); + assertTrue(listener.createdToRevokedSeen()); + assertTrue(listener.revokedToPendingShutdownSeen()); } @Test public void testInner() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); - final List> expectedResult = Arrays.asList( - null, - null, - null, - null, - Collections.singletonList("B-a"), - null, - null, - null, - null, - null, - null, - null, - null, - null, - Collections.singletonList("D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + null, + null, + null, + null, + null, + null, + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); @@ -117,22 +117,22 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest public void testLeft() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - null, - Collections.singletonList("B-a"), - null, - null, - null, - Collections.singletonList("C-null"), - null, - null, - null, - null, - null, - Collections.singletonList("D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)), + null, + null, + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java index 73d1e3d315a..2b685f9e63f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ForeachAction; @@ -59,8 +60,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.>as("right").withLoggingDisabled()); } - final private String expectedFinalJoinResult = "D-d"; - final private String expectedFinalMultiJoinResult = "D-d-d"; + final private KeyValueTimestamp expectedFinalJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L); + final private KeyValueTimestamp expectedFinalMultiJoinResult = new 
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L); final private String storeName = appID + "-store"; private Materialized> materialized = Materialized.>as(storeName) @@ -70,7 +71,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .withLoggingDisabled(); final private class CountingPeek implements ForeachAction { - final private String expected; + final private KeyValueTimestamp expected; CountingPeek(final boolean multiJoin) { this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult; @@ -79,7 +80,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @Override public void apply(final Long key, final String value) { numRecordsExpected++; - if (expected.equals(value)) { + if (expected.value().equals(value)) { final boolean ret = finalResultReached.compareAndSet(false, true); if (!ret) { @@ -98,22 +99,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); runTest(expectedFinalJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList((String) null), - null, - null, - Collections.singletonList("C-c"), - Collections.singletonList((String) null), - null, - null, - null, - Collections.singletonList("D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)), + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); @@ -129,22 +130,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); runTest(expectedFinalJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList((String) null), - null, - Collections.singletonList("C-null"), - Collections.singletonList("C-c"), - Collections.singletonList("C-null"), - Collections.singletonList((String) null), - null, - null, - Collections.singletonList("D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + Collections.singletonList(new 
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); @@ -160,22 +161,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); runTest(expectedFinalJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList("null-b"), - Collections.singletonList((String) null), - Collections.singletonList("C-null"), - Collections.singletonList("C-c"), - Collections.singletonList("C-null"), - Collections.singletonList((String) null), - null, - Collections.singletonList("null-d"), - Collections.singletonList("D-d") + final List>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L)) ); leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); @@ -197,22 +198,29 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { } else { // FIXME: the duplicate below for all the multi-joins // are due to KAFKA-6443, should be updated once it is fixed. 
- final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("C-c-c", "C-c-c"), - null, - null, - null, - null, - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + null, // correct would be -> new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L) + // we don't get correct value, because of self-join of `rightTable` + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.join(rightTable, valueJoiner) @@ -235,22 +243,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("C-c-c", "C-c-c"), - Collections.singletonList((String) null), - null, - null, - null, - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)), + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.join(rightTable, valueJoiner) @@ -274,22 +288,33 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList("null-b"), - Collections.singletonList((String) null), - null, - Arrays.asList("C-c-c", "C-c-c"), - Arrays.asList((String) null, null), - null, - null, - null, - Arrays.asList("null-d", "D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new 
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)), + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)), + null, + null, + null, + Arrays.asList( + // incorrect result `null-d` is caused by self-join of `rightTable` + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.join(rightTable, valueJoiner) @@ -312,22 +337,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("C-c-c", "C-c-c"), - Collections.singletonList((String) null), - null, - null, - null, - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)), + null, + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.leftJoin(rightTable, valueJoiner) @@ -351,22 +382,32 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-null-null", "A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("C-null-null", "C-c-c", "C-c-c"), - Arrays.asList("C-null-null", "C-null-null"), - Collections.singletonList((String) null), - null, - null, - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)), + null, + null, + Arrays.asList( + new 
KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + null, + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.leftJoin(rightTable, valueJoiner) @@ -390,22 +431,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-null-null", "A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList("null-b"), - Collections.singletonList((String) null), - null, - Arrays.asList("C-null-null", "C-c-c", "C-c-c"), - Arrays.asList("C-null-null", "C-null-null"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("null-d", "D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)), + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.leftJoin(rightTable, valueJoiner) @@ -428,22 +481,30 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList("null-b-b"), - null, - null, - Arrays.asList("C-c-c", "C-c-c"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("null-d-d", "null-d-d"), - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)), + 
null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.outerJoin(rightTable, valueJoiner) @@ -467,22 +528,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-null-null", "A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList("null-b-b"), - Collections.singletonList((String) null), - null, - Arrays.asList("C-null-null", "C-c-c", "C-c-c"), - Arrays.asList("C-null-null", "C-null-null"), - Collections.singletonList((String) null), - null, - Arrays.asList("null-d-d", "null-d-d"), - Collections.singletonList("D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)), + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.outerJoin(rightTable, valueJoiner) @@ -506,22 +579,36 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { .to(OUTPUT_TOPIC); runTest(expectedFinalMultiJoinResult, storeName); } else { - final List> expectedResult = Arrays.asList( - null, - null, - null, - Arrays.asList("A-null-null", "A-a-a", "A-a-a"), - Collections.singletonList("B-a-a"), - Arrays.asList("B-b-b", "B-b-b"), - Collections.singletonList("null-b-b"), - Arrays.asList((String) null, null), - null, - Arrays.asList("C-null-null", "C-c-c", "C-c-c"), - Arrays.asList("C-null-null", "C-null-null"), - Collections.singletonList((String) null), - null, - null, - Arrays.asList("null-d-d", "null-d-d", "D-d-d") + final List>> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, 
"B-a-a", 5L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)), + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)), + Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)), + null, + null, + Arrays.asList( + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L), + new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L)) ); leftTable.outerJoin(rightTable, valueJoiner) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index e6cf85d83b0..46515aa8469 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -241,12 +241,14 @@ public class IntegrationTestUtils { * @param Key type of the data records * @param Value type of the data records */ + @SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Long timestamp, final boolean enableTransactions) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { + produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions); } @@ -260,13 +262,15 @@ public class IntegrationTestUtils { * @param Key type of the data records * @param Value type of the data records */ + @SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, final boolean enableTransactions) - throws ExecutionException, InterruptedException { + throws ExecutionException, InterruptedException { + try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); @@ -369,6 +373,7 @@ public class IntegrationTestUtils { * @param enableTransactions Send messages in a transaction * @param Value type of the data records */ + @SuppressWarnings("WeakerAccess") public static void produceValuesSynchronously(final String topic, final Collection records, final Properties producerConfig, @@ -427,6 +432,7 @@ public class IntegrationTestUtils { * @param Value type of the data records * @return All the records consumed, or null if no records are consumed */ + @SuppressWarnings("WeakerAccess") public static List> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { @@ -444,6 +450,7 @@ public class IntegrationTestUtils { * @param Value type of the data records * @return All the records consumed, or null if no records are 
consumed */ + @SuppressWarnings("WeakerAccess") public static List> waitUntilMinRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, @@ -519,14 +526,14 @@ public class IntegrationTestUtils { * @param Key type of the data records * @param Value type of the data records */ - public static List>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig, + public static List> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords, final long waitTime) throws InterruptedException { - final List>> accumData = new ArrayList<>(); + final List> accumData = new ArrayList<>(); try (final Consumer consumer = createConsumer(consumerConfig)) { final TestCondition valuesRead = () -> { - final List>> readData = + final List> readData = readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords); accumData.addAll(readData); return accumData.size() >= expectedNumRecords; @@ -553,6 +560,22 @@ public class IntegrationTestUtils { return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT); } + /** + * Wait until final key-value mappings have been consumed. + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Kafka topic to consume from + * @param expectedRecords Expected key-value mappings + * @param Key type of the data records + * @param Value type of the data records + * @return All the mappings consumed, or null if no records are consumed + */ + public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, + final String topic, + final List> expectedRecords) throws InterruptedException { + return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT, true); + } + /** * Wait until final key-value mappings have been consumed. 
* @@ -564,28 +587,53 @@ public class IntegrationTestUtils { * @param Value type of the data records * @return All the mappings consumed, or null if no records are consumed */ + @SuppressWarnings("WeakerAccess") public static List> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final List> expectedRecords, final long waitTime) throws InterruptedException { - final List> accumData = new ArrayList<>(); + return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false); + } + + public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, + final String topic, + final List> expectedRecords, + final long waitTime) throws InterruptedException { + return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, true); + } + + @SuppressWarnings("unchecked") + private static List waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final List expectedRecords, + final long waitTime, + final boolean withTimestamp) throws InterruptedException { + final List accumData = new ArrayList<>(); try (final Consumer consumer = createConsumer(consumerConfig)) { final TestCondition valuesRead = () -> { - final List> readData = - readKeyValues(topic, consumer, waitTime, expectedRecords.size()); + final List readData; + if (withTimestamp) { + readData = (List) readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedRecords.size()); + } else { + readData = (List) readKeyValues(topic, consumer, waitTime, expectedRecords.size()); + } accumData.addAll(readData); // filter out all intermediate records we don't want - final List> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList()); + final List accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList()); // still need to check that for each key, the ordering is expected - final Map>> finalAccumData = new HashMap<>(); - for (final KeyValue kv : accumulatedActual) { - finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv); + final Map> finalAccumData = new HashMap<>(); + for (final T kv : accumulatedActual) { + finalAccumData.computeIfAbsent( + (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key), + key -> new ArrayList<>()).add(kv); } - final Map>> finalExpected = new HashMap<>(); - for (final KeyValue kv : expectedRecords) { - finalExpected.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv); + final Map> finalExpected = new HashMap<>(); + for (final T kv : expectedRecords) { + finalExpected.computeIfAbsent( + (K) (withTimestamp ? 
((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key), + key -> new ArrayList<>()).add(kv); } // returns true only if the remaining records in both lists are the same and in the same order @@ -643,6 +691,7 @@ public class IntegrationTestUtils { return accumData; } + @SuppressWarnings("WeakerAccess") public static void waitForTopicPartitions(final List servers, final List partitions, final long timeout) throws InterruptedException { @@ -863,14 +912,14 @@ public class IntegrationTestUtils { * @param maxMessages Maximum number of messages to read via the consumer * @return The KeyValue elements retrieved via the consumer */ - private static List>> readKeyValuesWithTimestamp(final String topic, - final Consumer consumer, - final long waitTime, - final int maxMessages) { - final List>> consumedValues = new ArrayList<>(); + private static List> readKeyValuesWithTimestamp(final String topic, + final Consumer consumer, + final long waitTime, + final int maxMessages) { + final List> consumedValues = new ArrayList<>(); final List> records = readRecords(topic, consumer, waitTime, maxMessages); for (final ConsumerRecord record : records) { - consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp()))); + consumedValues.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp())); } return consumedValues; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index 0bd22d482f2..a6959e19628 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -56,7 +56,11 @@ public class MockProcessorSupplier implements ProcessorSupplier { return capturedProcessors(1).get(0); } - // get the captured processors with the expected number + public int capturedProcessorsCount() { + return processors.size(); + } + + // get the captured processors with the expected number public List> capturedProcessors(final int expectedNumberOfProcessors) { assertEquals(expectedNumberOfProcessors, processors.size());