Browse Source

KAFKA-15664: Add 3.4 Streams upgrade system tests (#14601)

Reviewers: Luke Chen <showuon@gmail.com>,  Matthias J. Sax <mjsax@apache.org>
pull/14602/head
Mickael Maison 1 year ago committed by GitHub
parent
commit
9c77c17c4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      build.gradle
  2. 1
      settings.gradle
  3. 299
      streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
  4. 670
      streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
  5. 131
      streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
  6. 100
      streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
  7. 120
      streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  8. 4
      tests/kafkatest/tests/streams/streams_application_upgrade_test.py
  9. 5
      tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
  10. 9
      tests/kafkatest/tests/streams/streams_upgrade_test.py

14
build.gradle

@ -2170,6 +2170,7 @@ project(':streams') { @@ -2170,6 +2170,7 @@ project(':streams') {
':streams:upgrade-system-tests-31:test',
':streams:upgrade-system-tests-32:test',
':streams:upgrade-system-tests-33:test',
':streams:upgrade-system-tests-34:test',
':streams:examples:test'
]
)
@ -2555,6 +2556,19 @@ project(':streams:upgrade-system-tests-33') { @@ -2555,6 +2556,19 @@ project(':streams:upgrade-system-tests-33') {
}
}
project(':streams:upgrade-system-tests-34') {
archivesBaseName = "kafka-streams-upgrade-system-tests-34"
dependencies {
testImplementation libs.kafkaStreams_34
testRuntimeOnly libs.junitJupiter
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'

1
settings.gradle

@ -92,6 +92,7 @@ include 'clients', @@ -92,6 +92,7 @@ include 'clients',
'streams:upgrade-system-tests-31',
'streams:upgrade-system-tests-32',
'streams:upgrade-system-tests-33',
'streams:upgrade-system-tests-34',
'tools',
'tools:tools-api',
'trogdor'

299
streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java

@ -0,0 +1,299 @@ @@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
public class SmokeTestClient extends SmokeTestUtil {
private final String name;
private KafkaStreams streams;
private boolean uncaughtException = false;
private boolean started;
private volatile boolean closed;
private static void addShutdownHook(final String name, final Runnable runnable) {
if (name != null) {
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
} else {
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
}
private static File tempDirectory() {
final String prefix = "kafka-";
final File file;
try {
file = Files.createTempDirectory(prefix).toFile();
} catch (final IOException ex) {
throw new RuntimeException("Failed to create a temp dir", ex);
}
file.deleteOnExit();
addShutdownHook("delete-temp-file-shutdown-hook", () -> {
try {
Utils.delete(file);
} catch (final IOException e) {
System.out.println("Error deleting " + file.getAbsolutePath());
e.printStackTrace(System.out);
}
});
return file;
}
public SmokeTestClient(final String name) {
this.name = name;
}
public boolean started() {
return started;
}
public boolean closed() {
return closed;
}
public void start(final Properties streamsProperties) {
final Topology build = getTopology();
streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
final CountDownLatch countDownLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
started = true;
countDownLatch.countDown();
}
if (newState == KafkaStreams.State.NOT_RUNNING) {
closed = true;
}
});
streams.setUncaughtExceptionHandler(e -> {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e);
e.printStackTrace(System.out);
uncaughtException = true;
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});
addShutdownHook("streams-shutdown-hook", this::close);
streams.start();
try {
if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
}
} catch (final InterruptedException e) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
e.printStackTrace(System.out);
}
System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
System.out.println(name + " started at " + Instant.now());
}
public void closeAsync() {
streams.close(Duration.ZERO);
}
public void close() {
final boolean closed = streams.close(Duration.ofMinutes(1));
if (closed && !uncaughtException) {
System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
} else if (closed) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
} else {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
}
}
private Properties getStreamsConfig(final Properties props) {
final Properties fullProps = new Properties(props);
fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
fullProps.putAll(props);
return fullProps;
}
public Topology getTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
source.filterNot((k, v) -> k.equals("flush"))
.to("echo", Produced.with(stringSerde, intSerde));
final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
data.process(SmokeTestUtil.printProcessorSupplier("data", name));
// min
final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> minAggregation = groupedData
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
.aggregate(
() -> Integer.MAX_VALUE,
(aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
Materialized
.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
.withValueSerde(intSerde)
.withRetention(Duration.ofHours(25))
);
streamify(minAggregation, "min-raw");
streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
minAggregation
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("min", Produced.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
.reduce(Integer::sum);
streamify(smallWindowSum, "sws-raw");
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
final KTable<String, Integer> minTable = builder.table(
"min",
Consumed.with(stringSerde, intSerde),
Materialized.as("minStoreName"));
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
// max
groupedData
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.aggregate(
() -> Integer.MIN_VALUE,
(aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("max", Produced.with(stringSerde, intSerde));
final KTable<String, Integer> maxTable = builder.table(
"max",
Consumed.with(stringSerde, intSerde),
Materialized.as("maxStoreName"));
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
// sum
groupedData
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> (long) value + aggregate,
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("sum", Produced.with(stringSerde, longSerde));
final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
// cnt
groupedData
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
.count(Materialized.as("uwin-cnt"))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("cnt", Produced.with(stringSerde, longSerde));
final KTable<String, Long> cntTable = builder.table(
"cnt",
Consumed.with(stringSerde, longSerde),
Materialized.as("cntStoreName"));
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
// dif
maxTable
.join(
minTable,
(value1, value2) -> value1 - value2)
.toStream()
.filterNot((k, v) -> k.equals("flush"))
.to("dif", Produced.with(stringSerde, intSerde));
// avg
sumTable
.join(
cntTable,
(value1, value2) -> (double) value1 / (double) value2)
.toStream()
.filterNot((k, v) -> k.equals("flush"))
.to("avg", Produced.with(stringSerde, doubleSerde));
// test repartition
final Agg agg = new Agg();
cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
.aggregate(agg.init(), agg.adder(), agg.remover(),
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.to("tagg", Produced.with(stringSerde, longSerde));
return builder.build();
}
private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
windowedTable
.toStream()
.filterNot((k, v) -> k.key().equals("flush"))
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to(topic, Produced.with(stringSerde, intSerde));
}
}

670
streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java

@ -0,0 +1,670 @@ @@ -0,0 +1,670 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
"min", "min-suppressed", "min-raw",
"dif",
"sum",
"sws-raw", "sws-suppressed",
"cnt",
"avg",
"tagg"
};
private static final String[] STRING_VALUE_TOPICS = {
"fk"
};
private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
static {
System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
}
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
private static class ValueList {
public final String key;
private final int[] values;
private int index;
ValueList(final int min, final int max) {
key = min + "-" + max;
values = new int[max - min + 1];
for (int i = 0; i < values.length; i++) {
values[i] = min + i;
}
// We want to randomize the order of data to test not completely predictable processing order
// However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
shuffle(values, 10);
index = 0;
}
int next() {
return (index < values.length) ? values[index++] : -1;
}
}
public static String[] topics() {
return Arrays.copyOf(TOPICS, TOPICS.length);
}
static void generatePerpetually(final String kafka,
final int numKeys,
final int maxRecordsPerKey) {
final Properties producerProps = generatorProperties(kafka);
int numRecordsProduced = 0;
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
}
final Random rand = new Random();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (true) {
final int index = rand.nextInt(numKeys);
final String key = data[index].key;
final int value = data[index].next();
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
producer.send(record);
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord);
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
}
Utils.sleep(2);
}
}
}
public static Map<String, Set<Integer>> generate(final String kafka,
final int numKeys,
final int maxRecordsPerKey,
final Duration timeToSpend) {
final Properties producerProps = generatorProperties(kafka);
int numRecordsProduced = 0;
final Map<String, Set<Integer>> allData = new HashMap<>();
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<>());
}
final Random rand = new Random();
int remaining = data.length;
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
final int index = rand.nextInt(remaining);
final String key = data[index].key;
final int value = data[index].next();
if (value < 0) {
remaining--;
data[index] = data[remaining];
} else {
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
producer.send(record, new TestCallback(record, dataNeedRetry));
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
}
Utils.sleep(Math.max(recordPauseTime, 2));
}
}
producer.flush();
retry(producer, dataNeedRetry, stringSerde);
retry(producer, fkNeedRetry, intSerde);
flush(producer,
"data",
stringSerde.serializer().serialize("", "flush"),
intSerde.serializer().serialize("", 0)
);
flush(producer,
"fk",
intSerde.serializer().serialize("", 0),
stringSerde.serializer().serialize("", "flush")
);
}
return Collections.unmodifiableMap(allData);
}
private static void retry(final KafkaProducer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> needRetry,
final Serde<?> keySerde) {
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println(
"retry producing " + keySerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
}
private static void flush(final KafkaProducer<byte[], byte[]> producer,
final String topic,
final byte[] keyBytes,
final byte[] valBytes) {
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
for (final PartitionInfo partition : partitions) {
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
keyBytes,
valBytes
));
}
}
private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
return producerProps;
}
private static class TestCallback implements Callback {
private final ProducerRecord<byte[], byte[]> originalRecord;
private final List<ProducerRecord<byte[], byte[]>> needRetry;
TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
final List<ProducerRecord<byte[], byte[]>> needRetry) {
this.originalRecord = originalRecord;
this.needRetry = needRetry;
}
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
if (exception instanceof TimeoutException) {
needRetry.add(originalRecord);
} else {
exception.printStackTrace();
Exit.exit(1);
}
}
}
}
private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
final Random rand = new Random();
for (int i = 0; i < data.length; i++) {
// we shuffle data within windowSize
final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
// swap
final int tmp = data[i];
data[i] = data[j];
data[j] = tmp;
}
}
public static class NumberDeserializer implements Deserializer<Number> {
@Override
public Number deserialize(final String topic, final byte[] data) {
final Number value;
switch (topic) {
case "data":
case "echo":
case "min":
case "min-raw":
case "min-suppressed":
case "sws-raw":
case "sws-suppressed":
case "max":
case "dif":
value = intSerde.deserializer().deserialize(topic, data);
break;
case "sum":
case "cnt":
case "tagg":
value = longSerde.deserializer().deserialize(topic, data);
break;
case "avg":
value = doubleSerde.deserializer().deserialize(topic, data);
break;
default:
throw new RuntimeException("unknown topic: " + topic);
}
return value;
}
}
public static VerificationResult verify(final String kafka,
final Map<String, Set<Integer>> inputs,
final int maxRecordsPerKey) {
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
VerificationResult verificationResult = new VerificationResult(false, "no results yet");
int retry = 0;
final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false);
if (verificationResult.passed()) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
break;
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
}
} else {
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();
final String topic = record.topic();
processed.get(topic).incrementAndGet();
if (topic.equals("echo")) {
recordsProcessed++;
if (recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + recordsProcessed);
}
}
events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
}
System.out.println(processed);
}
}
consumer.close();
final long finished = System.currentTimeMillis() - start;
System.out.println("Verification time=" + finished);
System.out.println("-------------------");
System.out.println("Result Verification");
System.out.println("-------------------");
System.out.println("recordGenerated=" + recordsGenerated);
System.out.println("recordProcessed=" + recordsProcessed);
if (recordsProcessed > recordsGenerated) {
System.out.println("PROCESSED-MORE-THAN-GENERATED");
} else if (recordsProcessed < recordsGenerated) {
System.out.println("PROCESSED-LESS-THAN-GENERATED");
}
boolean success;
final Map<String, Set<Number>> received =
events.get("echo")
.entrySet()
.stream()
.map(entry -> mkEntry(
entry.getKey(),
entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
success = inputs.equals(received);
if (success) {
System.out.println("ALL-RECORDS-DELIVERED");
} else {
int missedCount = 0;
for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
missedCount += received.get(entry.getKey()).size();
}
System.out.println("missedRecords=" + missedCount);
}
// give it one more try if it's not already passing.
if (!verificationResult.passed()) {
verificationResult = verifyAll(inputs, events, true);
}
success &= verificationResult.passed();
System.out.println(verificationResult.result());
System.out.println(success ? "SUCCESS" : "FAILURE");
return verificationResult;
}
public static class VerificationResult {
private final boolean passed;
private final String result;
VerificationResult(final boolean passed, final String result) {
this.passed = passed;
this.result = result;
}
public boolean passed() {
return passed;
}
public String result() {
return result;
}
}
private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
boolean pass;
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
return getMin(unwindowedKey);
}, printResults);
pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
}
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
private static boolean verify(final PrintStream resultStream,
final String topic,
final Map<String, Set<Integer>> inputData,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final Function<String, Number> keyToExpectation,
final boolean printResults) {
final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
if (outputEvents.isEmpty()) {
resultStream.println(topic + " is empty");
return false;
} else {
resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
if (outputEvents.size() != inputData.size()) {
resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
return false;
}
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
final String key = entry.getKey();
final Number expected = keyToExpectation.apply(key);
final Number actual = entry.getValue().getLast().value();
if (!expected.equals(actual)) {
resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
if (printResults) {
resultStream.printf("\t inputEvents=%n%s%n\t" +
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
indent("\t\t", observedInputEvents.get(key)),
indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
}
return false;
}
}
return true;
}
}
private static boolean verifySuppressed(final PrintStream resultStream,
@SuppressWarnings("SameParameterValue") final String topic,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults) {
resultStream.println("verifying suppressed " + topic);
final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
if (entry.getValue().size() != 1) {
final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
final String key = entry.getKey();
final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
key,
indent("\t\t", entry.getValue()));
if (printResults)
resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
indent("\t\t", events.get(unsuppressedTopic).get(key)),
indent("\t\t", events.get("data").get(unwindowedKey)));
return false;
}
}
return true;
}
private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
final Iterable<ConsumerRecord<String, Number>> list) {
final StringBuilder stringBuilder = new StringBuilder();
for (final ConsumerRecord<String, Number> record : list) {
stringBuilder.append(prefix).append(record).append('\n');
}
return stringBuilder.toString();
}
private static Long getSum(final String key) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
return ((long) min + max) * (max - min + 1L) / 2L;
}
private static Double getAvg(final String key) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
return ((long) min + max) / 2.0;
}
private static boolean verifyTAgg(final PrintStream resultStream,
final Map<String, Set<Integer>> allData,
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
final boolean printResults) {
if (taggEvents == null) {
resultStream.println("tagg is missing");
return false;
} else if (taggEvents.isEmpty()) {
resultStream.println("tagg is empty");
return false;
} else {
resultStream.println("verifying tagg");
// generate expected answer
final Map<String, Long> expected = new HashMap<>();
for (final String key : allData.keySet()) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
final String cnt = Long.toString(max - min + 1L);
expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
}
// check the result
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
final String key = entry.getKey();
Long expectedCount = expected.remove(key);
if (expectedCount == null) {
expectedCount = 0L;
}
if (entry.getValue().getLast().value().longValue() != expectedCount) {
resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
if (printResults)
resultStream.println("\t taggEvents: " + entry.getValue());
return false;
}
}
}
return true;
}
private static Number getMin(final String key) {
return Integer.parseInt(key.split("-")[0]);
}
private static Number getMax(final String key) {
return Integer.parseInt(key.split("-")[1]);
}
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
final List<TopicPartition> partitions = new ArrayList<>();
for (final String topic : topics) {
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
}

