From 8b9f6d17f2a62f7dbb1810de8aaac248a84556cc Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 23 Oct 2023 13:26:50 +0200 Subject: [PATCH] KAFKA-15093: Add 3.5 Streams upgrade system tests (#14602) Reviewers: Matthias J. Sax --- build.gradle | 14 + settings.gradle | 1 + .../kafka/streams/tests/SmokeTestClient.java | 299 ++++++++ .../kafka/streams/tests/SmokeTestDriver.java | 670 ++++++++++++++++++ .../kafka/streams/tests/SmokeTestUtil.java | 131 ++++ .../kafka/streams/tests/StreamsSmokeTest.java | 100 +++ .../streams/tests/StreamsUpgradeTest.java | 120 ++++ .../streams_application_upgrade_test.py | 5 +- .../streams_broker_compatibility_test.py | 5 +- .../tests/streams/streams_upgrade_test.py | 6 +- 10 files changed, 1345 insertions(+), 6 deletions(-) create mode 100644 streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java create mode 100644 streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java create mode 100644 streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java create mode 100644 streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java create mode 100644 streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java diff --git a/build.gradle b/build.gradle index c2d1938c1bd..3326f207b81 100644 --- a/build.gradle +++ b/build.gradle @@ -2171,6 +2171,7 @@ project(':streams') { ':streams:upgrade-system-tests-32:test', ':streams:upgrade-system-tests-33:test', ':streams:upgrade-system-tests-34:test', + ':streams:upgrade-system-tests-35:test', ':streams:examples:test' ] ) @@ -2569,6 +2570,19 @@ project(':streams:upgrade-system-tests-34') { } } +project(':streams:upgrade-system-tests-35') { + archivesBaseName = "kafka-streams-upgrade-system-tests-35" + + dependencies { + testImplementation libs.kafkaStreams_35 + testRuntimeOnly libs.junitJupiter + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/settings.gradle b/settings.gradle index b1a3726c545..e11551ed94a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -93,6 +93,7 @@ include 'clients', 'streams:upgrade-system-tests-32', 'streams:upgrade-system-tests-33', 'streams:upgrade-system-tests-34', + 'streams:upgrade-system-tests-35', 'tools', 'tools:tools-api', 'trogdor' diff --git a/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java new file mode 100644 index 00000000000..dc0ad4d5601 --- /dev/null +++ b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { + + private final String name; + + private KafkaStreams streams; + private boolean uncaughtException = false; + private boolean started; + private volatile boolean closed; + + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } + } + + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; + } + + public boolean started() { + return started; + } + + public boolean closed() { + return closed; + } + + public void start(final Properties streamsProperties) { + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + started = true; + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + + streams.setUncaughtExceptionHandler(e -> { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e); + e.printStackTrace(System.out); + uncaughtException = true; + return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + }); + + addShutdownHook("streams-shutdown-hook", this::close); + + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); + } + + public void closeAsync() { + streams.close(Duration.ZERO); + } + + public void close() { + final boolean closed = streams.close(Duration.ofMinutes(1)); + + if (closed && !uncaughtException) { + System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); + } else if (closed) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + } else { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close"); + } + } + + private Properties getStreamsConfig(final Properties props) { + final Properties fullProps = new Properties(props); + fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest"); + fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name); + fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath()); + fullProps.putAll(props); + return fullProps; + } + + public Topology getTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + final Consumed stringIntConsumed = Consumed.with(stringSerde, intSerde); + final KStream source = builder.stream("data", stringIntConsumed); + source.filterNot((k, v) -> k.equals("flush")) + .to("echo", Produced.with(stringSerde, intSerde)); + final KStream data = source.filter((key, value) -> value == null || value != END); + data.process(SmokeTestUtil.printProcessorSupplier("data", name)); + + // min + final KGroupedStream groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde)); + + final KTable, Integer> minAggregation = groupedData + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1))) + .aggregate( + () -> Integer.MAX_VALUE, + (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, + Materialized + .>as("uwin-min") + .withValueSerde(intSerde) + .withRetention(Duration.ofHours(25)) + ); + + streamify(minAggregation, "min-raw"); + + streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed"); + + minAggregation + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) + .to("min", Produced.with(stringSerde, intSerde)); + + final KTable, Integer> smallWindowSum = groupedData + .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1))) + .reduce(Integer::sum); + + streamify(smallWindowSum, "sws-raw"); + streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed"); + + final KTable minTable = builder.table( + "min", + Consumed.with(stringSerde, intSerde), + Materialized.as("minStoreName")); + + minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name)); + + // max + groupedData + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) + .aggregate( + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, + Materialized.>as("uwin-max").withValueSerde(intSerde)) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) + .to("max", Produced.with(stringSerde, intSerde)); + + final KTable maxTable = builder.table( + "max", + Consumed.with(stringSerde, intSerde), + Materialized.as("maxStoreName")); + maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name)); + + // sum + groupedData + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) + .aggregate( + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, + Materialized.>as("win-sum").withValueSerde(longSerde)) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) + .to("sum", Produced.with(stringSerde, longSerde)); + + final Consumed stringLongConsumed = Consumed.with(stringSerde, longSerde); + final KTable sumTable = builder.table("sum", stringLongConsumed); + sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name)); + + // cnt + groupedData + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) + .count(Materialized.as("uwin-cnt")) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) + .to("cnt", Produced.with(stringSerde, longSerde)); + + final KTable cntTable = builder.table( + "cnt", + Consumed.with(stringSerde, longSerde), + Materialized.as("cntStoreName")); + cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name)); + + // dif + maxTable + .join( + minTable, + (value1, value2) -> value1 - value2) + .toStream() + .filterNot((k, v) -> k.equals("flush")) + .to("dif", Produced.with(stringSerde, intSerde)); + + // avg + sumTable + .join( + cntTable, + (value1, value2) -> (double) value1 / (double) value2) + .toStream() + .filterNot((k, v) -> k.equals("flush")) + .to("avg", Produced.with(stringSerde, doubleSerde)); + + // test repartition + final Agg agg = new Agg(); + cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde)) + .aggregate(agg.init(), agg.adder(), agg.remover(), + Materialized.as(Stores.inMemoryKeyValueStore("cntByCnt")) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())) + .toStream() + .to("tagg", Produced.with(stringSerde, longSerde)); + + return builder.build(); + } + + private static void streamify(final KTable, Integer> windowedTable, final String topic) { + windowedTable + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to(topic, Produced.with(stringSerde, intSerde)); + } +} diff --git a/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java new file mode 100644 index 00000000000..dbacbb9625b --- /dev/null +++ b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -0,0 +1,670 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; + +public class SmokeTestDriver extends SmokeTestUtil { + private static final String[] NUMERIC_VALUE_TOPICS = { + "data", + "echo", + "max", + "min", "min-suppressed", "min-raw", + "dif", + "sum", + "sws-raw", "sws-suppressed", + "cnt", + "avg", + "tagg" + }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } + + private static final int MAX_RECORD_EMPTY_RETRIES = 30; + + private static class ValueList { + public final String key; + private final int[] values; + private int index; + + ValueList(final int min, final int max) { + key = min + "-" + max; + + values = new int[max - min + 1]; + for (int i = 0; i < values.length; i++) { + values[i] = min + i; + } + // We want to randomize the order of data to test not completely predictable processing order + // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp) + // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window + shuffle(values, 10); + + index = 0; + } + + int next() { + return (index < values.length) ? values[index++] : -1; + } + } + + public static String[] topics() { + return Arrays.copyOf(TOPICS, TOPICS.length); + } + + static void generatePerpetually(final String kafka, + final int numKeys, + final int maxRecordsPerKey) { + final Properties producerProps = generatorProperties(kafka); + + int numRecordsProduced = 0; + + final ValueList[] data = new ValueList[numKeys]; + for (int i = 0; i < numKeys; i++) { + data[i] = new ValueList(i, i + maxRecordsPerKey - 1); + } + + final Random rand = new Random(); + + try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { + while (true) { + final int index = rand.nextInt(numKeys); + final String key = data[index].key; + final int value = data[index].next(); + + final ProducerRecord record = + new ProducerRecord<>( + "data", + stringSerde.serializer().serialize("", key), + intSerde.serializer().serialize("", value) + ); + producer.send(record); + + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + + numRecordsProduced++; + if (numRecordsProduced % 100 == 0) { + System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); + } + Utils.sleep(2); + } + } + } + + public static Map> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final Duration timeToSpend) { + final Properties producerProps = generatorProperties(kafka); + + int numRecordsProduced = 0; + + final Map> allData = new HashMap<>(); + final ValueList[] data = new ValueList[numKeys]; + for (int i = 0; i < numKeys; i++) { + data[i] = new ValueList(i, i + maxRecordsPerKey - 1); + allData.put(data[i].key, new HashSet<>()); + } + final Random rand = new Random(); + + int remaining = data.length; + + final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; + + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); + + try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { + while (remaining > 0) { + final int index = rand.nextInt(remaining); + final String key = data[index].key; + final int value = data[index].next(); + + if (value < 0) { + remaining--; + data[index] = data[remaining]; + } else { + final ProducerRecord record = + new ProducerRecord<>( + "data", + stringSerde.serializer().serialize("", key), + intSerde.serializer().serialize("", value) + ); + + producer.send(record, new TestCallback(record, dataNeedRetry)); + + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); + + numRecordsProduced++; + allData.get(key).add(value); + if (numRecordsProduced % 100 == 0) { + System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); + } + Utils.sleep(Math.max(recordPauseTime, 2)); + } + } + producer.flush(); + + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); + + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); + } + return Collections.unmodifiableMap(allData); + } + + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + + private static Properties generatorProperties(final String kafka) { + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + return producerProps; + } + + private static class TestCallback implements Callback { + private final ProducerRecord originalRecord; + private final List> needRetry; + + TestCallback(final ProducerRecord originalRecord, + final List> needRetry) { + this.originalRecord = originalRecord; + this.needRetry = needRetry; + } + + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception != null) { + if (exception instanceof TimeoutException) { + needRetry.add(originalRecord); + } else { + exception.printStackTrace(); + Exit.exit(1); + } + } + } + } + + private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) { + final Random rand = new Random(); + for (int i = 0; i < data.length; i++) { + // we shuffle data within windowSize + final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i; + + // swap + final int tmp = data[i]; + data[i] = data[j]; + data[j] = tmp; + } + } + + public static class NumberDeserializer implements Deserializer { + @Override + public Number deserialize(final String topic, final byte[] data) { + final Number value; + switch (topic) { + case "data": + case "echo": + case "min": + case "min-raw": + case "min-suppressed": + case "sws-raw": + case "sws-suppressed": + case "max": + case "dif": + value = intSerde.deserializer().deserialize(topic, data); + break; + case "sum": + case "cnt": + case "tagg": + value = longSerde.deserializer().deserialize(topic, data); + break; + case "avg": + value = doubleSerde.deserializer().deserialize(topic, data); + break; + default: + throw new RuntimeException("unknown topic: " + topic); + } + return value; + } + } + + public static VerificationResult verify(final String kafka, + final Map> inputs, + final int maxRecordsPerKey) { + final Properties props = new Properties(); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + + final KafkaConsumer consumer = new KafkaConsumer<>(props); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + + final int recordsGenerated = inputs.size() * maxRecordsPerKey; + int recordsProcessed = 0; + final Map processed = + Stream.of(NUMERIC_VALUE_TOPICS) + .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); + + final Map>>> events = new HashMap<>(); + + VerificationResult verificationResult = new VerificationResult(false, "no results yet"); + int retry = 0; + final long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); + if (records.isEmpty() && recordsProcessed >= recordsGenerated) { + verificationResult = verifyAll(inputs, events, false); + if (verificationResult.passed()) { + break; + } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) { + System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries."); + break; + } else { + System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry); + } + } else { + System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry."); + + retry = 0; + for (final ConsumerRecord record : records) { + final String key = record.key(); + + final String topic = record.topic(); + processed.get(topic).incrementAndGet(); + + if (topic.equals("echo")) { + recordsProcessed++; + if (recordsProcessed % 100 == 0) { + System.out.println("Echo records processed = " + recordsProcessed); + } + } + + events.computeIfAbsent(topic, t -> new HashMap<>()) + .computeIfAbsent(key, k -> new LinkedList<>()) + .add(record); + } + + System.out.println(processed); + } + } + consumer.close(); + final long finished = System.currentTimeMillis() - start; + System.out.println("Verification time=" + finished); + System.out.println("-------------------"); + System.out.println("Result Verification"); + System.out.println("-------------------"); + System.out.println("recordGenerated=" + recordsGenerated); + System.out.println("recordProcessed=" + recordsProcessed); + + if (recordsProcessed > recordsGenerated) { + System.out.println("PROCESSED-MORE-THAN-GENERATED"); + } else if (recordsProcessed < recordsGenerated) { + System.out.println("PROCESSED-LESS-THAN-GENERATED"); + } + + boolean success; + + final Map> received = + events.get("echo") + .entrySet() + .stream() + .map(entry -> mkEntry( + entry.getKey(), + entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet())) + ) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + success = inputs.equals(received); + + if (success) { + System.out.println("ALL-RECORDS-DELIVERED"); + } else { + int missedCount = 0; + for (final Map.Entry> entry : inputs.entrySet()) { + missedCount += received.get(entry.getKey()).size(); + } + System.out.println("missedRecords=" + missedCount); + } + + // give it one more try if it's not already passing. + if (!verificationResult.passed()) { + verificationResult = verifyAll(inputs, events, true); + } + success &= verificationResult.passed(); + + System.out.println(verificationResult.result()); + + System.out.println(success ? "SUCCESS" : "FAILURE"); + return verificationResult; + } + + public static class VerificationResult { + private final boolean passed; + private final String result; + + VerificationResult(final boolean passed, final String result) { + this.passed = passed; + this.result = result; + } + + public boolean passed() { + return passed; + } + + public String result() { + return result; + } + } + + private static VerificationResult verifyAll(final Map> inputs, + final Map>>> events, + final boolean printResults) { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + boolean pass; + try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { + pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults); + pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults); + pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> { + final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); + return getMin(unwindowedKey); + }, printResults); + pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults); + pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults); + pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults); + pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults); + pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults); + pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults); + pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults); + } + return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); + } + + private static boolean verify(final PrintStream resultStream, + final String topic, + final Map> inputData, + final Map>>> events, + final Function keyToExpectation, + final boolean printResults) { + final Map>> observedInputEvents = events.get("data"); + final Map>> outputEvents = events.getOrDefault(topic, emptyMap()); + if (outputEvents.isEmpty()) { + resultStream.println(topic + " is empty"); + return false; + } else { + resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size()); + + if (outputEvents.size() != inputData.size()) { + resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n", + outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet()); + return false; + } + for (final Map.Entry>> entry : outputEvents.entrySet()) { + final String key = entry.getKey(); + final Number expected = keyToExpectation.apply(key); + final Number actual = entry.getValue().getLast().value(); + if (!expected.equals(actual)) { + resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected); + + if (printResults) { + resultStream.printf("\t inputEvents=%n%s%n\t" + + "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n", + indent("\t\t", observedInputEvents.get(key)), + indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())), + indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())), + indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())), + indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())), + indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())), + indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>()))); + + if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) + resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue())); + } + + return false; + } + } + return true; + } + } + + + private static boolean verifySuppressed(final PrintStream resultStream, + @SuppressWarnings("SameParameterValue") final String topic, + final Map>>> events, + final boolean printResults) { + resultStream.println("verifying suppressed " + topic); + final Map>> topicEvents = events.getOrDefault(topic, emptyMap()); + for (final Map.Entry>> entry : topicEvents.entrySet()) { + if (entry.getValue().size() != 1) { + final String unsuppressedTopic = topic.replace("-suppressed", "-raw"); + final String key = entry.getKey(); + final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", ""); + resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n", + key, + indent("\t\t", entry.getValue())); + + if (printResults) + resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n", + indent("\t\t", events.get(unsuppressedTopic).get(key)), + indent("\t\t", events.get("data").get(unwindowedKey))); + + return false; + } + } + return true; + } + + private static String indent(@SuppressWarnings("SameParameterValue") final String prefix, + final Iterable> list) { + final StringBuilder stringBuilder = new StringBuilder(); + for (final ConsumerRecord record : list) { + stringBuilder.append(prefix).append(record).append('\n'); + } + return stringBuilder.toString(); + } + + private static Long getSum(final String key) { + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); + return ((long) min + max) * (max - min + 1L) / 2L; + } + + private static Double getAvg(final String key) { + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); + return ((long) min + max) / 2.0; + } + + + private static boolean verifyTAgg(final PrintStream resultStream, + final Map> allData, + final Map>> taggEvents, + final boolean printResults) { + if (taggEvents == null) { + resultStream.println("tagg is missing"); + return false; + } else if (taggEvents.isEmpty()) { + resultStream.println("tagg is empty"); + return false; + } else { + resultStream.println("verifying tagg"); + + // generate expected answer + final Map expected = new HashMap<>(); + for (final String key : allData.keySet()) { + final int min = getMin(key).intValue(); + final int max = getMax(key).intValue(); + final String cnt = Long.toString(max - min + 1L); + + expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1); + } + + // check the result + for (final Map.Entry>> entry : taggEvents.entrySet()) { + final String key = entry.getKey(); + Long expectedCount = expected.remove(key); + if (expectedCount == null) { + expectedCount = 0L; + } + + if (entry.getValue().getLast().value().longValue() != expectedCount) { + resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount); + + if (printResults) + resultStream.println("\t taggEvents: " + entry.getValue()); + return false; + } + } + + } + return true; + } + + private static Number getMin(final String key) { + return Integer.parseInt(key.split("-")[0]); + } + + private static Number getMax(final String key) { + return Integer.parseInt(key.split("-")[1]); + } + + private static List getAllPartitions(final KafkaConsumer consumer, final String... topics) { + final List partitions = new ArrayList<>(); + + for (final String topic : topics) { + for (final PartitionInfo info : consumer.partitionsFor(topic)) { + partitions.add(new TopicPartition(info.topic(), info.partition())); + } + } + return partitions; + } + +} diff --git a/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java new file mode 100644 index 00000000000..9f3efa0f7d6 --- /dev/null +++ b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; + +import java.time.Instant; + +public class SmokeTestUtil { + + final static int END = Integer.MAX_VALUE; + + static ProcessorSupplier printProcessorSupplier(final String topic) { + return printProcessorSupplier(topic, ""); + } + + static ProcessorSupplier printProcessorSupplier(final String topic, final String name) { + return () -> new ContextualProcessor() { + private int numRecordsProcessed = 0; + private long smallestOffset = Long.MAX_VALUE; + private long largestOffset = Long.MIN_VALUE; + + @Override + public void init(final ProcessorContext context) { + super.init(context); + System.out.println("[3.5] initializing processor: topic=" + topic + " taskId=" + context.taskId()); + System.out.flush(); + numRecordsProcessed = 0; + smallestOffset = Long.MAX_VALUE; + largestOffset = Long.MIN_VALUE; + } + + @Override + public void process(final Record record) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.printf("%s: %s%n", name, Instant.now()); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); + } + + context().recordMetadata().ifPresent(recordMetadata -> { + if (smallestOffset > recordMetadata.offset()) { + smallestOffset = recordMetadata.offset(); + } + if (largestOffset < recordMetadata.offset()) { + largestOffset = recordMetadata.offset(); + } + }); + } + + @Override + public void close() { + System.out.printf("Close processor for task %s%n", context().taskId()); + System.out.println("processed " + numRecordsProcessed + " records"); + final long processed; + if (largestOffset >= smallestOffset) { + processed = 1L + largestOffset - smallestOffset; + } else { + processed = 0L; + } + System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed); + System.out.flush(); + } + }; + } + + public static final class Unwindow implements KeyValueMapper, V, K> { + @Override + public K apply(final Windowed winKey, final V value) { + return winKey.key(); + } + } + + public static class Agg { + + KeyValueMapper> selector() { + return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L); + } + + public Initializer init() { + return () -> 0L; + } + + Aggregator adder() { + return (aggKey, value, aggregate) -> aggregate + value; + } + + Aggregator remover() { + return (aggKey, value, aggregate) -> aggregate - value; + } + } + + public static Serde stringSerde = Serdes.String(); + + public static Serde intSerde = Serdes.Integer(); + + static Serde longSerde = Serdes.Long(); + + static Serde doubleSerde = Serdes.Double(); + + public static void sleep(final long duration) { + try { + Thread.sleep(duration); + } catch (final Exception ignore) { } + } + +} diff --git a/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java new file mode 100644 index 00000000000..5803b2fbd02 --- /dev/null +++ b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { + + /** + * args ::= kafka propFileName command disableAutoTerminate + * command := "run" | "process" + * + * @param args + */ + public static void main(final String[] args) throws IOException { + if (args.length < 2) { + System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); + Exit.exit(1); + } + + final String propFileName = args[0]; + final String command = args[1]; + final boolean disableAutoTerminate = args.length > 2; + + final Properties streamsProperties = Utils.loadProps(propFileName); + final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); + + if (kafka == null) { + System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + Exit.exit(1); + } + + if ("process".equals(command)) { + if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) && + !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { + + System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " + + StreamsConfig.EXACTLY_ONCE_V2); + + Exit.exit(1); + } + } + + System.out.println("StreamsTest instance started (StreamsSmokeTest)"); + System.out.println("command=" + command); + System.out.println("props=" + streamsProperties); + System.out.println("disableAutoTerminate=" + disableAutoTerminate); + + switch (command) { + case "run": + // this starts the driver (data generation and result verification) + final int numKeys = 10; + final int maxRecordsPerKey = 500; + if (disableAutoTerminate) { + generatePerpetually(kafka, numKeys, maxRecordsPerKey); + } else { + // slow down data production to span 30 seconds so that system tests have time to + // do their bounces, etc. + final Map> allData = + generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30)); + SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + } + break; + case "process": + // this starts the stream processing app + new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties); + break; + default: + System.out.println("unknown command: " + command); + } + } + +} diff --git a/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 00000000000..4eca18abaef --- /dev/null +++ b/streams/upgrade-system-tests-35/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; + +import java.util.Properties; + +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) throws Exception { + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + } + final String propFileName = args[0]; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.5)"); + System.out.println("props=" + streamsProperties); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); + dataStream.to("echo"); + + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataStream, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + + final Properties config = new Properties(); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest"); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + })); + } + + private static void buildFKTable(final KStream primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable.toTable() + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { + return () -> new ContextualProcessor() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("[3.5] initializing processor: topic=" + topic + "taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final Record record) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); + } + } + + @Override + public void close() {} + }; + } +} diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py index 65895586f86..38efd5e2e54 100644 --- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py @@ -22,12 +22,13 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ - LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_VERSION, KafkaVersion + LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_VERSION, KafkaVersion smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), - str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4)] + str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), + str(LATEST_3_5)] dev_version = [str(DEV_VERSION)] class StreamsUpgradeTest(Test): diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index f943c470541..00c5cfa9ddb 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -23,7 +23,7 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ - LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, KafkaVersion + LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, KafkaVersion class StreamsBrokerCompatibility(Test): @@ -64,6 +64,7 @@ class StreamsBrokerCompatibility(Test): @cluster(num_nodes=4) + @parametrize(broker_version=str(LATEST_3_5)) @parametrize(broker_version=str(LATEST_3_4)) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) @@ -98,6 +99,7 @@ class StreamsBrokerCompatibility(Test): self.kafka.stop() @cluster(num_nodes=4) + @parametrize(broker_version=str(LATEST_3_5)) @parametrize(broker_version=str(LATEST_3_4)) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) @@ -132,6 +134,7 @@ class StreamsBrokerCompatibility(Test): self.kafka.stop() @cluster(num_nodes=4) + @parametrize(broker_version=str(LATEST_3_5)) @parametrize(broker_version=str(LATEST_3_4)) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 1033e3ad2c5..945b1306669 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -26,7 +26,7 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ - LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_BRANCH, DEV_VERSION, KafkaVersion + LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1) @@ -34,7 +34,7 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), - str(LATEST_3_3), str(LATEST_3_4), str(DEV_BRANCH)] + str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), @@ -44,7 +44,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0 # -> https://issues.apache.org/jira/browse/KAFKA-14646 # thus, we cannot test two bounce rolling upgrade because we know it's broken # instead we add version 2.4...3.3 to the `metadata_2_versions` upgrade list -fk_join_versions = [str(LATEST_3_4)] +fk_join_versions = [str(LATEST_3_4), str(LATEST_3_5)] """