Browse Source

KAFKA-6455: Update integration tests to verify result timestamps (#6751)

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>
pull/6846/head
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
77e6e8ec05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
  2. 68
      streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
  3. 36
      streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
  4. 7
      streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
  5. 113
      streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
  6. 63
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
  7. 215
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
  8. 305
      streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
  9. 24
      streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
  10. 363
      streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
  11. 81
      streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
  12. 4
      streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java

3
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java

@ -80,7 +80,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -80,7 +80,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
final V intermediateAgg;
long newTimestamp = context().timestamp();
long newTimestamp;
// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
@ -88,6 +88,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @@ -88,6 +88,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
newTimestamp = context().timestamp();
}
// then try to add the new value

68
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java

@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@ -35,6 +36,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; @@ -35,6 +36,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@ -51,6 +53,7 @@ import java.util.ArrayList; @@ -51,6 +53,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@ -86,7 +89,7 @@ public abstract class AbstractJoinIntegrationTest { @@ -86,7 +89,7 @@ public abstract class AbstractJoinIntegrationTest {
static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
static final String OUTPUT_TOPIC = "outputTopic";
private final long anyUniqueKey = 0L;
static final long ANY_UNIQUE_KEY = 0L;
private final static Properties PRODUCER_CONFIG = new Properties();
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
@ -163,13 +166,13 @@ public abstract class AbstractJoinIntegrationTest { @@ -163,13 +166,13 @@ public abstract class AbstractJoinIntegrationTest {
CLUSTER.deleteAllTopicsAndWait(120000);
}
private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
assertThat(result, is(expectedResult));
private void checkResult(final String outputTopic, final List<KeyValueTimestamp<Long, String>> expectedResult) throws InterruptedException {
IntegrationTestUtils.verifyKeyValueTimestamps(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult);
}
private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
private void checkResult(final String outputTopic, final KeyValueTimestamp<Long, String> expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
final List<KeyValueTimestamp<Long, String>> result =
IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
assertThat(result.get(result.size() - 1), is(expectedFinalResult));
}
@ -177,7 +180,7 @@ public abstract class AbstractJoinIntegrationTest { @@ -177,7 +180,7 @@ public abstract class AbstractJoinIntegrationTest {
* Runs the actual test. Checks the result after each input record to ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
*/
void runTest(final List<List<String>> expectedResult) throws Exception {
void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult) throws Exception {
runTest(expectedResult, null);
}
@ -186,28 +189,34 @@ public abstract class AbstractJoinIntegrationTest { @@ -186,28 +189,34 @@ public abstract class AbstractJoinIntegrationTest {
* Runs the actual test. Checks the result after each input record to ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
*/
void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult, final String storeName) throws Exception {
assert expectedResult.size() == input.size();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
String expectedFinalResult = null;
KeyValueTimestamp<Long, String> expectedFinalResult = null;
try {
streams.start();
long ts = System.currentTimeMillis();
final long firstTimestamp = System.currentTimeMillis();
long ts = firstTimestamp;
final Iterator<List<String>> resultIterator = expectedResult.iterator();
final Iterator<List<KeyValueTimestamp<Long, String>>> resultIterator = expectedResult.iterator();
for (final Input<String> singleInput : input) {
producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
final List<String> expected = resultIterator.next();
final List<KeyValueTimestamp<Long, String>> expected = resultIterator.next();
if (expected != null) {
checkResult(OUTPUT_TOPIC, expected);
expectedFinalResult = expected.get(expected.size() - 1);
final List<KeyValueTimestamp<Long, String>> updatedExpected = new LinkedList<>();
for (final KeyValueTimestamp<Long, String> record : expected) {
updatedExpected.add(new KeyValueTimestamp<>(record.key(), record.value(), firstTimestamp + record.timestamp()));
}
checkResult(OUTPUT_TOPIC, updatedExpected);
expectedFinalResult = updatedExpected.get(expected.size() - 1);
}
}
@ -222,21 +231,22 @@ public abstract class AbstractJoinIntegrationTest { @@ -222,21 +231,22 @@ public abstract class AbstractJoinIntegrationTest {
/*
* Runs the actual test. Checks the final result only after expected number of records have been consumed.
*/
void runTest(final String expectedFinalResult) throws Exception {
void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult) throws Exception {
runTest(expectedFinalResult, null);
}
/*
* Runs the actual test. Checks the final result only after expected number of records have been consumed.
*/
void runTest(final String expectedFinalResult, final String storeName) throws Exception {
void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult, final String storeName) throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
try {
streams.start();
long ts = System.currentTimeMillis();
final long firstTimestamp = System.currentTimeMillis();
long ts = firstTimestamp;
for (final Input<String> singleInput : input) {
producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
@ -244,10 +254,15 @@ public abstract class AbstractJoinIntegrationTest { @@ -244,10 +254,15 @@ public abstract class AbstractJoinIntegrationTest {
TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
final KeyValueTimestamp<Long, String> updatedExpectedFinalResult =
new KeyValueTimestamp<>(
expectedFinalResult.key(),
expectedFinalResult.value(),
firstTimestamp + expectedFinalResult.timestamp());
checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected);
if (storeName != null) {
checkQueryableStore(storeName, expectedFinalResult);
checkQueryableStore(storeName, updatedExpectedFinalResult);
}
} finally {
streams.close();
@ -257,15 +272,16 @@ public abstract class AbstractJoinIntegrationTest { @@ -257,15 +272,16 @@ public abstract class AbstractJoinIntegrationTest {
/*
* Checks the embedded queryable state store snapshot
*/
private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.keyValueStore());
private void checkQueryableStore(final String queryableName, final KeyValueTimestamp<Long, String> expectedFinalResult) {
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = streams.store(queryableName, QueryableStoreTypes.timestampedKeyValueStore());
final KeyValueIterator<Long, String> all = store.all();
final KeyValue<Long, String> onlyEntry = all.next();
final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
try {
assertThat(onlyEntry.key, is(anyUniqueKey));
assertThat(onlyEntry.value, is(expectedFinalResult));
assertThat(onlyEntry.key, is(expectedFinalResult.key()));
assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
assertThat(all.hasNext(), is(false));
} finally {
all.close();
@ -278,7 +294,7 @@ public abstract class AbstractJoinIntegrationTest { @@ -278,7 +294,7 @@ public abstract class AbstractJoinIntegrationTest {
Input(final String topic, final V value) {
this.topic = topic;
record = KeyValue.pair(anyUniqueKey, value);
record = KeyValue.pair(ANY_UNIQUE_KEY, value);
}
}
}

36
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import java.time.Duration;
import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@ -39,10 +39,8 @@ import org.apache.kafka.streams.Topology; @@ -39,10 +39,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@ -56,6 +54,7 @@ import java.io.BufferedWriter; @@ -56,6 +54,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@ -65,8 +64,6 @@ import java.util.Map; @@ -65,8 +64,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.tools.StreamsResetter;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -223,7 +220,7 @@ public abstract class AbstractResetIntegrationTest { @@ -223,7 +220,7 @@ public abstract class AbstractResetIntegrationTest {
}
}
void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
void shouldNotAllowToResetWhileStreamsIsRunning() {
appID = testId + "-not-reset-during-runtime";
final String[] parameters = new String[] {
"--application-id", appID,
@ -390,7 +387,6 @@ public abstract class AbstractResetIntegrationTest { @@ -390,7 +387,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@ -434,7 +430,6 @@ public abstract class AbstractResetIntegrationTest { @@ -434,7 +430,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@ -482,7 +477,6 @@ public abstract class AbstractResetIntegrationTest { @@ -482,7 +477,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@ -514,12 +508,7 @@ public abstract class AbstractResetIntegrationTest { @@ -514,12 +508,7 @@ public abstract class AbstractResetIntegrationTest {
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
// use map to trigger internal re-partitioning before groupByKey
input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
@Override
public KeyValue<Long, String> apply(final Long key, final String value) {
return new KeyValue<>(key, value);
}
})
input.map(KeyValue::new)
.groupByKey()
.count()
.toStream()
@ -530,12 +519,7 @@ public abstract class AbstractResetIntegrationTest { @@ -530,12 +519,7 @@ public abstract class AbstractResetIntegrationTest {
.windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
.count()
.toStream()
.map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
@Override
public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
return new KeyValue<>(key.window().start() + key.window().end(), value);
}
})
.map((key, value) -> new KeyValue<>(key.window().start() + key.window().end(), value))
.to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));
return builder.build();
@ -547,12 +531,8 @@ public abstract class AbstractResetIntegrationTest { @@ -547,12 +531,8 @@ public abstract class AbstractResetIntegrationTest {
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
// use map to trigger internal re-partitioning before groupByKey
input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
@Override
public KeyValue<Long, Long> apply(final Long key, final String value) {
return new KeyValue<>(key, key);
}
}).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
input.map((key, value) -> new KeyValue<>(key, key))
.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
return builder.build();
}
@ -590,7 +570,7 @@ public abstract class AbstractResetIntegrationTest { @@ -590,7 +570,7 @@ public abstract class AbstractResetIntegrationTest {
parameterList.add(resetScenarioArg);
}
final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
final String[] parameters = parameterList.toArray(new String[0]);
final Properties cleanUpConfig = new Properties();
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);

7
streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -55,8 +56,6 @@ import java.util.Map; @@ -55,8 +56,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -191,8 +190,8 @@ public class FineGrainedAutoResetIntegrationTest { @@ -191,8 +190,8 @@ public class FineGrainedAutoResetIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.LATEST));
final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.LATEST));
final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ));
pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));

113
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java

@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig; @@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@ -39,7 +38,9 @@ import org.apache.kafka.streams.state.KeyValueStore; @@ -39,7 +38,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@ -68,7 +69,6 @@ public class GlobalKTableIntegrationTest { @@ -68,7 +69,6 @@ public class GlobalKTableIntegrationTest {
private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
private final String globalStore = "globalStore";
private final Map<String, String> results = new HashMap<>();
private StreamsBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
@ -76,7 +76,7 @@ public class GlobalKTableIntegrationTest { @@ -76,7 +76,7 @@ public class GlobalKTableIntegrationTest {
private String streamTopic;
private GlobalKTable<Long, String> globalTable;
private KStream<String, Long> stream;
private ForeachAction<String, String> foreachAction;
private MockProcessorSupplier<String, String> supplier;
@Before
public void before() throws Exception {
@ -96,7 +96,7 @@ public class GlobalKTableIntegrationTest { @@ -96,7 +96,7 @@ public class GlobalKTableIntegrationTest {
.withValueSerde(Serdes.String()));
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
stream = builder.stream(streamTopic, stringLongConsumed);
foreachAction = results::put;
supplier = new MockProcessorSupplier<>();
}
@After
@ -110,24 +110,34 @@ public class GlobalKTableIntegrationTest { @@ -110,24 +110,34 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
startStreams();
long firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
expected.put("a", "1+A");
expected.put("b", "2+B");
expected.put("c", "3+C");
expected.put("d", "4+D");
expected.put("e", "5+null");
final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
expected.put("e", ValueAndTimestamp.make("5+null", firstTimestamp + 4L));
TestUtils.waitForCondition(
() -> results.equals(expected),
() -> {
if (supplier.capturedProcessorsCount() < 2) {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
return result.equals(expected);
},
30000L,
"waiting for initial values");
firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();
final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@ -138,16 +148,29 @@ public class GlobalKTableIntegrationTest { @@ -138,16 +148,29 @@ public class GlobalKTableIntegrationTest {
30000,
"waiting for data in replicated store");
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
expected.put("a", "1+F");
expected.put("b", "2+G");
expected.put("c", "3+H");
expected.put("d", "4+I");
expected.put("e", "5+J");
expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
TestUtils.waitForCondition(
() -> results.equals(expected),
() -> {
if (supplier.capturedProcessorsCount() < 2) {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
return result.equals(expected);
},
30000L,
"waiting for final values");
}
@ -155,23 +178,33 @@ public class GlobalKTableIntegrationTest { @@ -155,23 +178,33 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldKStreamGlobalKTableJoin() throws Exception {
final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
streamTableJoin.foreach(foreachAction);
streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
startStreams();
long firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
expected.put("a", "1+A");
expected.put("b", "2+B");
expected.put("c", "3+C");
expected.put("d", "4+D");
final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
TestUtils.waitForCondition(
() -> results.equals(expected),
() -> {
if (supplier.capturedProcessorsCount() < 2) {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
return result.equals(expected);
},
30000L,
"waiting for initial values");
firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();
final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@ -182,16 +215,29 @@ public class GlobalKTableIntegrationTest { @@ -182,16 +215,29 @@ public class GlobalKTableIntegrationTest {
30000,
"waiting for data in replicated store");
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
expected.put("a", "1+F");
expected.put("b", "2+G");
expected.put("c", "3+H");
expected.put("d", "4+I");
expected.put("e", "5+J");
expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
TestUtils.waitForCondition(
() -> results.equals(expected),
() -> {
if (supplier.capturedProcessorsCount() < 2) {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
return result.equals(expected);
},
30000L,
"waiting for final values");
}
@ -209,11 +255,16 @@ public class GlobalKTableIntegrationTest { @@ -209,11 +255,16 @@ public class GlobalKTableIntegrationTest {
startStreams();
ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
assertThat(store.approximateNumEntries(), equalTo(4L));
ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore =
kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
kafkaStreams.close();
startStreams();
store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
assertThat(store.approximateNumEntries(), equalTo(4L));
timestampedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}
private void createTopics() throws Exception {

63
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java

@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; @@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@ -66,8 +67,7 @@ public class KStreamAggregationDedupIntegrationTest { @@ -66,8 +67,7 @@ public class KStreamAggregationDedupIntegrationTest {
private static final long COMMIT_INTERVAL_MS = 300L;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final MockTime mockTime = CLUSTER.time;
private static volatile AtomicInteger testNo = new AtomicInteger(0);
@ -119,17 +119,18 @@ public class KStreamAggregationDedupIntegrationTest { @@ -119,17 +119,18 @@ public class KStreamAggregationDedupIntegrationTest {
startStreams();
produceMessages(System.currentTimeMillis());
final long timestamp = System.currentTimeMillis();
produceMessages(timestamp);
validateReceivedMessages(
new StringDeserializer(),
new StringDeserializer(),
Arrays.asList(
KeyValue.pair("A", "A:A"),
KeyValue.pair("B", "B:B"),
KeyValue.pair("C", "C:C"),
KeyValue.pair("D", "D:D"),
KeyValue.pair("E", "E:E")));
new KeyValueTimestamp<>("A", "A:A", timestamp),
new KeyValueTimestamp<>("B", "B:B", timestamp),
new KeyValueTimestamp<>("C", "C:C", timestamp),
new KeyValueTimestamp<>("D", "D:D", timestamp),
new KeyValueTimestamp<>("E", "E:E", timestamp)));
}
@Test
@ -155,16 +156,16 @@ public class KStreamAggregationDedupIntegrationTest { @@ -155,16 +156,16 @@ public class KStreamAggregationDedupIntegrationTest {
new StringDeserializer(),
new StringDeserializer(),
Arrays.asList(
new KeyValue<>("A@" + firstBatchWindow, "A"),
new KeyValue<>("A@" + secondBatchWindow, "A:A"),
new KeyValue<>("B@" + firstBatchWindow, "B"),
new KeyValue<>("B@" + secondBatchWindow, "B:B"),
new KeyValue<>("C@" + firstBatchWindow, "C"),
new KeyValue<>("C@" + secondBatchWindow, "C:C"),
new KeyValue<>("D@" + firstBatchWindow, "D"),
new KeyValue<>("D@" + secondBatchWindow, "D:D"),
new KeyValue<>("E@" + firstBatchWindow, "E"),
new KeyValue<>("E@" + secondBatchWindow, "E:E")
new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp),
new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp),
new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp),
new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp),
new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp),
new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp),
new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp),
new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)
)
);
}
@ -189,11 +190,11 @@ public class KStreamAggregationDedupIntegrationTest { @@ -189,11 +190,11 @@ public class KStreamAggregationDedupIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
Arrays.asList(
KeyValue.pair("1@" + window, 2L),
KeyValue.pair("2@" + window, 2L),
KeyValue.pair("3@" + window, 2L),
KeyValue.pair("4@" + window, 2L),
KeyValue.pair("5@" + window, 2L)
new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
)
);
}
@ -232,20 +233,16 @@ public class KStreamAggregationDedupIntegrationTest { @@ -232,20 +233,16 @@ public class KStreamAggregationDedupIntegrationTest {
private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final List<KeyValue<K, V>> expectedRecords)
final List<KeyValueTimestamp<K, V>> expectedRecords)
throws InterruptedException {
final Properties consumerProperties = new Properties();
consumerProperties
.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
testNo);
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
consumerProperties,
outputTopic,
expectedRecords);

215
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@ -98,8 +99,7 @@ public class KStreamAggregationIntegrationTest { @@ -98,8 +99,7 @@ public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile AtomicInteger testNo = new AtomicInteger(0);
private final MockTime mockTime = CLUSTER.time;
@ -122,8 +122,7 @@ public class KStreamAggregationIntegrationTest { @@ -122,8 +122,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration
.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@ -160,30 +159,35 @@ public class KStreamAggregationIntegrationTest { @@ -160,30 +159,35 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
final List<KeyValue<String, String>> results = receiveMessages(
final List<KeyValueTimestamp<String, String>> results = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
10);
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
KeyValue.pair("A", "A:A"),
KeyValue.pair("B", "B"),
KeyValue.pair("B", "B:B"),
KeyValue.pair("C", "C"),
KeyValue.pair("C", "C:C"),
KeyValue.pair("D", "D"),
KeyValue.pair("D", "D:D"),
KeyValue.pair("E", "E"),
KeyValue.pair("E", "E:E"))));
}
private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
final KeyValue<K, V> o2) {
final int keyComparison = o1.key.compareTo(o2.key);
assertThat(results, is(Arrays.asList(
new KeyValueTimestamp("A", "A", mockTime.milliseconds()),
new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()),
new KeyValueTimestamp("B", "B", mockTime.milliseconds()),
new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()),
new KeyValueTimestamp("C", "C", mockTime.milliseconds()),
new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()),
new KeyValueTimestamp("D", "D", mockTime.milliseconds()),
new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()),
new KeyValueTimestamp("E", "E", mockTime.milliseconds()),
new KeyValueTimestamp("E", "E:E", mockTime.milliseconds()))));
}
private static <K extends Comparable, V extends Comparable> int compare(final KeyValueTimestamp<K, V> o1,
final KeyValueTimestamp<K, V> o2) {
final int keyComparison = o1.key().compareTo(o2.key());
if (keyComparison == 0) {
return o1.value.compareTo(o2.value);
final int valueComparison = o1.value().compareTo(o2.value());
if (valueComparison == 0) {
return Long.compare(o1.timestamp(), o2.timestamp());
}
return valueComparison;
}
return keyComparison;
}
@ -206,7 +210,7 @@ public class KStreamAggregationIntegrationTest { @@ -206,7 +210,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
new TimeWindowedDeserializer<>(),
new StringDeserializer(),
String.class,
@ -218,44 +222,45 @@ public class KStreamAggregationIntegrationTest { @@ -218,44 +222,45 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
String.class,
15,
false);
true);
final Comparator<KeyValue<Windowed<String>, String>>
comparator =
Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
.thenComparing(KeyValueTimestamp::value);
windowedOutput.sort(comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E", firstBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E", secondBatchTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E", secondBatchTimestamp)
);
assertThat(windowedOutput, is(expectResult));
final Set<String> expectResultString = new HashSet<>(expectResult.size());
for (final KeyValue<Windowed<String>, String> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
for (final KeyValueTimestamp<Windowed<String>, String> eachRecord: expectResult) {
expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", "
+ eachRecord.key() + ", " + eachRecord.value());
}
// check every message is contained in the expect result
final String[] allRecords = resultFromConsoleConsumer.split("\n");
for (final String record: allRecords) {
assertTrue(expectResultString.contains("KeyValue(" + record + ")"));
assertTrue(expectResultString.contains(record));
}
}
@ -273,7 +278,7 @@ public class KStreamAggregationIntegrationTest { @@ -273,7 +278,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
final List<KeyValue<String, Integer>> results = receiveMessages(
final List<KeyValueTimestamp<String, Integer>> results = receiveMessages(
new StringDeserializer(),
new IntegerDeserializer(),
10);
@ -281,16 +286,16 @@ public class KStreamAggregationIntegrationTest { @@ -281,16 +286,16 @@ public class KStreamAggregationIntegrationTest {
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1),
KeyValue.pair("A", 2),
KeyValue.pair("B", 1),
KeyValue.pair("B", 2),
KeyValue.pair("C", 1),
KeyValue.pair("C", 2),
KeyValue.pair("D", 1),
KeyValue.pair("D", 2),
KeyValue.pair("E", 1),
KeyValue.pair("E", 2)
new KeyValueTimestamp("A", 1, mockTime.milliseconds()),
new KeyValueTimestamp("A", 2, mockTime.milliseconds()),
new KeyValueTimestamp("B", 1, mockTime.milliseconds()),
new KeyValueTimestamp("B", 2, mockTime.milliseconds()),
new KeyValueTimestamp("C", 1, mockTime.milliseconds()),
new KeyValueTimestamp("C", 2, mockTime.milliseconds()),
new KeyValueTimestamp("D", 1, mockTime.milliseconds()),
new KeyValueTimestamp("D", 2, mockTime.milliseconds()),
new KeyValueTimestamp("E", 1, mockTime.milliseconds()),
new KeyValueTimestamp("E", 2, mockTime.milliseconds())
)));
}
@ -315,7 +320,7 @@ public class KStreamAggregationIntegrationTest { @@ -315,7 +320,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(),
new IntegerDeserializer(),
String.class,
@ -329,37 +334,36 @@ public class KStreamAggregationIntegrationTest { @@ -329,37 +334,36 @@ public class KStreamAggregationIntegrationTest {
15,
true);
final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
comparator =
Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> comparator =
Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> o) -> o.key().key())
.thenComparingInt(KeyValueTimestamp::value);
windowedMessages.sort(comparator);
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp));
assertThat(windowedMessages, is(expectResult));
final Set<String> expectResultString = new HashSet<>(expectResult.size());
for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>> eachRecord: expectResult) {
expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key);
for (final KeyValueTimestamp<Windowed<String>, Integer> eachRecord: expectResult) {
expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
}
// check every message is contained in the expect result
@ -375,23 +379,23 @@ public class KStreamAggregationIntegrationTest { @@ -375,23 +379,23 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
final List<KeyValue<String, Long>> results = receiveMessages(
final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
10);
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1L),
KeyValue.pair("A", 2L),
KeyValue.pair("B", 1L),
KeyValue.pair("B", 2L),
KeyValue.pair("C", 1L),
KeyValue.pair("C", 2L),
KeyValue.pair("D", 1L),
KeyValue.pair("D", 2L),
KeyValue.pair("E", 1L),
KeyValue.pair("E", 2L)
new KeyValueTimestamp("A", 1L, mockTime.milliseconds()),
new KeyValueTimestamp("A", 2L, mockTime.milliseconds()),
new KeyValueTimestamp("B", 1L, mockTime.milliseconds()),
new KeyValueTimestamp("B", 2L, mockTime.milliseconds()),
new KeyValueTimestamp("C", 1L, mockTime.milliseconds()),
new KeyValueTimestamp("C", 2L, mockTime.milliseconds()),
new KeyValueTimestamp("D", 1L, mockTime.milliseconds()),
new KeyValueTimestamp("D", 2L, mockTime.milliseconds()),
new KeyValueTimestamp("E", 1L, mockTime.milliseconds()),
new KeyValueTimestamp("E", 2L, mockTime.milliseconds())
)));
}
@ -430,7 +434,7 @@ public class KStreamAggregationIntegrationTest { @@ -430,7 +434,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
final List<KeyValue<String, Long>> results = receiveMessages(
final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
10);
@ -438,18 +442,17 @@ public class KStreamAggregationIntegrationTest { @@ -438,18 +442,17 @@ public class KStreamAggregationIntegrationTest {
final long window = timestamp / 500 * 500;
assertThat(results, is(Arrays.asList(
KeyValue.pair("1@" + window, 1L),
KeyValue.pair("1@" + window, 2L),
KeyValue.pair("2@" + window, 1L),
KeyValue.pair("2@" + window, 2L),
KeyValue.pair("3@" + window, 1L),
KeyValue.pair("3@" + window, 2L),
KeyValue.pair("4@" + window, 1L),
KeyValue.pair("4@" + window, 2L),
KeyValue.pair("5@" + window, 1L),
KeyValue.pair("5@" + window, 2L)
new KeyValueTimestamp("1@" + window, 1L, timestamp),
new KeyValueTimestamp("1@" + window, 2L, timestamp),
new KeyValueTimestamp("2@" + window, 1L, timestamp),
new KeyValueTimestamp("2@" + window, 2L, timestamp),
new KeyValueTimestamp("3@" + window, 1L, timestamp),
new KeyValueTimestamp("3@" + window, 2L, timestamp),
new KeyValueTimestamp("4@" + window, 1L, timestamp),
new KeyValueTimestamp("4@" + window, 2L, timestamp),
new KeyValueTimestamp("5@" + window, 1L, timestamp),
new KeyValueTimestamp("5@" + window, 2L, timestamp)
)));
}
@Test
@ -796,14 +799,14 @@ public class KStreamAggregationIntegrationTest { @@ -796,14 +799,14 @@ public class KStreamAggregationIntegrationTest {
kafkaStreams.start();
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final int numMessages)
throws InterruptedException {
return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
@ -817,14 +820,14 @@ public class KStreamAggregationIntegrationTest { @@ -817,14 +820,14 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
consumerProperties,
outputTopic,
numMessages,
60 * 1000);
}
private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {

305
streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
@ -62,22 +63,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -62,22 +63,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@ -89,27 +104,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -89,27 +104,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testInnerRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
.join(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
.selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
leftStream.map(MockMapper.noOpKeyValueMapper())
.join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@ -119,22 +148,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -119,22 +148,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeft() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@ -146,27 +189,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -146,27 +189,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeftRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
.leftJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
.selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
leftStream.map(MockMapper.noOpKeyValueMapper())
.leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@ -176,22 +233,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -176,22 +233,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testOuter() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@ -203,27 +274,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -203,27 +274,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testOuterRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Arrays.asList("A-b", "B-b"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
Arrays.asList("C-a", "C-b"),
Arrays.asList("A-c", "B-c", "C-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
Arrays.asList("A-d", "B-d", "C-d"),
Arrays.asList("D-a", "D-b", "D-c", "D-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
.outerJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
.selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
leftStream.map(MockMapper.noOpKeyValueMapper())
.outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@ -233,26 +318,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -233,26 +318,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testMultiInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"),
null,
null,
Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"),
Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c",
"A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"),
null,
null,
null,
Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c",
"A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d",
"A-d-d", "B-d-d", "C-d-d"),
Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a",
"D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d")
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
null,
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10)))

24
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@ -34,8 +35,7 @@ import java.util.Arrays; @@ -34,8 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests all available joins of Kafka Streams DSL.
@ -82,20 +82,20 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -82,20 +82,20 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
streams.close();
assertEquals(listener.createdToRevokedSeen(), true);
assertEquals(listener.revokedToPendingShutdownSeen(), true);
assertTrue(listener.createdToRevokedSeen());
assertTrue(listener.revokedToPendingShutdownSeen());
}
@Test
public void testInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
null,
Collections.singletonList("B-a"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
null,
null,
null,
@ -105,7 +105,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -105,7 +105,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
null,
null,
null,
Collections.singletonList("D-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
@ -117,22 +117,22 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest @@ -117,22 +117,22 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeft() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
null,
Collections.singletonList("B-a"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
null,
null,
null,
Collections.singletonList("C-null"),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
null,
null,
null,
null,
null,
Collections.singletonList("D-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);

363
streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java

@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration; @@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
@ -59,8 +60,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -59,8 +60,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
}
final private String expectedFinalJoinResult = "D-d";
final private String expectedFinalMultiJoinResult = "D-d-d";
final private KeyValueTimestamp<Long, String> expectedFinalJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L);
final private KeyValueTimestamp<Long, String> expectedFinalMultiJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L);
final private String storeName = appID + "-store";
private Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
@ -70,7 +71,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -70,7 +71,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.withLoggingDisabled();
final private class CountingPeek implements ForeachAction<Long, String> {
final private String expected;
final private KeyValueTimestamp<Long, String> expected;
CountingPeek(final boolean multiJoin) {
this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult;
@ -79,7 +80,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -79,7 +80,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
@Override
public void apply(final Long key, final String value) {
numRecordsExpected++;
if (expected.equals(value)) {
if (expected.value().equals(value)) {
final boolean ret = finalResultReached.compareAndSet(false, true);
if (!ret) {
@ -98,22 +99,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -98,22 +99,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList((String) null),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
null,
Collections.singletonList("C-c"),
Collections.singletonList((String) null),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
null,
null,
null,
Collections.singletonList("D-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@ -129,22 +130,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -129,22 +130,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList((String) null),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
Collections.singletonList("C-null"),
Collections.singletonList("C-c"),
Collections.singletonList("C-null"),
Collections.singletonList((String) null),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
null,
Collections.singletonList("D-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@ -160,22 +161,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -160,22 +161,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList("A-null"),
Collections.singletonList("A-a"),
Collections.singletonList("B-a"),
Collections.singletonList("B-b"),
Collections.singletonList("null-b"),
Collections.singletonList((String) null),
Collections.singletonList("C-null"),
Collections.singletonList("C-c"),
Collections.singletonList("C-null"),
Collections.singletonList((String) null),
null,
Collections.singletonList("null-d"),
Collections.singletonList("D-d")
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@ -197,22 +198,29 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -197,22 +198,29 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
} else {
// FIXME: the duplicate below for all the multi-joins
// are due to KAFKA-6443, should be updated once it is fixed.
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
null,
Arrays.asList("C-c-c", "C-c-c"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
null, // correct would be -> new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)
// we don't get correct value, because of self-join of `rightTable`
null,
null,
null,
null,
Collections.singletonList("D-d-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@ -235,22 +243,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -235,22 +243,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
null,
Arrays.asList("C-c-c", "C-c-c"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
null,
null,
null,
Collections.singletonList("D-d-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@ -274,22 +288,33 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -274,22 +288,33 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList("null-b"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
null,
Arrays.asList("C-c-c", "C-c-c"),
Arrays.asList((String) null, null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
null,
null,
null,
Arrays.asList("null-d", "D-d-d")
Arrays.asList(
// incorrect result `null-d` is caused by self-join of `rightTable`
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@ -312,22 +337,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -312,22 +337,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
null,
Arrays.asList("C-c-c", "C-c-c"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
null,
null,
null,
Collections.singletonList("D-d-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@ -351,22 +382,32 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -351,22 +382,32 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
null,
null,
Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
Arrays.asList("C-null-null", "C-null-null"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
null,
Collections.singletonList("D-d-d")
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@ -390,22 +431,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -390,22 +431,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList("null-b"),
Collections.singletonList((String) null),
null,
Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
Arrays.asList("C-null-null", "C-null-null"),
Collections.singletonList((String) null),
null,
null,
Arrays.asList("null-d", "D-d-d")
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@ -428,22 +481,30 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -428,22 +481,30 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList("null-b-b"),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
null,
null,
Arrays.asList("C-c-c", "C-c-c"),
Collections.singletonList((String) null),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
null,
null,
Arrays.asList("null-d-d", "null-d-d"),
Collections.singletonList("D-d-d")
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)
@ -467,22 +528,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -467,22 +528,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList("null-b-b"),
Collections.singletonList((String) null),
null,
Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
Arrays.asList("C-null-null", "C-null-null"),
Collections.singletonList((String) null),
null,
Arrays.asList("null-d-d", "null-d-d"),
Collections.singletonList("D-d-d")
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)
@ -506,22 +579,36 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { @@ -506,22 +579,36 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
final List<List<String>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
Collections.singletonList("B-a-a"),
Arrays.asList("B-b-b", "B-b-b"),
Collections.singletonList("null-b-b"),
Arrays.asList((String) null, null),
null,
Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
Arrays.asList("C-null-null", "C-null-null"),
Collections.singletonList((String) null),
null,
null,
Arrays.asList("null-d-d", "null-d-d", "D-d-d")
final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
null,
null,
Arrays.asList(
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)

81
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

@ -241,12 +241,14 @@ public class IntegrationTestUtils { @@ -241,12 +241,14 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
@SuppressWarnings("WeakerAccess")
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
final Collection<KeyValue<K, V>> records,
final Properties producerConfig,
final Long timestamp,
final boolean enableTransactions)
throws ExecutionException, InterruptedException {
produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
}
@ -260,6 +262,7 @@ public class IntegrationTestUtils { @@ -260,6 +262,7 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
@SuppressWarnings("WeakerAccess")
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
final Collection<KeyValue<K, V>> records,
final Properties producerConfig,
@ -267,6 +270,7 @@ public class IntegrationTestUtils { @@ -267,6 +270,7 @@ public class IntegrationTestUtils {
final Long timestamp,
final boolean enableTransactions)
throws ExecutionException, InterruptedException {
try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
if (enableTransactions) {
producer.initTransactions();
@ -369,6 +373,7 @@ public class IntegrationTestUtils { @@ -369,6 +373,7 @@ public class IntegrationTestUtils {
* @param enableTransactions Send messages in a transaction
* @param <V> Value type of the data records
*/
@SuppressWarnings("WeakerAccess")
public static <V> void produceValuesSynchronously(final String topic,
final Collection<V> records,
final Properties producerConfig,
@ -427,6 +432,7 @@ public class IntegrationTestUtils { @@ -427,6 +432,7 @@ public class IntegrationTestUtils {
* @param <V> Value type of the data records
* @return All the records consumed, or null if no records are consumed
*/
@SuppressWarnings("WeakerAccess")
public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
@ -444,6 +450,7 @@ public class IntegrationTestUtils { @@ -444,6 +450,7 @@ public class IntegrationTestUtils {
* @param <V> Value type of the data records
* @return All the records consumed, or null if no records are consumed
*/
@SuppressWarnings("WeakerAccess")
public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
@ -519,14 +526,14 @@ public class IntegrationTestUtils { @@ -519,14 +526,14 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
final List<KeyValueTimestamp<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
final List<KeyValue<K, KeyValue<V, Long>>> readData =
final List<KeyValueTimestamp<K, V>> readData =
readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
@ -553,6 +560,22 @@ public class IntegrationTestUtils { @@ -553,6 +560,22 @@ public class IntegrationTestUtils {
return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT);
}
/**
* Wait until final key-value mappings have been consumed.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
* @param expectedRecords Expected key-value mappings
* @param <K> Key type of the data records
* @param <V> Value type of the data records
* @return All the mappings consumed, or null if no records are consumed
*/
public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
final String topic,
final List<KeyValueTimestamp<K, V>> expectedRecords) throws InterruptedException {
return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT, true);
}
/**
* Wait until final key-value mappings have been consumed.
*
@ -564,28 +587,53 @@ public class IntegrationTestUtils { @@ -564,28 +587,53 @@ public class IntegrationTestUtils {
* @param <V> Value type of the data records
* @return All the mappings consumed, or null if no records are consumed
*/
@SuppressWarnings("WeakerAccess")
public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final List<KeyValue<K, V>> expectedRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
}
public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
final String topic,
final List<KeyValueTimestamp<K, V>> expectedRecords,
final long waitTime) throws InterruptedException {
return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, true);
}
@SuppressWarnings("unchecked")
private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final List<T> expectedRecords,
final long waitTime,
final boolean withTimestamp) throws InterruptedException {
final List<T> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
final List<KeyValue<K, V>> readData =
readKeyValues(topic, consumer, waitTime, expectedRecords.size());
final List<T> readData;
if (withTimestamp) {
readData = (List<T>) readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedRecords.size());
} else {
readData = (List<T>) readKeyValues(topic, consumer, waitTime, expectedRecords.size());
}
accumData.addAll(readData);
// filter out all intermediate records we don't want
final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
final List<T> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
// still need to check that for each key, the ordering is expected
final Map<K, List<KeyValue<K, V>>> finalAccumData = new HashMap<>();
for (final KeyValue<K, V> kv : accumulatedActual) {
finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
final Map<K, List<T>> finalAccumData = new HashMap<>();
for (final T kv : accumulatedActual) {
finalAccumData.computeIfAbsent(
(K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
key -> new ArrayList<>()).add(kv);
}
final Map<K, List<KeyValue<K, V>>> finalExpected = new HashMap<>();
for (final KeyValue<K, V> kv : expectedRecords) {
finalExpected.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
final Map<K, List<T>> finalExpected = new HashMap<>();
for (final T kv : expectedRecords) {
finalExpected.computeIfAbsent(
(K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
key -> new ArrayList<>()).add(kv);
}
// returns true only if the remaining records in both lists are the same and in the same order
@ -643,6 +691,7 @@ public class IntegrationTestUtils { @@ -643,6 +691,7 @@ public class IntegrationTestUtils {
return accumData;
}
@SuppressWarnings("WeakerAccess")
public static void waitForTopicPartitions(final List<KafkaServer> servers,
final List<TopicPartition> partitions,
final long timeout) throws InterruptedException {
@ -863,14 +912,14 @@ public class IntegrationTestUtils { @@ -863,14 +912,14 @@ public class IntegrationTestUtils {
* @param maxMessages Maximum number of messages to read via the consumer
* @return The KeyValue elements retrieved via the consumer
*/
private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
private static <K, V> List<KeyValueTimestamp<K, V>> readKeyValuesWithTimestamp(final String topic,
final Consumer<K, V> consumer,
final long waitTime,
final int maxMessages) {
final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
final List<KeyValueTimestamp<K, V>> consumedValues = new ArrayList<>();
final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
for (final ConsumerRecord<K, V> record : records) {
consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
consumedValues.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
}
return consumedValues;
}

4
streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java

@ -56,6 +56,10 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> { @@ -56,6 +56,10 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
return capturedProcessors(1).get(0);
}
public int capturedProcessorsCount() {
return processors.size();
}
// get the captured processors with the expected number
public List<MockProcessor<K, V>> capturedProcessors(final int expectedNumberOfProcessors) {
assertEquals(expectedNumberOfProcessors, processors.size());

Loading…
Cancel
Save