131
streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java

@ -0,0 +1,131 @@ @@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import java.time.Instant;
public class SmokeTestUtil {
final static int END = Integer.MAX_VALUE;
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, "");
}
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
return () -> new ContextualProcessor<Object, Object, Void, Void>() {
private int numRecordsProcessed = 0;
private long smallestOffset = Long.MAX_VALUE;
private long largestOffset = Long.MIN_VALUE;
@Override
public void init(final ProcessorContext<Void, Void> context) {
super.init(context);
System.out.println("[3.4] initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.flush();
numRecordsProcessed = 0;
smallestOffset = Long.MAX_VALUE;
largestOffset = Long.MIN_VALUE;
}
@Override
public void process(final Record<Object, Object> record) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.printf("%s: %s%n", name, Instant.now());
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
context().recordMetadata().ifPresent(recordMetadata -> {
if (smallestOffset > recordMetadata.offset()) {
smallestOffset = recordMetadata.offset();
}
if (largestOffset < recordMetadata.offset()) {
largestOffset = recordMetadata.offset();
}
});
}
@Override
public void close() {
System.out.printf("Close processor for task %s%n", context().taskId());
System.out.println("processed " + numRecordsProcessed + " records");
final long processed;
if (largestOffset >= smallestOffset) {
processed = 1L + largestOffset - smallestOffset;
} else {
processed = 0L;
}
System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
System.out.flush();
}
};
}
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
@Override
public K apply(final Windowed<K> winKey, final V value) {
return winKey.key();
}
}
public static class Agg {
KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
}
public Initializer<Long> init() {
return () -> 0L;
}
Aggregator<String, Long, Long> adder() {
return (aggKey, value, aggregate) -> aggregate + value;
}
Aggregator<String, Long, Long> remover() {
return (aggKey, value, aggregate) -> aggregate - value;
}
}
public static Serde<String> stringSerde = Serdes.String();
public static Serde<Integer> intSerde = Serdes.Integer();
static Serde<Long> longSerde = Serdes.Long();
static Serde<Double> doubleSerde = Serdes.Double();
public static void sleep(final long duration) {
try {
Thread.sleep(duration);
} catch (final Exception ignore) { }
}
}

