Browse Source

MINOR: Caching layer should forward record timestamp (#5423)

Reviewer: Guozhang Wang <guozhang@confluent.io>
pull/5408/merge
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
42af41d5fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
  2. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
  3. 274
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
  4. 118
      streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

2
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java

@ -147,7 +147,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i @@ -147,7 +147,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
context.headers(),
true,
context.offset(),
key.window().end(),
context.timestamp(),
context.partition(),
context.topic());
cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);

2
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@ -156,7 +156,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @@ -156,7 +156,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
context.headers(),
true,
context.offset(),
timestamp,
context.timestamp(),
context.partition(),
context.topic());
cache.put(name, cacheFunction.cacheKey(keyBytes), entry);

274
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes; @@ -28,7 +28,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -36,7 +35,7 @@ import org.apache.kafka.streams.StreamsConfig; @@ -36,7 +35,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
@ -49,16 +48,16 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; @@ -49,16 +48,16 @@ import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
@ -136,24 +135,9 @@ public class KStreamAggregationIntegrationTest { @@ -136,24 +135,9 @@ public class KStreamAggregationIntegrationTest {
mapper,
Serialized.with(Serdes.String(), Serdes.String()));
reducer = new Reducer<String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + ":" + value2;
}
};
initializer = new Initializer<Integer>() {
@Override
public Integer apply() {
return 0;
}
};
aggregator = new Aggregator<String, String, Integer>() {
@Override
public Integer apply(final String aggKey, final String value, final Integer aggregate) {
return aggregate + value.length();
}
};
reducer = (value1, value2) -> value1 + ":" + value2;
initializer = () -> 0;
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
}
@After
@ -181,12 +165,7 @@ public class KStreamAggregationIntegrationTest { @@ -181,12 +165,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
10);
Collections.sort(results, new Comparator<KeyValue<String, String>>() {
@Override
public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
KeyValue.pair("A", "A:A"),
@ -218,7 +197,7 @@ public class KStreamAggregationIntegrationTest { @@ -218,7 +197,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp);
produceMessages(secondBatchTimestamp);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream
.windowedBy(TimeWindows.of(500L))
.reduce(reducer)
@ -228,34 +207,28 @@ public class KStreamAggregationIntegrationTest { @@ -228,34 +207,28 @@ public class KStreamAggregationIntegrationTest {
startStreams();
final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
new TimeWindowedDeserializer<String>(),
new TimeWindowedDeserializer<>(),
new StringDeserializer(),
String.class,
15);
// read from ConsoleConsumer
String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new StringDeserializer(),
String.class,
15);
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new StringDeserializer(),
String.class,
15,
false);
final Comparator<KeyValue<Windowed<String>, String>>
comparator =
new Comparator<KeyValue<Windowed<String>, String>>() {
@Override
public int compare(final KeyValue<Windowed<String>, String> o1,
final KeyValue<Windowed<String>, String> o2) {
final int keyComparison = o1.key.key().compareTo(o2.key.key());
return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
}
};
Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
Collections.sort(windowedOutput, comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
@ -274,13 +247,13 @@ public class KStreamAggregationIntegrationTest { @@ -274,13 +247,13 @@ public class KStreamAggregationIntegrationTest {
);
assertThat(windowedOutput, is(expectResult));
Set<String> expectResultString = new HashSet<>(expectResult.size());
for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
final Set<String> expectResultString = new HashSet<>(expectResult.size());
for (final KeyValue<Windowed<String>, String> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
}
// check every message is contained in the expect result
String[] allRecords = resultFromConsoleConsumer.split("\n");
final String[] allRecords = resultFromConsoleConsumer.split("\n");
for (String record: allRecords) {
record = "KeyValue(" + record + ")";
assertTrue(expectResultString.contains(record));
@ -306,12 +279,7 @@ public class KStreamAggregationIntegrationTest { @@ -306,12 +279,7 @@ public class KStreamAggregationIntegrationTest {
new IntegerDeserializer(),
10);
Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
@Override
public int compare(final KeyValue<String, Integer> o1, final KeyValue<String, Integer> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1),
@ -336,75 +304,68 @@ public class KStreamAggregationIntegrationTest { @@ -336,75 +304,68 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
produceMessages(secondTimestamp);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate(
initializer,
aggregator,
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
Materialized.with(null, Serdes.Integer())
)
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
startStreams();
final List<KeyValue<Windowed<String>, Integer>> windowedMessages = receiveMessages(
new TimeWindowedDeserializer<String>(),
final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(),
new IntegerDeserializer(),
String.class,
15);
// read from ConsoleConsumer
String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new IntegerDeserializer(),
String.class,
15);
final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new IntegerDeserializer(),
String.class,
15,
true);
final Comparator<KeyValue<Windowed<String>, Integer>>
final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
comparator =
new Comparator<KeyValue<Windowed<String>, Integer>>() {
@Override
public int compare(final KeyValue<Windowed<String>, Integer> o1,
final KeyValue<Windowed<String>, Integer> o2) {
final int keyComparison = o1.key.key().compareTo(o2.key.key());
return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
}
};
Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
Collections.sort(windowedMessages, comparator);
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> expectResult = Arrays.asList(
new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
assertThat(windowedMessages, is(expectResult));
Set<String> expectResultString = new HashSet<>(expectResult.size());
for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
final Set<String> expectResultString = new HashSet<>(expectResult.size());
for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>> eachRecord: expectResult) {
expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key);
}
// check every message is contained in the expect result
String[] allRecords = resultFromConsoleConsumer.split("\n");
for (String record: allRecords) {
record = "KeyValue(" + record + ")";
final String[] allRecords = resultFromConsoleConsumer.split("\n");
for (final String record: allRecords) {
assertTrue(expectResultString.contains(record));
}
@ -419,12 +380,7 @@ public class KStreamAggregationIntegrationTest { @@ -419,12 +380,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
KeyValue.pair("A", 1L),
@ -444,7 +400,7 @@ public class KStreamAggregationIntegrationTest { @@ -444,7 +400,7 @@ public class KStreamAggregationIntegrationTest {
public void shouldCount() throws Exception {
produceMessages(mockTime.milliseconds());
groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-by-key"))
groupedStream.count(Materialized.as("count-by-key"))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
@ -471,12 +427,7 @@ public class KStreamAggregationIntegrationTest { @@ -471,12 +427,7 @@ public class KStreamAggregationIntegrationTest {
stream.groupByKey(Serialized.with(Serdes.Integer(), Serdes.String()))
.windowedBy(TimeWindows.of(500L))
.count()
.toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
@Override
public String apply(final Windowed<Integer> windowedKey, final Long value) {
return windowedKey.key() + "@" + windowedKey.window().start();
}
}).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
.toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
startStreams();
@ -484,12 +435,7 @@ public class KStreamAggregationIntegrationTest { @@ -484,12 +435,7 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
final long window = timestamp / 500 * 500;
assertThat(results, is(Arrays.asList(
@ -568,7 +514,7 @@ public class KStreamAggregationIntegrationTest { @@ -568,7 +514,7 @@ public class KStreamAggregationIntegrationTest {
new Properties()),
t4);
final Map<Windowed<String>, Long> results = new HashMap<>();
final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
final CountDownLatch latch = new CountDownLatch(11);
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
@ -576,23 +522,34 @@ public class KStreamAggregationIntegrationTest { @@ -576,23 +522,34 @@ public class KStreamAggregationIntegrationTest {
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
.count()
.toStream()
.foreach(new ForeachAction<Windowed<String>, Long>() {
@Override
public void apply(final Windowed<String> key, final Long value) {
results.put(key, value);
latch.countDown();
}
});
.transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
results.put(key, KeyValue.pair(value, context.timestamp()));
latch.countDown();
return null;
}
@Override
public void close() {}
});
startStreams();
latch.await(30, TimeUnit.SECONDS);
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
}
@Test
@ -662,25 +619,17 @@ public class KStreamAggregationIntegrationTest { @@ -662,25 +619,17 @@ public class KStreamAggregationIntegrationTest {
builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
.reduce(new Reducer<String>() {
@Override
public String apply(final String value1, final String value2) {
return value1 + ":" + value2;
}
}, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(userSessionsStore))
.reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
.toStream()
.foreach(new ForeachAction<Windowed<String>, String>() {
@Override
public void apply(final Windowed<String> key, final String value) {
results.put(key, value);
latch.countDown();
}
.foreach((key, value) -> {
results.put(key, value);
latch.countDown();
});
startStreams();
latch.await(30, TimeUnit.SECONDS);
final ReadOnlySessionStore<String, String> sessionStore
= kafkaStreams.store(userSessionsStore, QueryableStoreTypes.<String, String>sessionStore());
= kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore());
// verify correct data received
assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start"));
@ -732,16 +681,14 @@ public class KStreamAggregationIntegrationTest { @@ -732,16 +681,14 @@ public class KStreamAggregationIntegrationTest {
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final int numMessages)
final Deserializer<V> valueDeserializer,
final int numMessages)
throws InterruptedException {
return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer,
final Deserializer<V>
valueDeserializer,
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties();
@ -761,21 +708,44 @@ public class KStreamAggregationIntegrationTest { @@ -761,21 +708,44 @@ public class KStreamAggregationIntegrationTest {
60 * 1000);
}
private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
consumerProperties,
outputTopic,
numMessages,
60 * 1000);
}
private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) {
ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
PrintStream originalStream = System.out;
try (PrintStream newStream = new PrintStream(newConsole)) {
final int numMessages,
final boolean printTimestamp) {
final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
final PrintStream originalStream = System.out;
try (final PrintStream newStream = new PrintStream(newConsole)) {
System.setOut(newStream);
String keySeparator = ", ";
final String keySeparator = ", ";
// manually construct the console consumer argument array
String[] args = new String[] {
final String[] args = new String[] {
"--bootstrap-server", CLUSTER.bootstrapServers(),
"--from-beginning",
"--property", "print.key=true",
"--property", "print.timestamp=" + printTimestamp,
"--topic", outputTopic,
"--max-messages", String.valueOf(numMessages),
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),

118
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java

@ -192,7 +192,7 @@ public class IntegrationTestUtils { @@ -192,7 +192,7 @@ public class IntegrationTestUtils {
final Long timestamp,
final boolean enabledTransactions)
throws ExecutionException, InterruptedException {
try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
if (enabledTransactions) {
producer.initTransactions();
producer.beginTransaction();
@ -310,14 +310,39 @@ public class IntegrationTestUtils { @@ -310,14 +310,39 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = new TestCondition() {
@Override
public boolean conditionMet() {
final List<KeyValue<K, V>> readData =
readKeyValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
final TestCondition valuesRead = () -> {
final List<KeyValue<K, V>> readData =
readKeyValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
}
return accumData;
}
/**
* Wait until enough data (key-value records) has been consumed.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Topic to consume from
* @param expectedNumRecords Minimum number of expected records
* @param waitTime Upper bound in waiting time in milliseconds
* @return All the records consumed, or null if no records are consumed
* @throws AssertionError if the given wait time elapses
*/
public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
final List<KeyValue<K, KeyValue<V, Long>>> readData =
readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@ -331,14 +356,11 @@ public class IntegrationTestUtils { @@ -331,14 +356,11 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = new TestCondition() {
@Override
public boolean conditionMet() {
final List<ConsumerRecord<K, V>> readData =
readRecords(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
final TestCondition valuesRead = () -> {
final List<ConsumerRecord<K, V>> readData =
readRecords(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@ -369,14 +391,11 @@ public class IntegrationTestUtils { @@ -369,14 +391,11 @@ public class IntegrationTestUtils {
final long waitTime) throws InterruptedException {
final List<V> accumData = new ArrayList<>();
try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = new TestCondition() {
@Override
public boolean conditionMet() {
final List<V> readData =
readValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
final TestCondition valuesRead = () -> {
final List<V> readData =
readValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
@ -401,23 +420,20 @@ public class IntegrationTestUtils { @@ -401,23 +420,20 @@ public class IntegrationTestUtils {
final String topic,
final int partition,
final long timeout) throws InterruptedException {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
for (final KafkaServer server : servers) {
final MetadataCache metadataCache = server.apis().metadataCache();
final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
metadataCache.getPartitionInfo(topic, partition);
if (partitionInfo.isEmpty()) {
return false;
}
final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
return false;
}
TestUtils.waitForCondition(() -> {
for (final KafkaServer server : servers) {
final MetadataCache metadataCache = server.apis().metadataCache();
final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
metadataCache.getPartitionInfo(topic, partition);
if (partitionInfo.isEmpty()) {
return false;
}
final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
return false;
}
return true;
}
return true;
}, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
}
@ -502,6 +518,28 @@ public class IntegrationTestUtils { @@ -502,6 +518,28 @@ public class IntegrationTestUtils {
return consumedValues;
}
/**
* Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
* are already configured in the consumer).
*
* @param topic Kafka topic to read messages from
* @param consumer Kafka consumer
* @param waitTime Maximum wait time in milliseconds
* @param maxMessages Maximum number of messages to read via the consumer
* @return The KeyValue elements retrieved via the consumer
*/
private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
final Consumer<K, V> consumer,
final long waitTime,
final int maxMessages) {
final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
for (final ConsumerRecord<K, V> record : records) {
consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
}
return consumedValues;
}
private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
final Consumer<K, V> consumer,
final long waitTime,

Loading…
Cancel
Save