Browse Source

KAFKA-6592: ConsoleConsumer should support WindowedSerdes (#4797)

Have Console consumer support TimeWindowedDeserializer/SessionWindowedDeserializer.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/4863/head
huxi 7 years ago committed by Guozhang Wang
parent
commit
4e35a2bfb7
  1. 47
      core/src/main/scala/kafka/tools/ConsoleConsumer.scala
  2. 19
      core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
  3. 232
      streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

47
core/src/main/scala/kafka/tools/ConsoleConsumer.scala

@ -37,6 +37,7 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConversions
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
/** /**
@ -45,6 +46,11 @@ import scala.collection.JavaConverters._
object ConsoleConsumer extends Logging { object ConsoleConsumer extends Logging {
var messageCount = 0 var messageCount = 0
// Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
// and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
// visible for testing
private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
private val shutdownLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1)
@ -291,7 +297,17 @@ object ConsoleConsumer extends Logging {
.describedAs("class") .describedAs("class")
.ofType(classOf[String]) .ofType(classOf[String])
.defaultsTo(classOf[DefaultMessageFormatter].getName) .defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property", "The properties to initialize the message formatter.") val messageFormatterArgOpt = parser.accepts("property",
"The properties to initialize the message formatter. Default properties include:\n" +
"\tprint.timestamp=true|false\n" +
"\tprint.key=true|false\n" +
"\tprint.value=true|false\n" +
"\tkey.separator=<key.separator>\n" +
"\tline.separator=<line.separator>\n" +
"\tkey.deserializer=<key.deserializer>\n" +
"\tvalue.deserializer=<value.deserializer>\n" +
"\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
"\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
.withRequiredArg .withRequiredArg
.describedAs("prop") .describedAs("prop")
.ofType(classOf[String]) .ofType(classOf[String])
@ -328,6 +344,18 @@ object ConsoleConsumer extends Logging {
.withRequiredArg .withRequiredArg
.describedAs("deserializer for values") .describedAs("deserializer for values")
.ofType(classOf[String]) .ofType(classOf[String])
val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
"inner serde for key when windowed deserialzier is used; would be ignored otherwise. " +
"For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
.withRequiredArg
.describedAs("inner serde for key")
.ofType(classOf[String])
val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
"inner serde for value when windowed deserialzier is used; would be ignored otherwise. " +
"For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
.withRequiredArg
.describedAs("inner serde for values")
.ofType(classOf[String])
val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events", val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
"Log lifecycle events of the consumer in addition to logging consumed " + "Log lifecycle events of the consumer in addition to logging consumed " +
"messages. (This is specific for system tests.)") "messages. (This is specific for system tests.)")
@ -372,6 +400,8 @@ object ConsoleConsumer extends Logging {
val bootstrapServer = options.valueOf(bootstrapServerOpt) val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt) val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt) val valueDeserializer = options.valueOf(valueDeserializerOpt)
val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
@ -381,6 +411,13 @@ object ConsoleConsumer extends Logging {
if (valueDeserializer != null && !valueDeserializer.isEmpty) { if (valueDeserializer != null && !valueDeserializer.isEmpty) {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
} }
if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
}
if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
}
formatter.init(formatterArgs) formatter.init(formatterArgs)
if (useOldConsumer) { if (useOldConsumer) {
@ -521,11 +558,15 @@ class DefaultMessageFormatter extends MessageFormatter {
if (props.containsKey("line.separator")) if (props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("key.deserializer")) if (props.containsKey("key.deserializer")) {
keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
}
// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
if (props.containsKey("value.deserializer")) if (props.containsKey("value.deserializer")) {
valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]]) valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
}
} }
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) { def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

19
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala

@ -25,6 +25,7 @@ import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy} import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.easymock.EasyMock import org.easymock.EasyMock
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{Before, Test} import org.junit.{Before, Test}
@ -537,4 +538,22 @@ class ConsoleConsumerTest {
Exit.resetExitProcedure() Exit.resetExitProcedure()
} }
@Test
def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
val args = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--property", "print.key=true",
"--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
"--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde",
"--property", "my-test1=abc"
)
val config = new ConsoleConsumer.ConsumerConfig(args)
assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
assertTrue(config.formatterArgs.containsKey("my-test1"))
assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
}
} }

232
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java

@ -16,12 +16,14 @@
*/ */
package org.apache.kafka.streams.integration; package org.apache.kafka.streams.integration;
import kafka.tools.ConsoleConsumer;
import kafka.utils.MockTime; import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
@ -43,10 +45,14 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlySessionStore;
@ -60,14 +66,18 @@ import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class}) @Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest { public class KStreamAggregationIntegrationTest {
@ -205,32 +216,36 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondBatchTimestamp); produceMessages(secondBatchTimestamp);
produceMessages(secondBatchTimestamp); produceMessages(secondBatchTimestamp);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream groupedStream
.windowedBy(TimeWindows.of(500L)) .windowedBy(TimeWindows.of(500L))
.reduce(reducer) .reduce(reducer)
.toStream(new KeyValueMapper<Windowed<String>, String, String>() { .toStream()
@Override .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
public String apply(final Windowed<String> windowedKey, final String value) {
return windowedKey.key() + "@" + windowedKey.window().start();
}
})
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
startStreams(); startStreams();
final List<KeyValue<String, String>> windowedOutput = receiveMessages( final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
new StringDeserializer(), new TimeWindowedDeserializer<String>(),
new StringDeserializer(), new StringDeserializer(),
String.class,
15); 15);
final Comparator<KeyValue<String, String>> // read from ConsoleConsumer
String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new StringDeserializer(),
String.class,
15);
final Comparator<KeyValue<Windowed<String>, String>>
comparator = comparator =
new Comparator<KeyValue<String, String>>() { new Comparator<KeyValue<Windowed<String>, String>>() {
@Override @Override
public int compare(final KeyValue<String, String> o1, public int compare(final KeyValue<Windowed<String>, String> o1,
final KeyValue<String, String> o2) { final KeyValue<Windowed<String>, String> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2); final int keyComparison = o1.key.key().compareTo(o2.key.key());
return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
} }
}; };
@ -238,25 +253,36 @@ public class KStreamAggregationIntegrationTest {
final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500; final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
assertThat(windowedOutput, is( List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
Arrays.asList( new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>("A@" + firstBatchWindow, "A"), new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
new KeyValue<>("A@" + secondBatchWindow, "A"), new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
new KeyValue<>("A@" + secondBatchWindow, "A:A"), new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
new KeyValue<>("B@" + firstBatchWindow, "B"), new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
new KeyValue<>("B@" + secondBatchWindow, "B"), new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
new KeyValue<>("B@" + secondBatchWindow, "B:B"), new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
new KeyValue<>("C@" + firstBatchWindow, "C"), new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
new KeyValue<>("C@" + secondBatchWindow, "C"), new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
new KeyValue<>("C@" + secondBatchWindow, "C:C"), new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
new KeyValue<>("D@" + firstBatchWindow, "D"), new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
new KeyValue<>("D@" + secondBatchWindow, "D"), new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
new KeyValue<>("D@" + secondBatchWindow, "D:D"), new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
new KeyValue<>("E@" + firstBatchWindow, "E"), new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
new KeyValue<>("E@" + secondBatchWindow, "E"), new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
new KeyValue<>("E@" + secondBatchWindow, "E:E") );
) assertThat(windowedOutput, is(expectResult));
));
Set<String> expectResultString = new HashSet<>(expectResult.size());
for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
}
// check every message is contained in the expect result
String[] allRecords = resultFromConsoleConsumer.split("\n");
for (String record: allRecords) {
record = "KeyValue(" + record + ")";
assertTrue(expectResultString.contains(record));
}
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@ -309,34 +335,39 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp); produceMessages(secondTimestamp);
produceMessages(secondTimestamp); produceMessages(secondTimestamp);
Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
groupedStream.windowedBy(TimeWindows.of(500L)) groupedStream.windowedBy(TimeWindows.of(500L))
.aggregate( .aggregate(
initializer, initializer,
aggregator, aggregator,
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer()) Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(null, Serdes.Integer())
) )
.toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { .toStream()
@Override .to(outputTopic, Produced.with(windowedSerde, Serdes.Integer()));
public String apply(final Windowed<String> windowedKey, final Integer value) {
return windowedKey.key() + "@" + windowedKey.window().start();
}
})
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer()));
startStreams(); startStreams();
final List<KeyValue<String, Integer>> windowedMessages = receiveMessages( final List<KeyValue<Windowed<String>, Integer>> windowedMessages = receiveMessages(
new StringDeserializer(), new TimeWindowedDeserializer<String>(),
new IntegerDeserializer(), new IntegerDeserializer(),
String.class,
15); 15);
final Comparator<KeyValue<String, Integer>> // read from ConsoleConsumer
String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
new TimeWindowedDeserializer<String>(),
new IntegerDeserializer(),
String.class,
15);
final Comparator<KeyValue<Windowed<String>, Integer>>
comparator = comparator =
new Comparator<KeyValue<String, Integer>>() { new Comparator<KeyValue<Windowed<String>, Integer>>() {
@Override @Override
public int compare(final KeyValue<String, Integer> o1, public int compare(final KeyValue<Windowed<String>, Integer> o1,
final KeyValue<String, Integer> o2) { final KeyValue<Windowed<String>, Integer> o2) {
return KStreamAggregationIntegrationTest.compare(o1, o2); final int keyComparison = o1.key.key().compareTo(o2.key.key());
return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
} }
}; };
@ -345,24 +376,37 @@ public class KStreamAggregationIntegrationTest {
final long firstWindow = firstTimestamp / 500 * 500; final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500; final long secondWindow = secondTimestamp / 500 * 500;
assertThat(windowedMessages, is( List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
Arrays.asList( new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("A@" + firstWindow, 1), new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("A@" + secondWindow, 1), new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>("A@" + secondWindow, 2), new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("B@" + firstWindow, 1), new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("B@" + secondWindow, 1), new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>("B@" + secondWindow, 2), new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("C@" + firstWindow, 1), new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("C@" + secondWindow, 1), new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>("C@" + secondWindow, 2), new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("D@" + firstWindow, 1), new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("D@" + secondWindow, 1), new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
new KeyValue<>("D@" + secondWindow, 2), new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("E@" + firstWindow, 1), new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
new KeyValue<>("E@" + secondWindow, 1), new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
new KeyValue<>("E@" + secondWindow, 2)
))); assertThat(windowedMessages, is(expectResult));
Set<String> expectResultString = new HashSet<>(expectResult.size());
for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
expectResultString.add(eachRecord.toString());
}
// check every message is contained in the expect result
String[] allRecords = resultFromConsoleConsumer.split("\n");
for (String record: allRecords) {
record = "KeyValue(" + record + ")";
assertTrue(expectResultString.contains(record));
}
} }
private void shouldCountHelper() throws Exception { private void shouldCountHelper() throws Exception {
@ -685,26 +729,66 @@ public class KStreamAggregationIntegrationTest {
kafkaStreams.start(); kafkaStreams.start();
} }
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer, keyDeserializer,
final Deserializer<V> final Deserializer<V>
valueDeserializer, valueDeserializer,
final int numMessages) final int numMessages)
throws InterruptedException { throws InterruptedException {
return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
}
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
keyDeserializer,
final Deserializer<V>
valueDeserializer,
final Class innerClass,
final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties(); final Properties consumerProperties = new Properties();
consumerProperties consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
consumerProperties, consumerProperties,
outputTopic, outputTopic,
numMessages, numMessages,
60 * 1000); 60 * 1000);
} }
private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final int numMessages) {
ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
PrintStream originalStream = System.out;
try (PrintStream newStream = new PrintStream(newConsole)) {
System.setOut(newStream);
String keySeparator = ", ";
// manually construct the console consumer argument array
String[] args = new String[] {
"--bootstrap-server", CLUSTER.bootstrapServers(),
"--from-beginning",
"--property", "print.key=true",
"--topic", outputTopic,
"--max-messages", String.valueOf(numMessages),
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator,
"--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
};
ConsoleConsumer.messageCount_$eq(0); //reset the message count
ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
newStream.flush();
System.setOut(originalStream);
return newConsole.toString();
}
}
} }

Loading…
Cancel
Save