100
streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java

@ -0,0 +1,100 @@ @@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
public class StreamsSmokeTest {
/**
* args ::= kafka propFileName command disableAutoTerminate
* command := "run" | "process"
*
* @param args
*/
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
Exit.exit(1);
}
final String propFileName = args[0];
final String command = args[1];
final boolean disableAutoTerminate = args.length > 2;
final Properties streamsProperties = Utils.loadProps(propFileName);
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
Exit.exit(1);
}
if ("process".equals(command)) {
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE_V2);
Exit.exit(1);
}
}
System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("props=" + streamsProperties);
System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "run":
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
if (disableAutoTerminate) {
generatePerpetually(kafka, numKeys, maxRecordsPerKey);
} else {
// slow down data production to span 30 seconds so that system tests have time to
// do their bounces, etc.
final Map<String, Set<Integer>> allData =
generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
}
break;
case "process":
// this starts the stream processing app
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
default:
System.out.println("unknown command: " + command);
}
}
}

120
streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -0,0 +1,120 @@ @@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import java.util.Properties;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.4)");
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Integer> dataTable = builder.table(
"data", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
"fk", Consumed.with(intSerde, stringSerde));
buildFKTable(dataStream, fkTable);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
}
private static void buildFKTable(final KStream<String, Integer> primaryTable,
final KTable<Integer, String> otherTable) {
final KStream<String, String> kStream = primaryTable.toTable()
.join(otherTable, v -> v, (k0, v0) -> v0)
.toStream();
kStream.process(printProcessorSupplier("fk"));
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
System.out.println("[3.4] initializing processor: topic=" + topic + "taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final Record<KIn, VIn> record) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
@Override
public void close() {}
};
}
}

4
tests/kafkatest/tests/streams/streams_application_upgrade_test.py

@ -22,12 +22,12 @@ from kafkatest.services.kafka import KafkaService @@ -22,12 +22,12 @@ from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_VERSION, KafkaVersion
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_VERSION, KafkaVersion
smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),
str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1),
str(LATEST_3_2), str(LATEST_3_3)]
str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4)]
dev_version = [str(DEV_VERSION)]
class StreamsUpgradeTest(Test):

5
tests/kafkatest/tests/streams/streams_broker_compatibility_test.py

@ -23,7 +23,7 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer @@ -23,7 +23,7 @@ from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, KafkaVersion
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, KafkaVersion
class StreamsBrokerCompatibility(Test):
@ -64,6 +64,7 @@ class StreamsBrokerCompatibility(Test): @@ -64,6 +64,7 @@ class StreamsBrokerCompatibility(Test):
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_1))
@ -97,6 +98,7 @@ class StreamsBrokerCompatibility(Test): @@ -97,6 +98,7 @@ class StreamsBrokerCompatibility(Test):
self.kafka.stop()
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_1))
@ -130,6 +132,7 @@ class StreamsBrokerCompatibility(Test): @@ -130,6 +132,7 @@ class StreamsBrokerCompatibility(Test):
self.kafka.stop()
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_1))

9
tests/kafkatest/tests/streams/streams_upgrade_test.py

@ -26,7 +26,7 @@ from kafkatest.services.zookeeper import ZookeeperService @@ -26,7 +26,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, DEV_VERSION, KafkaVersion
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
@ -34,8 +34,7 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), @@ -34,8 +34,7 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3),
str(DEV_BRANCH)]
str(LATEST_3_3), str(LATEST_3_4), str(DEV_BRANCH)]
metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
@ -45,7 +44,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0 @@ -45,7 +44,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0
# -> https://issues.apache.org/jira/browse/KAFKA-14646
# thus, we cannot test two bounce rolling upgrade because we know it's broken
# instead we add version 2.4...3.3 to the `metadata_2_versions` upgrade list
#fk_join_versions = [str(LATEST_3_4)]
fk_join_versions = [str(LATEST_3_4)]
"""
@ -204,7 +203,7 @@ class StreamsUpgradeTest(Test): @@ -204,7 +203,7 @@ class StreamsUpgradeTest(Test):
@cluster(num_nodes=6)
@matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
@matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
#@matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
@matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
"""
This test verifies that the cluster successfully upgrades despite changes in the metadata and FK

Loading…
Cancel
Save