Commit by David Jacot, 3 years ago (committed by GitHub)
14 changed files with 2519 additions and 6 deletions
SmokeTestClient.java
@@ -0,0 +1,299 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

public class SmokeTestClient extends SmokeTestUtil {

    private final String name;

    private KafkaStreams streams;
    private boolean uncaughtException = false;
    private boolean started;
    private volatile boolean closed;

    private static void addShutdownHook(final String name, final Runnable runnable) {
        if (name != null) {
            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
        } else {
            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
        }
    }

    private static File tempDirectory() {
        final String prefix = "kafka-";
        final File file;
        try {
            file = Files.createTempDirectory(prefix).toFile();
        } catch (final IOException ex) {
            throw new RuntimeException("Failed to create a temp dir", ex);
        }
        file.deleteOnExit();

        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
            try {
                Utils.delete(file);
            } catch (final IOException e) {
                System.out.println("Error deleting " + file.getAbsolutePath());
                e.printStackTrace(System.out);
            }
        });

        return file;
    }

    public SmokeTestClient(final String name) {
        this.name = name;
    }

    public boolean started() {
        return started;
    }

    public boolean closed() {
        return closed;
    }

    public void start(final Properties streamsProperties) {
        final Topology build = getTopology();
        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                started = true;
                countDownLatch.countDown();
            }

            if (newState == KafkaStreams.State.NOT_RUNNING) {
                closed = true;
            }
        });

        streams.setUncaughtExceptionHandler(e -> {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e);
            e.printStackTrace(System.out);
            uncaughtException = true;
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });

        addShutdownHook("streams-shutdown-hook", this::close);

        streams.start();
        try {
            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
            }
        } catch (final InterruptedException e) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
            e.printStackTrace(System.out);
        }
        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
        System.out.println(name + " started at " + Instant.now());
    }

    public void closeAsync() {
        streams.close(Duration.ZERO);
    }

    public void close() {
        final boolean closed = streams.close(Duration.ofMinutes(1));

        if (closed && !uncaughtException) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
        } else if (closed) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
        } else {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
        }
    }

    private Properties getStreamsConfig(final Properties props) {
        final Properties fullProps = new Properties(props);
        fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
        fullProps.putAll(props);
        return fullProps;
    }

    public Topology getTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
        final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
        source.filterNot((k, v) -> k.equals("flush"))
              .to("echo", Produced.with(stringSerde, intSerde));
        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
        data.process(SmokeTestUtil.printProcessorSupplier("data", name));

        // min
        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));

        final KTable<Windowed<String>, Integer> minAggregation = groupedData
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
            .aggregate(
                () -> Integer.MAX_VALUE,
                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
                Materialized
                    .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
                    .withValueSerde(intSerde)
                    .withRetention(Duration.ofHours(25))
            );

        streamify(minAggregation, "min-raw");

        streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");

        minAggregation
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("min", Produced.with(stringSerde, intSerde));

        final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
            .reduce(Integer::sum);

        streamify(smallWindowSum, "sws-raw");
        streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");

        final KTable<String, Integer> minTable = builder.table(
            "min",
            Consumed.with(stringSerde, intSerde),
            Materialized.as("minStoreName"));

        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));

        // max
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .aggregate(
                () -> Integer.MIN_VALUE,
                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("max", Produced.with(stringSerde, intSerde));

        final KTable<String, Integer> maxTable = builder.table(
            "max",
            Consumed.with(stringSerde, intSerde),
            Materialized.as("maxStoreName"));
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));

        // sum
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .aggregate(
                () -> 0L,
                (aggKey, value, aggregate) -> (long) value + aggregate,
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("sum", Produced.with(stringSerde, longSerde));

        final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
        final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));

        // cnt
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .count(Materialized.as("uwin-cnt"))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("cnt", Produced.with(stringSerde, longSerde));

        final KTable<String, Long> cntTable = builder.table(
            "cnt",
            Consumed.with(stringSerde, longSerde),
            Materialized.as("cntStoreName"));
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));

        // dif
        maxTable
            .join(
                minTable,
                (value1, value2) -> value1 - value2)
            .toStream()
            .filterNot((k, v) -> k.equals("flush"))
            .to("dif", Produced.with(stringSerde, intSerde));

        // avg
        sumTable
            .join(
                cntTable,
                (value1, value2) -> (double) value1 / (double) value2)
            .toStream()
            .filterNot((k, v) -> k.equals("flush"))
            .to("avg", Produced.with(stringSerde, doubleSerde));

        // test repartition
        final Agg agg = new Agg();
        cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
                .aggregate(agg.init(), agg.adder(), agg.remover(),
                           Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                               .withKeySerde(Serdes.String())
                               .withValueSerde(Serdes.Long()))
                .toStream()
                .to("tagg", Produced.with(stringSerde, longSerde));

        return builder.build();
    }

    private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
        windowedTable
            .toStream()
            .filterNot((k, v) -> k.key().equals("flush"))
            .map((key, value) -> new KeyValue<>(key.toString(), value))
            .to(topic, Produced.with(stringSerde, intSerde));
    }
}
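For orientation, a hedged sketch (not part of the commit) of how this client is typically driven; the "process" branch of StreamsSmokeTest further down does essentially the same thing. The class name SmokeTestClientExample and the broker address are illustrative assumptions; SmokeTestClient itself fills in application.id, client.id, and state.dir.

package org.apache.kafka.streams.tests;

import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.streams.StreamsConfig;

public class SmokeTestClientExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Assumed broker address; any reachable cluster with the "data" input topic works.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // start() waits up to one minute for the REBALANCING -> RUNNING transition.
        final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
        client.start(props);
    }
}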
SmokeTestDriver.java
@@ -0,0 +1,622 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
    private static final String[] TOPICS = {
        "data",
        "echo",
        "max",
        "min", "min-suppressed", "min-raw",
        "dif",
        "sum",
        "sws-raw", "sws-suppressed",
        "cnt",
        "avg",
        "tagg"
    };

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;

    private static class ValueList {
        public final String key;
        private final int[] values;
        private int index;

        ValueList(final int min, final int max) {
            key = min + "-" + max;

            values = new int[max - min + 1];
            for (int i = 0; i < values.length; i++) {
                values[i] = min + i;
            }
            // We want to randomize the order of data to test not completely predictable processing order
            // However, values are also used as a timestamp of the record. (TODO: separate data and timestamp)
            // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
            shuffle(values, 10);

            index = 0;
        }

        int next() {
            return (index < values.length) ? values[index++] : -1;
        }
    }

    public static String[] topics() {
        return Arrays.copyOf(TOPICS, TOPICS.length);
    }

    static void generatePerpetually(final String kafka,
                                    final int numKeys,
                                    final int maxRecordsPerKey) {
        final Properties producerProps = generatorProperties(kafka);

        int numRecordsProduced = 0;

        final ValueList[] data = new ValueList[numKeys];
        for (int i = 0; i < numKeys; i++) {
            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
        }

        final Random rand = new Random();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (true) {
                final int index = rand.nextInt(numKeys);
                final String key = data[index].key;
                final int value = data[index].next();

                final ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>(
                        "data",
                        stringSerde.serializer().serialize("", key),
                        intSerde.serializer().serialize("", value)
                    );

                producer.send(record);

                numRecordsProduced++;
                if (numRecordsProduced % 100 == 0) {
                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                }
                Utils.sleep(2);
            }
        }
    }

    public static Map<String, Set<Integer>> generate(final String kafka,
                                                     final int numKeys,
                                                     final int maxRecordsPerKey,
                                                     final Duration timeToSpend) {
        final Properties producerProps = generatorProperties(kafka);

        int numRecordsProduced = 0;

        final Map<String, Set<Integer>> allData = new HashMap<>();
        final ValueList[] data = new ValueList[numKeys];
        for (int i = 0; i < numKeys; i++) {
            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
            allData.put(data[i].key, new HashSet<>());
        }
        final Random rand = new Random();

        int remaining = data.length;

        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
                final int index = rand.nextInt(remaining);
                final String key = data[index].key;
                final int value = data[index].next();

                if (value < 0) {
                    remaining--;
                    data[index] = data[remaining];
                } else {

                    final ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                            "data",
                            stringSerde.serializer().serialize("", key),
                            intSerde.serializer().serialize("", value)
                        );

                    producer.send(record, new TestCallback(record, needRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
                    if (numRecordsProduced % 100 == 0) {
                        System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                    }
                    Utils.sleep(Math.max(recordPauseTime, 2));
                }
            }
            producer.flush();

            int remainingRetries = 5;
            while (!needRetry.isEmpty()) {
                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
                    producer.send(record, new TestCallback(record, needRetry2));
                }
                producer.flush();
                needRetry = needRetry2;

                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
                    System.err.println("Failed to produce all records after multiple retries");
                    Exit.exit(1);
                }
            }

            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
            // all suppressed records.
            final List<PartitionInfo> partitions = producer.partitionsFor("data");
            for (final PartitionInfo partition : partitions) {
                producer.send(new ProducerRecord<>(
                    partition.topic(),
                    partition.partition(),
                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
                    stringSerde.serializer().serialize("", "flush"),
                    intSerde.serializer().serialize("", 0)
                ));
            }
        }
        return Collections.unmodifiableMap(allData);
    }

    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        return producerProps;
    }

    private static class TestCallback implements Callback {
        private final ProducerRecord<byte[], byte[]> originalRecord;
        private final List<ProducerRecord<byte[], byte[]>> needRetry;

        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
            this.originalRecord = originalRecord;
            this.needRetry = needRetry;
        }

        @Override
        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
            if (exception != null) {
                if (exception instanceof TimeoutException) {
                    needRetry.add(originalRecord);
                } else {
                    exception.printStackTrace();
                    Exit.exit(1);
                }
            }
        }
    }

    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
        final Random rand = new Random();
        for (int i = 0; i < data.length; i++) {
            // we shuffle data within windowSize
            final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;

            // swap
            final int tmp = data[i];
            data[i] = data[j];
            data[j] = tmp;
        }
    }

    public static class NumberDeserializer implements Deserializer<Number> {
        @Override
        public Number deserialize(final String topic, final byte[] data) {
            final Number value;
            switch (topic) {
                case "data":
                case "echo":
                case "min":
                case "min-raw":
                case "min-suppressed":
                case "sws-raw":
                case "sws-suppressed":
                case "max":
                case "dif":
                    value = intSerde.deserializer().deserialize(topic, data);
                    break;
                case "sum":
                case "cnt":
                case "tagg":
                    value = longSerde.deserializer().deserialize(topic, data);
                    break;
                case "avg":
                    value = doubleSerde.deserializer().deserialize(topic, data);
                    break;
                default:
                    throw new RuntimeException("unknown topic: " + topic);
            }
            return value;
        }
    }

    public static VerificationResult verify(final String kafka,
                                            final Map<String, Set<Integer>> inputs,
                                            final int maxRecordsPerKey) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
            Stream.of(TOPICS)
                  .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
        int retry = 0;
        final long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                verificationResult = verifyAll(inputs, events, false);
                if (verificationResult.passed()) {
                    break;
                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
                    break;
                } else {
                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
                }
            } else {
                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");

                retry = 0;
                for (final ConsumerRecord<String, Number> record : records) {
                    final String key = record.key();

                    final String topic = record.topic();
                    processed.get(topic).incrementAndGet();

                    if (topic.equals("echo")) {
                        recordsProcessed++;
                        if (recordsProcessed % 100 == 0) {
                            System.out.println("Echo records processed = " + recordsProcessed);
                        }
                    }

                    events.computeIfAbsent(topic, t -> new HashMap<>())
                          .computeIfAbsent(key, k -> new LinkedList<>())
                          .add(record);
                }

                System.out.println(processed);
            }
        }
        consumer.close();
        final long finished = System.currentTimeMillis() - start;
        System.out.println("Verification time=" + finished);
        System.out.println("-------------------");
        System.out.println("Result Verification");
        System.out.println("-------------------");
        System.out.println("recordGenerated=" + recordsGenerated);
        System.out.println("recordProcessed=" + recordsProcessed);

        if (recordsProcessed > recordsGenerated) {
            System.out.println("PROCESSED-MORE-THAN-GENERATED");
        } else if (recordsProcessed < recordsGenerated) {
            System.out.println("PROCESSED-LESS-THAN-GENERATED");
        }

        boolean success;

        final Map<String, Set<Number>> received =
            events.get("echo")
                  .entrySet()
                  .stream()
                  .map(entry -> mkEntry(
                      entry.getKey(),
                      entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
                  )
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        success = inputs.equals(received);

        if (success) {
            System.out.println("ALL-RECORDS-DELIVERED");
        } else {
            int missedCount = 0;
            for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
                missedCount += received.get(entry.getKey()).size();
            }
            System.out.println("missedRecords=" + missedCount);
        }

        // give it one more try if it's not already passing.
        if (!verificationResult.passed()) {
            verificationResult = verifyAll(inputs, events, true);
        }
        success &= verificationResult.passed();

        System.out.println(verificationResult.result());

        System.out.println(success ? "SUCCESS" : "FAILURE");
        return verificationResult;
    }

    public static class VerificationResult {
        private final boolean passed;
        private final String result;

        VerificationResult(final boolean passed, final String result) {
            this.passed = passed;
            this.result = result;
        }

        public boolean passed() {
            return passed;
        }

        public String result() {
            return result;
        }
    }

    private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
                                                final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                                final boolean printResults) {
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        boolean pass;
        try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
            pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
            pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
                return getMin(unwindowedKey);
            }, printResults);
            pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
            pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
            pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
            pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
            pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
            pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
            pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
        }
        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
    }

    private static boolean verify(final PrintStream resultStream,
                                  final String topic,
                                  final Map<String, Set<Integer>> inputData,
                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                  final Function<String, Number> keyToExpectation,
                                  final boolean printResults) {
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
        if (outputEvents.isEmpty()) {
            resultStream.println(topic + " is empty");
            return false;
        } else {
            resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());

            if (outputEvents.size() != inputData.size()) {
                resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
                                    outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
                return false;
            }
            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
                final String key = entry.getKey();
                final Number expected = keyToExpectation.apply(key);
                final Number actual = entry.getValue().getLast().value();
                if (!expected.equals(actual)) {
                    resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);

                    if (printResults) {
                        resultStream.printf("\t inputEvents=%n%s%n\t" +
                                "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
                            indent("\t\t", observedInputEvents.get(key)),
                            indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
                            indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
                            indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
                            indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
                            indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
                            indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));

                        if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
                            resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
                    }

                    return false;
                }
            }
            return true;
        }
    }

    private static boolean verifySuppressed(final PrintStream resultStream,
                                            @SuppressWarnings("SameParameterValue") final String topic,
                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                            final boolean printResults) {
        resultStream.println("verifying suppressed " + topic);
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
            if (entry.getValue().size() != 1) {
                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
                final String key = entry.getKey();
                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
                                    key,
                                    indent("\t\t", entry.getValue()));

                if (printResults)
                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
                        indent("\t\t", events.get(unsuppressedTopic).get(key)),
                        indent("\t\t", events.get("data").get(unwindowedKey)));

                return false;
            }
        }
        return true;
    }

    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
                                 final Iterable<ConsumerRecord<String, Number>> list) {
        final StringBuilder stringBuilder = new StringBuilder();
        for (final ConsumerRecord<String, Number> record : list) {
            stringBuilder.append(prefix).append(record).append('\n');
        }
        return stringBuilder.toString();
    }

    private static Long getSum(final String key) {
        final int min = getMin(key).intValue();
        final int max = getMax(key).intValue();
        return ((long) min + max) * (max - min + 1L) / 2L;
    }

    private static Double getAvg(final String key) {
        final int min = getMin(key).intValue();
        final int max = getMax(key).intValue();
        return ((long) min + max) / 2.0;
    }

    private static boolean verifyTAgg(final PrintStream resultStream,
                                      final Map<String, Set<Integer>> allData,
                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
                                      final boolean printResults) {
        if (taggEvents == null) {
            resultStream.println("tagg is missing");
            return false;
        } else if (taggEvents.isEmpty()) {
            resultStream.println("tagg is empty");
            return false;
        } else {
            resultStream.println("verifying tagg");

            // generate expected answer
            final Map<String, Long> expected = new HashMap<>();
            for (final String key : allData.keySet()) {
                final int min = getMin(key).intValue();
                final int max = getMax(key).intValue();
                final String cnt = Long.toString(max - min + 1L);

                expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
            }

            // check the result
            for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
                final String key = entry.getKey();
                Long expectedCount = expected.remove(key);
                if (expectedCount == null) {
                    expectedCount = 0L;
                }

                if (entry.getValue().getLast().value().longValue() != expectedCount) {
                    resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);

                    if (printResults)
                        resultStream.println("\t taggEvents: " + entry.getValue());
                    return false;
                }
            }

        }
        return true;
    }

    private static Number getMin(final String key) {
        return Integer.parseInt(key.split("-")[0]);
    }

    private static Number getMax(final String key) {
        return Integer.parseInt(key.split("-")[1]);
    }

    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
        final List<TopicPartition> partitions = new ArrayList<>();

        for (final String topic : topics) {
            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitions;
    }

}
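Because every key encodes its own value range as "min-max", the driver recomputes each expected aggregate arithmetically instead of tracking what was produced. A small illustrative check of the getMin/getMax/getSum/getAvg helpers above, using the assumed key "10-19" (values 10..19); this class is not part of the commit.

package org.apache.kafka.streams.tests;

// Illustrative only: the same arithmetic verifyAll() applies per key.
public class ExpectationExample {
    public static void main(final String[] args) {
        final int min = 10;                              // getMin("10-19")
        final int max = 19;                              // getMax("10-19")
        final long cnt = max - min + 1L;                 // 10   -> expected "cnt"
        final long sum = ((long) min + max) * cnt / 2L;  // 145  -> expected "sum" (getSum)
        final double avg = ((long) min + max) / 2.0;     // 14.5 -> expected "avg" (getAvg)
        final int dif = max - min;                       // 9    -> expected "dif"
        System.out.printf("cnt=%d sum=%d avg=%.1f dif=%d%n", cnt, sum, avg, dif);
    }
}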
SmokeTestUtil.java
@@ -0,0 +1,131 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

import java.time.Instant;

public class SmokeTestUtil {

    final static int END = Integer.MAX_VALUE;

    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
        return printProcessorSupplier(topic, "");
    }

    static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
        return () -> new ContextualProcessor<Object, Object, Void, Void>() {
            private int numRecordsProcessed = 0;
            private long smallestOffset = Long.MAX_VALUE;
            private long largestOffset = Long.MIN_VALUE;

            @Override
            public void init(final ProcessorContext<Void, Void> context) {
                super.init(context);
                System.out.println("[3.0] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                System.out.flush();
                numRecordsProcessed = 0;
                smallestOffset = Long.MAX_VALUE;
                largestOffset = Long.MIN_VALUE;
            }

            @Override
            public void process(final Record<Object, Object> record) {
                numRecordsProcessed++;
                if (numRecordsProcessed % 100 == 0) {
                    System.out.printf("%s: %s%n", name, Instant.now());
                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                }

                context().recordMetadata().ifPresent(recordMetadata -> {
                    if (smallestOffset > recordMetadata.offset()) {
                        smallestOffset = recordMetadata.offset();
                    }
                    if (largestOffset < recordMetadata.offset()) {
                        largestOffset = recordMetadata.offset();
                    }
                });
            }

            @Override
            public void close() {
                System.out.printf("Close processor for task %s%n", context().taskId());
                System.out.println("processed " + numRecordsProcessed + " records");
                final long processed;
                if (largestOffset >= smallestOffset) {
                    processed = 1L + largestOffset - smallestOffset;
                } else {
                    processed = 0L;
                }
                System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
                System.out.flush();
            }
        };
    }

    public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
        @Override
        public K apply(final Windowed<K> winKey, final V value) {
            return winKey.key();
        }
    }

    public static class Agg {

        KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
            return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
        }

        public Initializer<Long> init() {
            return () -> 0L;
        }

        Aggregator<String, Long, Long> adder() {
            return (aggKey, value, aggregate) -> aggregate + value;
        }

        Aggregator<String, Long, Long> remover() {
            return (aggKey, value, aggregate) -> aggregate - value;
        }
    }

    public static Serde<String> stringSerde = Serdes.String();

    public static Serde<Integer> intSerde = Serdes.Integer();

    static Serde<Long> longSerde = Serdes.Long();

    static Serde<Double> doubleSerde = Serdes.Double();

    public static void sleep(final long duration) {
        try {
            Thread.sleep(duration);
        } catch (final Exception ignore) { }
    }

}
StreamsSmokeTest.java
@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;

public class StreamsSmokeTest {

    /**
     * args ::= kafka propFileName command disableAutoTerminate
     * command := "run" | "process"
     *
     * @param args
     */
    public static void main(final String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
            Exit.exit(1);
        }

        final String propFileName = args[0];
        final String command = args[1];
        final boolean disableAutoTerminate = args.length > 2;

        final Properties streamsProperties = Utils.loadProps(propFileName);
        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);

        if (kafka == null) {
            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
            Exit.exit(1);
        }

        if ("process".equals(command)) {
            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
                !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {

                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
                                       StreamsConfig.EXACTLY_ONCE_V2);

                Exit.exit(1);
            }
        }

        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
        System.out.println("command=" + command);
        System.out.println("props=" + streamsProperties);
        System.out.println("disableAutoTerminate=" + disableAutoTerminate);

        switch (command) {
            case "run":
                // this starts the driver (data generation and result verification)
                final int numKeys = 10;
                final int maxRecordsPerKey = 500;
                if (disableAutoTerminate) {
                    generatePerpetually(kafka, numKeys, maxRecordsPerKey);
                } else {
                    // slow down data production to span 30 seconds so that system tests have time to
                    // do their bounces, etc.
                    final Map<String, Set<Integer>> allData =
                        generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
                }
                break;
            case "process":
                // this starts the stream processing app
                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
                break;
            default:
                System.out.println("unknown command: " + command);
        }
    }

}
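For completeness, a hedged sketch (not part of the commit) of what the "run" branch above amounts to when auto-termination stays enabled: produce for roughly 30 seconds, then verify every output topic. The class name RunSmokeTestExample and the broker address are assumptions, and all topics returned by SmokeTestDriver.topics() must already exist.

package org.apache.kafka.streams.tests;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

public class RunSmokeTestExample {
    public static void main(final String[] args) {
        final String kafka = "localhost:9092"; // assumed bootstrap server
        // Same parameters as the "run" command: 10 keys, up to 500 records per key.
        final Map<String, Set<Integer>> allData =
            SmokeTestDriver.generate(kafka, 10, 500, Duration.ofSeconds(30));
        SmokeTestDriver.verify(kafka, allData, 500);
    }
}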
StreamsUpgradeTest.java
@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Properties;

public class StreamsUpgradeTest {

    @SuppressWarnings("unchecked")
    public static void main(final String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
        }
        final String propFileName = args[0];

        final Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.0)");
        System.out.println("props=" + streamsProperties);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream dataStream = builder.stream("data");
        dataStream.process(printProcessorSupplier());
        dataStream.to("echo");

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        config.putAll(streamsProperties);

        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        }));
    }

    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
        return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
            private int numRecordsProcessed = 0;

            @Override
            public void init(final ProcessorContext<KOut, VOut> context) {
                System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId());
                numRecordsProcessed = 0;
            }

            @Override
            public void process(final Record<KIn, VIn> record) {
                numRecordsProcessed++;
                if (numRecordsProcessed % 100 == 0) {
                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
                }
            }

            @Override
            public void close() {}
        };
    }
}
@ -0,0 +1,299 @@
@@ -0,0 +1,299 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.streams.tests; |
||||
|
||||
import org.apache.kafka.common.serialization.Serdes; |
||||
import org.apache.kafka.common.utils.Bytes; |
||||
import org.apache.kafka.common.utils.KafkaThread; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.streams.KafkaStreams; |
||||
import org.apache.kafka.streams.KeyValue; |
||||
import org.apache.kafka.streams.StreamsBuilder; |
||||
import org.apache.kafka.streams.StreamsConfig; |
||||
import org.apache.kafka.streams.Topology; |
||||
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; |
||||
import org.apache.kafka.streams.kstream.Consumed; |
||||
import org.apache.kafka.streams.kstream.Grouped; |
||||
import org.apache.kafka.streams.kstream.KGroupedStream; |
||||
import org.apache.kafka.streams.kstream.KStream; |
||||
import org.apache.kafka.streams.kstream.KTable; |
||||
import org.apache.kafka.streams.kstream.Materialized; |
||||
import org.apache.kafka.streams.kstream.Produced; |
||||
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; |
||||
import org.apache.kafka.streams.kstream.TimeWindows; |
||||
import org.apache.kafka.streams.kstream.Windowed; |
||||
import org.apache.kafka.streams.state.Stores; |
||||
import org.apache.kafka.streams.state.WindowStore; |
||||
|
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.nio.file.Files; |
||||
import java.time.Duration; |
||||
import java.time.Instant; |
||||
import java.util.Properties; |
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; |
||||
|
||||
public class SmokeTestClient extends SmokeTestUtil { |
||||
|
||||
private final String name; |
||||
|
||||
private KafkaStreams streams; |
||||
private boolean uncaughtException = false; |
||||
private boolean started; |
||||
private volatile boolean closed; |
||||
|
||||
private static void addShutdownHook(final String name, final Runnable runnable) { |
||||
if (name != null) { |
||||
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); |
||||
} else { |
||||
Runtime.getRuntime().addShutdownHook(new Thread(runnable)); |
||||
} |
||||
} |
||||
|
||||
private static File tempDirectory() { |
||||
final String prefix = "kafka-"; |
||||
final File file; |
||||
try { |
||||
file = Files.createTempDirectory(prefix).toFile(); |
||||
} catch (final IOException ex) { |
||||
throw new RuntimeException("Failed to create a temp dir", ex); |
||||
} |
||||
file.deleteOnExit(); |
||||
|
||||
addShutdownHook("delete-temp-file-shutdown-hook", () -> { |
||||
try { |
||||
Utils.delete(file); |
||||
} catch (final IOException e) { |
||||
System.out.println("Error deleting " + file.getAbsolutePath()); |
||||
e.printStackTrace(System.out); |
||||
} |
||||
}); |
||||
|
||||
return file; |
||||
} |
||||
|
||||
public SmokeTestClient(final String name) { |
||||
this.name = name; |
||||
} |
||||
|
||||
public boolean started() { |
||||
return started; |
||||
} |
||||
|
||||
public boolean closed() { |
||||
return closed; |
||||
} |
||||
|
||||
public void start(final Properties streamsProperties) { |
||||
final Topology build = getTopology(); |
||||
streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); |
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1); |
||||
streams.setStateListener((newState, oldState) -> { |
||||
System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); |
||||
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { |
||||
started = true; |
||||
countDownLatch.countDown(); |
||||
} |
||||
|
||||
if (newState == KafkaStreams.State.NOT_RUNNING) { |
||||
closed = true; |
||||
} |
||||
}); |
||||
|
||||
streams.setUncaughtExceptionHandler(e -> { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); |
||||
System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e); |
||||
e.printStackTrace(System.out); |
||||
uncaughtException = true; |
||||
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; |
||||
}); |
||||
|
||||
addShutdownHook("streams-shutdown-hook", this::close); |
||||
|
||||
streams.start(); |
||||
try { |
||||
if (!countDownLatch.await(1, TimeUnit.MINUTES)) { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); |
||||
} |
||||
} catch (final InterruptedException e) { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); |
||||
e.printStackTrace(System.out); |
||||
} |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); |
||||
System.out.println(name + " started at " + Instant.now()); |
||||
} |
||||
|
||||
public void closeAsync() { |
||||
streams.close(Duration.ZERO); |
||||
} |
||||
|
||||
public void close() { |
||||
final boolean closed = streams.close(Duration.ofMinutes(1)); |
||||
|
||||
if (closed && !uncaughtException) { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); |
||||
} else if (closed) { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); |
||||
} else { |
||||
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close"); |
||||
} |
||||
} |
||||
|
||||
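// Seeds application.id, client.id, and a fresh temporary state directory, then copies
// the caller-supplied properties on top so anything passed in wins on conflicts.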
private Properties getStreamsConfig(final Properties props) { |
||||
final Properties fullProps = new Properties(props); |
||||
fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest"); |
||||
fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name); |
||||
fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath()); |
||||
fullProps.putAll(props); |
||||
return fullProps; |
||||
} |
||||
|
||||
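// Topology under test: reads "data", echoes it to "echo", and derives windowed
// min/max/sum/cnt aggregates plus the joined dif (max - min) and avg (sum / cnt).
// Suppressed variants of the min and short hopping-window sum ("sws") aggregations
// feed "min-suppressed" and "sws-suppressed", and a re-grouped count-of-counts lands
// in "tagg". Records keyed "flush" (sent by the driver to advance stream time) are
// filtered out before every sink.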
public Topology getTopology() { |
||||
final StreamsBuilder builder = new StreamsBuilder(); |
||||
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde); |
||||
final KStream<String, Integer> source = builder.stream("data", stringIntConsumed); |
||||
source.filterNot((k, v) -> k.equals("flush")) |
||||
.to("echo", Produced.with(stringSerde, intSerde)); |
||||
final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END); |
||||
data.process(SmokeTestUtil.printProcessorSupplier("data", name)); |
||||
|
||||
// min

||||
final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde)); |
||||
|
||||
final KTable<Windowed<String>, Integer> minAggregation = groupedData |
||||
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1))) |
||||
.aggregate( |
||||
() -> Integer.MAX_VALUE, |
||||
(aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, |
||||
Materialized |
||||
.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min") |
||||
.withValueSerde(intSerde) |
||||
.withRetention(Duration.ofHours(25)) |
||||
); |
||||
|
||||
streamify(minAggregation, "min-raw"); |
||||
|
||||
streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed"); |
||||
|
||||
minAggregation |
||||
.toStream(new Unwindow<>()) |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("min", Produced.with(stringSerde, intSerde)); |
||||
|
||||
final KTable<Windowed<String>, Integer> smallWindowSum = groupedData |
||||
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1))) |
||||
.reduce(Integer::sum); |
||||
|
||||
streamify(smallWindowSum, "sws-raw"); |
||||
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed"); |
||||
|
||||
final KTable<String, Integer> minTable = builder.table( |
||||
"min", |
||||
Consumed.with(stringSerde, intSerde), |
||||
Materialized.as("minStoreName")); |
||||
|
||||
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name)); |
||||
|
||||
// max
||||
groupedData |
||||
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) |
||||
.aggregate( |
||||
() -> Integer.MIN_VALUE, |
||||
(aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, |
||||
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde)) |
||||
.toStream(new Unwindow<>()) |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("max", Produced.with(stringSerde, intSerde)); |
||||
|
||||
final KTable<String, Integer> maxTable = builder.table( |
||||
"max", |
||||
Consumed.with(stringSerde, intSerde), |
||||
Materialized.as("maxStoreName")); |
||||
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name)); |
||||
|
||||
// sum
||||
groupedData |
||||
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) |
||||
.aggregate( |
||||
() -> 0L, |
||||
(aggKey, value, aggregate) -> (long) value + aggregate, |
||||
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde)) |
||||
.toStream(new Unwindow<>()) |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("sum", Produced.with(stringSerde, longSerde)); |
||||
|
||||
final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde); |
||||
final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed); |
||||
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name)); |
||||
|
||||
// cnt
||||
groupedData |
||||
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2))) |
||||
.count(Materialized.as("uwin-cnt")) |
||||
.toStream(new Unwindow<>()) |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("cnt", Produced.with(stringSerde, longSerde)); |
||||
|
||||
final KTable<String, Long> cntTable = builder.table( |
||||
"cnt", |
||||
Consumed.with(stringSerde, longSerde), |
||||
Materialized.as("cntStoreName")); |
||||
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name)); |
||||
|
||||
// dif
||||
maxTable |
||||
.join( |
||||
minTable, |
||||
(value1, value2) -> value1 - value2) |
||||
.toStream() |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("dif", Produced.with(stringSerde, intSerde)); |
||||
|
||||
// avg
||||
sumTable |
||||
.join( |
||||
cntTable, |
||||
(value1, value2) -> (double) value1 / (double) value2) |
||||
.toStream() |
||||
.filterNot((k, v) -> k.equals("flush")) |
||||
.to("avg", Produced.with(stringSerde, doubleSerde)); |
||||
|
||||
// test repartition
||||
final Agg agg = new Agg(); |
||||
cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde)) |
||||
.aggregate(agg.init(), agg.adder(), agg.remover(), |
||||
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt")) |
||||
.withKeySerde(Serdes.String()) |
||||
.withValueSerde(Serdes.Long())) |
||||
.toStream() |
||||
.to("tagg", Produced.with(stringSerde, longSerde)); |
||||
|
||||
return builder.build(); |
||||
} |
||||
|
||||
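// Converts a windowed table into a stream keyed by the Windowed#toString() form,
// drops the driver's "flush" records, and writes the result to the given topic.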
private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) { |
||||
windowedTable |
||||
.toStream() |
||||
.filterNot((k, v) -> k.key().equals("flush")) |
||||
.map((key, value) -> new KeyValue<>(key.toString(), value)) |
||||
.to(topic, Produced.with(stringSerde, intSerde)); |
||||
} |
||||
} |
@@ -0,0 +1,622 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.streams.tests; |
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig; |
||||
import org.apache.kafka.clients.consumer.ConsumerRecord; |
||||
import org.apache.kafka.clients.consumer.ConsumerRecords; |
||||
import org.apache.kafka.clients.consumer.KafkaConsumer; |
||||
import org.apache.kafka.clients.producer.Callback; |
||||
import org.apache.kafka.clients.producer.KafkaProducer; |
||||
import org.apache.kafka.clients.producer.ProducerConfig; |
||||
import org.apache.kafka.clients.producer.ProducerRecord; |
||||
import org.apache.kafka.clients.producer.RecordMetadata; |
||||
import org.apache.kafka.common.PartitionInfo; |
||||
import org.apache.kafka.common.TopicPartition; |
||||
import org.apache.kafka.common.errors.TimeoutException; |
||||
import org.apache.kafka.common.serialization.ByteArraySerializer; |
||||
import org.apache.kafka.common.serialization.Deserializer; |
||||
import org.apache.kafka.common.serialization.StringDeserializer; |
||||
import org.apache.kafka.common.utils.Exit; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
|
||||
import java.io.ByteArrayOutputStream; |
||||
import java.io.PrintStream; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.time.Duration; |
||||
import java.time.Instant; |
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.HashSet; |
||||
import java.util.LinkedList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Properties; |
||||
import java.util.Random; |
||||
import java.util.Set; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
import java.util.function.Function; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
import static java.util.Collections.emptyMap; |
||||
import static org.apache.kafka.common.utils.Utils.mkEntry; |
||||
|
||||
public class SmokeTestDriver extends SmokeTestUtil { |
||||
private static final String[] TOPICS = { |
||||
"data", |
||||
"echo", |
||||
"max", |
||||
"min", "min-suppressed", "min-raw", |
||||
"dif", |
||||
"sum", |
||||
"sws-raw", "sws-suppressed", |
||||
"cnt", |
||||
"avg", |
||||
"tagg" |
||||
}; |
||||
|
||||
private static final int MAX_RECORD_EMPTY_RETRIES = 30; |
||||
|
||||
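// One ValueList per key: the key encodes its own value range as "min-max", and the
// values are exactly the integers min..max (shuffled within a small window). The
// verifier relies on this encoding to recompute expected aggregates from the key alone.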
private static class ValueList { |
||||
public final String key; |
||||
private final int[] values; |
||||
private int index; |
||||
|
||||
ValueList(final int min, final int max) { |
||||
key = min + "-" + max; |
||||
|
||||
values = new int[max - min + 1]; |
||||
for (int i = 0; i < values.length; i++) { |
||||
values[i] = min + i; |
||||
} |
||||
// We want to randomize the order of data to test not completely predictable processing order
// However, values are also used as a timestamp of the record. (TODO: separate data and timestamp)
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
||||
shuffle(values, 10); |
||||
|
||||
index = 0; |
||||
} |
||||
|
||||
int next() { |
||||
return (index < values.length) ? values[index++] : -1; |
||||
} |
||||
} |
||||
|
||||
public static String[] topics() { |
||||
return Arrays.copyOf(TOPICS, TOPICS.length); |
||||
} |
||||
|
||||
static void generatePerpetually(final String kafka, |
||||
final int numKeys, |
||||
final int maxRecordsPerKey) { |
||||
final Properties producerProps = generatorProperties(kafka); |
||||
|
||||
int numRecordsProduced = 0; |
||||
|
||||
final ValueList[] data = new ValueList[numKeys]; |
||||
for (int i = 0; i < numKeys; i++) { |
||||
data[i] = new ValueList(i, i + maxRecordsPerKey - 1); |
||||
} |
||||
|
||||
final Random rand = new Random(); |
||||
|
||||
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { |
||||
while (true) { |
||||
final int index = rand.nextInt(numKeys); |
||||
final String key = data[index].key; |
||||
final int value = data[index].next(); |
||||
|
||||
final ProducerRecord<byte[], byte[]> record = |
||||
new ProducerRecord<>( |
||||
"data", |
||||
stringSerde.serializer().serialize("", key), |
||||
intSerde.serializer().serialize("", value) |
||||
); |
||||
|
||||
producer.send(record); |
||||
|
||||
numRecordsProduced++; |
||||
if (numRecordsProduced % 100 == 0) { |
||||
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); |
||||
} |
||||
Utils.sleep(2); |
||||
} |
||||
} |
||||
} |
||||
|
||||
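// Produces maxRecordsPerKey values for each of numKeys keys, pacing sends so the run
// spans roughly timeToSpend. Sends that fail with a TimeoutException are retried up to
// five more rounds; afterwards one "flush" record per partition is written with a
// timestamp two days in the future so that suppressed windowed results get emitted.
// Returns the full set of produced values per key for later verification.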
public static Map<String, Set<Integer>> generate(final String kafka, |
||||
final int numKeys, |
||||
final int maxRecordsPerKey, |
||||
final Duration timeToSpend) { |
||||
final Properties producerProps = generatorProperties(kafka); |
||||
|
||||
|
||||
int numRecordsProduced = 0; |
||||
|
||||
final Map<String, Set<Integer>> allData = new HashMap<>(); |
||||
final ValueList[] data = new ValueList[numKeys]; |
||||
for (int i = 0; i < numKeys; i++) { |
||||
data[i] = new ValueList(i, i + maxRecordsPerKey - 1); |
||||
allData.put(data[i].key, new HashSet<>()); |
||||
} |
||||
final Random rand = new Random(); |
||||
|
||||
int remaining = data.length; |
||||
|
||||
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; |
||||
|
||||
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); |
||||
|
||||
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { |
||||
while (remaining > 0) { |
||||
final int index = rand.nextInt(remaining); |
||||
final String key = data[index].key; |
||||
final int value = data[index].next(); |
||||
|
||||
if (value < 0) { |
||||
remaining--; |
||||
data[index] = data[remaining]; |
||||
} else { |
||||
|
||||
final ProducerRecord<byte[], byte[]> record = |
||||
new ProducerRecord<>( |
||||
"data", |
||||
stringSerde.serializer().serialize("", key), |
||||
intSerde.serializer().serialize("", value) |
||||
); |
||||
|
||||
producer.send(record, new TestCallback(record, needRetry)); |
||||
|
||||
numRecordsProduced++; |
||||
allData.get(key).add(value); |
||||
if (numRecordsProduced % 100 == 0) { |
||||
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); |
||||
} |
||||
Utils.sleep(Math.max(recordPauseTime, 2)); |
||||
} |
||||
} |
||||
producer.flush(); |
||||
|
||||
int remainingRetries = 5; |
||||
while (!needRetry.isEmpty()) { |
||||
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>(); |
||||
for (final ProducerRecord<byte[], byte[]> record : needRetry) { |
||||
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); |
||||
producer.send(record, new TestCallback(record, needRetry2)); |
||||
} |
||||
producer.flush(); |
||||
needRetry = needRetry2; |
||||
|
||||
if (--remainingRetries == 0 && !needRetry.isEmpty()) { |
||||
System.err.println("Failed to produce all records after multiple retries"); |
||||
Exit.exit(1); |
||||
} |
||||
} |
||||
|
||||
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
||||
final List<PartitionInfo> partitions = producer.partitionsFor("data"); |
||||
for (final PartitionInfo partition : partitions) { |
||||
producer.send(new ProducerRecord<>( |
||||
partition.topic(), |
||||
partition.partition(), |
||||
System.currentTimeMillis() + Duration.ofDays(2).toMillis(), |
||||
stringSerde.serializer().serialize("", "flush"), |
||||
intSerde.serializer().serialize("", 0) |
||||
)); |
||||
} |
||||
} |
||||
return Collections.unmodifiableMap(allData); |
||||
} |
||||
|
||||
private static Properties generatorProperties(final String kafka) { |
||||
final Properties producerProps = new Properties(); |
||||
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); |
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); |
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); |
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); |
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); |
||||
return producerProps; |
||||
} |
||||
|
||||
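// Producer callback that re-queues records whose send failed with a TimeoutException
// and aborts the driver on any other send error.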
private static class TestCallback implements Callback { |
||||
private final ProducerRecord<byte[], byte[]> originalRecord; |
||||
private final List<ProducerRecord<byte[], byte[]>> needRetry; |
||||
|
||||
TestCallback(final ProducerRecord<byte[], byte[]> originalRecord, |
||||
final List<ProducerRecord<byte[], byte[]>> needRetry) { |
||||
this.originalRecord = originalRecord; |
||||
this.needRetry = needRetry; |
||||
} |
||||
|
||||
@Override |
||||
public void onCompletion(final RecordMetadata metadata, final Exception exception) { |
||||
if (exception != null) { |
||||
if (exception instanceof TimeoutException) { |
||||
needRetry.add(originalRecord); |
||||
} else { |
||||
exception.printStackTrace(); |
||||
Exit.exit(1); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
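// Sliding-window shuffle: each element may only swap with an element at most
// windowSize - 1 positions ahead of it, so the values stay roughly ordered (and, since
// the values double as timestamps, roughly time-ordered) while still arriving out of order.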
private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) { |
||||
final Random rand = new Random(); |
||||
for (int i = 0; i < data.length; i++) { |
||||
// we shuffle data within windowSize
||||
final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i; |
||||
|
||||
// swap
||||
final int tmp = data[i]; |
||||
data[i] = data[j]; |
||||
data[j] = tmp; |
||||
} |
||||
} |
||||
|
||||
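// Chooses the deserializer from the topic name: Integer for the raw/min/max/dif style
// topics, Long for sum/cnt/tagg, and Double for avg.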
public static class NumberDeserializer implements Deserializer<Number> { |
||||
@Override |
||||
public Number deserialize(final String topic, final byte[] data) { |
||||
final Number value; |
||||
switch (topic) { |
||||
case "data": |
||||
case "echo": |
||||
case "min": |
||||
case "min-raw": |
||||
case "min-suppressed": |
||||
case "sws-raw": |
||||
case "sws-suppressed": |
||||
case "max": |
||||
case "dif": |
||||
value = intSerde.deserializer().deserialize(topic, data); |
||||
break; |
||||
case "sum": |
||||
case "cnt": |
||||
case "tagg": |
||||
value = longSerde.deserializer().deserialize(topic, data); |
||||
break; |
||||
case "avg": |
||||
value = doubleSerde.deserializer().deserialize(topic, data); |
||||
break; |
||||
default: |
||||
throw new RuntimeException("unknown topic: " + topic); |
||||
} |
||||
return value; |
||||
} |
||||
} |
||||
|
||||
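// Reads every output topic from the beginning with read_committed isolation and polls
// for up to six minutes. Once "echo" has yielded at least as many records as were
// generated and polls come back empty, verifyAll() is run; the loop retries a bounded
// number of empty polls before giving up. Finally the echoed values are compared
// against the generated input and a per-topic verification report is printed.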
public static VerificationResult verify(final String kafka, |
||||
final Map<String, Set<Integer>> inputs, |
||||
final int maxRecordsPerKey) { |
||||
final Properties props = new Properties(); |
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); |
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); |
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class); |
||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); |
||||
|
||||
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props); |
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS); |
||||
consumer.assign(partitions); |
||||
consumer.seekToBeginning(partitions); |
||||
|
||||
final int recordsGenerated = inputs.size() * maxRecordsPerKey; |
||||
int recordsProcessed = 0; |
||||
final Map<String, AtomicInteger> processed = |
||||
Stream.of(TOPICS) |
||||
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); |
||||
|
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>(); |
||||
|
||||
VerificationResult verificationResult = new VerificationResult(false, "no results yet"); |
||||
int retry = 0; |
||||
final long start = System.currentTimeMillis(); |
||||
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { |
||||
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5)); |
||||
if (records.isEmpty() && recordsProcessed >= recordsGenerated) { |
||||
verificationResult = verifyAll(inputs, events, false); |
||||
if (verificationResult.passed()) { |
||||
break; |
||||
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) { |
||||
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries."); |
||||
break; |
||||
} else { |
||||
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry); |
||||
} |
||||
} else { |
||||
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry."); |
||||
|
||||
retry = 0; |
||||
for (final ConsumerRecord<String, Number> record : records) { |
||||
final String key = record.key(); |
||||
|
||||
final String topic = record.topic(); |
||||
processed.get(topic).incrementAndGet(); |
||||
|
||||
if (topic.equals("echo")) { |
||||
recordsProcessed++; |
||||
if (recordsProcessed % 100 == 0) { |
||||
System.out.println("Echo records processed = " + recordsProcessed); |
||||
} |
||||
} |
||||
|
||||
events.computeIfAbsent(topic, t -> new HashMap<>()) |
||||
.computeIfAbsent(key, k -> new LinkedList<>()) |
||||
.add(record); |
||||
} |
||||
|
||||
System.out.println(processed); |
||||
} |
||||
} |
||||
consumer.close(); |
||||
final long finished = System.currentTimeMillis() - start; |
||||
System.out.println("Verification time=" + finished); |
||||
System.out.println("-------------------"); |
||||
System.out.println("Result Verification"); |
||||
System.out.println("-------------------"); |
||||
System.out.println("recordGenerated=" + recordsGenerated); |
||||
System.out.println("recordProcessed=" + recordsProcessed); |
||||
|
||||
if (recordsProcessed > recordsGenerated) { |
||||
System.out.println("PROCESSED-MORE-THAN-GENERATED"); |
||||
} else if (recordsProcessed < recordsGenerated) { |
||||
System.out.println("PROCESSED-LESS-THAN-GENERATED"); |
||||
} |
||||
|
||||
boolean success; |
||||
|
||||
final Map<String, Set<Number>> received = |
||||
events.get("echo") |
||||
.entrySet() |
||||
.stream() |
||||
.map(entry -> mkEntry( |
||||
entry.getKey(), |
||||
entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet())) |
||||
) |
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
||||
|
||||
success = inputs.equals(received); |
||||
|
||||
if (success) { |
||||
System.out.println("ALL-RECORDS-DELIVERED"); |
||||
} else { |
||||
int missedCount = 0; |
||||
for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) { |
||||
missedCount += received.get(entry.getKey()).size(); |
||||
} |
||||
System.out.println("missedRecords=" + missedCount); |
||||
} |
||||
|
||||
// give it one more try if it's not already passing.
||||
if (!verificationResult.passed()) { |
||||
verificationResult = verifyAll(inputs, events, true); |
||||
} |
||||
success &= verificationResult.passed(); |
||||
|
||||
System.out.println(verificationResult.result()); |
||||
|
||||
System.out.println(success ? "SUCCESS" : "FAILURE"); |
||||
return verificationResult; |
||||
} |
||||
|
||||
public static class VerificationResult { |
||||
private final boolean passed; |
||||
private final String result; |
||||
|
||||
VerificationResult(final boolean passed, final String result) { |
||||
this.passed = passed; |
||||
this.result = result; |
||||
} |
||||
|
||||
public boolean passed() { |
||||
return passed; |
||||
} |
||||
|
||||
public String result() { |
||||
return result; |
||||
} |
||||
} |
||||
|
||||
private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs, |
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, |
||||
final boolean printResults) { |
||||
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); |
||||
boolean pass; |
||||
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { |
||||
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults); |
||||
pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults); |
||||
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> { |
||||
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); |
||||
return getMin(unwindowedKey); |
||||
}, printResults); |
||||
pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults); |
||||
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults); |
||||
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults); |
||||
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults); |
||||
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults); |
||||
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults); |
||||
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults); |
||||
} |
||||
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); |
||||
} |
||||
|
||||
private static boolean verify(final PrintStream resultStream, |
||||
final String topic, |
||||
final Map<String, Set<Integer>> inputData, |
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, |
||||
final Function<String, Number> keyToExpectation, |
||||
final boolean printResults) { |
||||
final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data"); |
||||
final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap()); |
||||
if (outputEvents.isEmpty()) { |
||||
resultStream.println(topic + " is empty"); |
||||
return false; |
||||
} else { |
||||
resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size()); |
||||
|
||||
if (outputEvents.size() != inputData.size()) { |
||||
resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n", |
||||
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet()); |
||||
return false; |
||||
} |
||||
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) { |
||||
final String key = entry.getKey(); |
||||
final Number expected = keyToExpectation.apply(key); |
||||
final Number actual = entry.getValue().getLast().value(); |
||||
if (!expected.equals(actual)) { |
||||
resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected); |
||||
|
||||
if (printResults) { |
||||
resultStream.printf("\t inputEvents=%n%s%n\t" + |
||||
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n", |
||||
indent("\t\t", observedInputEvents.get(key)), |
||||
indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())), |
||||
indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())), |
||||
indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())), |
||||
indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())), |
||||
indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())), |
||||
indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>()))); |
||||
|
||||
if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) |
||||
resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue())); |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
} |
||||
|
||||
|
||||
private static boolean verifySuppressed(final PrintStream resultStream, |
||||
@SuppressWarnings("SameParameterValue") final String topic, |
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, |
||||
final boolean printResults) { |
||||
resultStream.println("verifying suppressed " + topic); |
||||
final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap()); |
||||
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) { |
||||
if (entry.getValue().size() != 1) { |
||||
final String unsuppressedTopic = topic.replace("-suppressed", "-raw"); |
||||
final String key = entry.getKey(); |
||||
final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", ""); |
||||
resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n", |
||||
key, |
||||
indent("\t\t", entry.getValue())); |
||||
|
||||
if (printResults) |
||||
resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n", |
||||
indent("\t\t", events.get(unsuppressedTopic).get(key)), |
||||
indent("\t\t", events.get("data").get(unwindowedKey))); |
||||
|
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
private static String indent(@SuppressWarnings("SameParameterValue") final String prefix, |
||||
final Iterable<ConsumerRecord<String, Number>> list) { |
||||
final StringBuilder stringBuilder = new StringBuilder(); |
||||
for (final ConsumerRecord<String, Number> record : list) { |
||||
stringBuilder.append(prefix).append(record).append('\n'); |
||||
} |
||||
return stringBuilder.toString(); |
||||
} |
||||
|
||||
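// The values for key "min-max" are the consecutive integers min..max, so the expected
// results follow from the arithmetic series: sum = (min + max) * count / 2 and
// avg = (min + max) / 2, with count = max - min + 1. For example, key "3-7" covers
// 3..7, giving sum = (3 + 7) * 5 / 2 = 25 and avg = 5.0.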
private static Long getSum(final String key) { |
||||
final int min = getMin(key).intValue(); |
||||
final int max = getMax(key).intValue(); |
||||
return ((long) min + max) * (max - min + 1L) / 2L; |
||||
} |
||||
|
||||
private static Double getAvg(final String key) { |
||||
final int min = getMin(key).intValue(); |
||||
final int max = getMax(key).intValue(); |
||||
return ((long) min + max) / 2.0; |
||||
} |
||||
|
||||
|
||||
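// "tagg" holds a count-of-counts: the topology groups keys by how many records they
// produced, so for every count c the expected value is the number of input keys whose
// range spans exactly c values (max - min + 1 == c).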
private static boolean verifyTAgg(final PrintStream resultStream, |
||||
final Map<String, Set<Integer>> allData, |
||||
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents, |
||||
final boolean printResults) { |
||||
if (taggEvents == null) { |
||||
resultStream.println("tagg is missing"); |
||||
return false; |
||||
} else if (taggEvents.isEmpty()) { |
||||
resultStream.println("tagg is empty"); |
||||
return false; |
||||
} else { |
||||
resultStream.println("verifying tagg"); |
||||
|
||||
// generate expected answer
||||
final Map<String, Long> expected = new HashMap<>(); |
||||
for (final String key : allData.keySet()) { |
||||
final int min = getMin(key).intValue(); |
||||
final int max = getMax(key).intValue(); |
||||
final String cnt = Long.toString(max - min + 1L); |
||||
|
||||
expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1); |
||||
} |
||||
|
||||
// check the result
||||
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) { |
||||
final String key = entry.getKey(); |
||||
Long expectedCount = expected.remove(key); |
||||
if (expectedCount == null) { |
||||
expectedCount = 0L; |
||||
} |
||||
|
||||
if (entry.getValue().getLast().value().longValue() != expectedCount) { |
||||
resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount); |
||||
|
||||
if (printResults) |
||||
resultStream.println("\t taggEvents: " + entry.getValue()); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
} |
||||
return true; |
||||
} |
||||
|
||||
private static Number getMin(final String key) { |
||||
return Integer.parseInt(key.split("-")[0]); |
||||
} |
||||
|
||||
private static Number getMax(final String key) { |
||||
return Integer.parseInt(key.split("-")[1]); |
||||
} |
||||
|
||||
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) { |
||||
final List<TopicPartition> partitions = new ArrayList<>(); |
||||
|
||||
for (final String topic : topics) { |
||||
for (final PartitionInfo info : consumer.partitionsFor(topic)) { |
||||
partitions.add(new TopicPartition(info.topic(), info.partition())); |
||||
} |
||||
} |
||||
return partitions; |
||||
} |
||||
|
||||
} |
@@ -0,0 +1,131 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.streams.tests; |
||||
|
||||
import org.apache.kafka.common.serialization.Serde; |
||||
import org.apache.kafka.common.serialization.Serdes; |
||||
import org.apache.kafka.streams.KeyValue; |
||||
import org.apache.kafka.streams.kstream.Aggregator; |
||||
import org.apache.kafka.streams.kstream.Initializer; |
||||
import org.apache.kafka.streams.kstream.KeyValueMapper; |
||||
import org.apache.kafka.streams.kstream.Windowed; |
||||
import org.apache.kafka.streams.processor.api.ContextualProcessor; |
||||
import org.apache.kafka.streams.processor.api.ProcessorContext; |
||||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; |
||||
|
||||
import java.time.Instant; |
||||
import org.apache.kafka.streams.processor.api.Record; |
||||
|
||||
public class SmokeTestUtil { |
||||
|
||||
final static int END = Integer.MAX_VALUE; |
||||
|
||||
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) { |
||||
return printProcessorSupplier(topic, ""); |
||||
} |
||||
|
||||
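// A side-effect-only processor (no downstream output) that logs progress every 100
// records and, on close, reports the span of offsets it saw for its task.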
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) { |
||||
return () -> new ContextualProcessor<Object, Object, Void, Void>() { |
||||
private int numRecordsProcessed = 0; |
||||
private long smallestOffset = Long.MAX_VALUE; |
||||
private long largestOffset = Long.MIN_VALUE; |
||||
|
||||
@Override |
||||
public void init(final ProcessorContext<Void, Void> context) { |
||||
super.init(context); |
||||
System.out.println("[3.1] initializing processor: topic=" + topic + " taskId=" + context.taskId()); |
||||
System.out.flush(); |
||||
numRecordsProcessed = 0; |
||||
smallestOffset = Long.MAX_VALUE; |
||||
largestOffset = Long.MIN_VALUE; |
||||
} |
||||
|
||||
@Override |
||||
public void process(final Record<Object, Object> record) { |
||||
numRecordsProcessed++; |
||||
if (numRecordsProcessed % 100 == 0) { |
||||
System.out.printf("%s: %s%n", name, Instant.now()); |
||||
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); |
||||
} |
||||
|
||||
context().recordMetadata().ifPresent(recordMetadata -> { |
||||
if (smallestOffset > recordMetadata.offset()) { |
||||
smallestOffset = recordMetadata.offset(); |
||||
} |
||||
if (largestOffset < recordMetadata.offset()) { |
||||
largestOffset = recordMetadata.offset(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public void close() { |
||||
System.out.printf("Close processor for task %s%n", context().taskId()); |
||||
System.out.println("processed " + numRecordsProcessed + " records"); |
||||
final long processed; |
||||
if (largestOffset >= smallestOffset) { |
||||
processed = 1L + largestOffset - smallestOffset; |
||||
} else { |
||||
processed = 0L; |
||||
} |
||||
System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed); |
||||
System.out.flush(); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> { |
||||
@Override |
||||
public K apply(final Windowed<K> winKey, final V value) { |
||||
return winKey.key(); |
||||
} |
||||
} |
||||
|
||||
public static class Agg { |
||||
|
||||
KeyValueMapper<String, Long, KeyValue<String, Long>> selector() { |
||||
return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L); |
||||
} |
||||
|
||||
public Initializer<Long> init() { |
||||
return () -> 0L; |
||||
} |
||||
|
||||
Aggregator<String, Long, Long> adder() { |
||||
return (aggKey, value, aggregate) -> aggregate + value; |
||||
} |
||||
|
||||
Aggregator<String, Long, Long> remover() { |
||||
return (aggKey, value, aggregate) -> aggregate - value; |
||||
} |
||||
} |
||||
|
||||
public static Serde<String> stringSerde = Serdes.String(); |
||||
|
||||
public static Serde<Integer> intSerde = Serdes.Integer(); |
||||
|
||||
static Serde<Long> longSerde = Serdes.Long(); |
||||
|
||||
static Serde<Double> doubleSerde = Serdes.Double(); |
||||
|
||||
public static void sleep(final long duration) { |
||||
try { |
||||
Thread.sleep(duration); |
||||
} catch (final Exception ignore) { } |
||||
} |
||||
|
||||
} |
@@ -0,0 +1,100 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.streams.tests; |
||||
|
||||
import org.apache.kafka.common.utils.Exit; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.streams.StreamsConfig; |
||||
|
||||
import java.io.IOException; |
||||
import java.time.Duration; |
||||
import java.util.Map; |
||||
import java.util.Properties; |
||||
import java.util.Set; |
||||
import java.util.UUID; |
||||
|
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; |
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; |
||||
|
||||
public class StreamsSmokeTest { |
||||
|
||||
/** |
||||
* args ::= kafka propFileName command disableAutoTerminate |
||||
* command := "run" | "process" |
||||
* |
||||
* @param args |
||||
*/ |
||||
public static void main(final String[] args) throws IOException { |
||||
if (args.length < 2) { |
||||
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); |
||||
Exit.exit(1); |
||||
} |
||||
|
||||
final String propFileName = args[0]; |
||||
final String command = args[1]; |
||||
final boolean disableAutoTerminate = args.length > 2; |
||||
|
||||
final Properties streamsProperties = Utils.loadProps(propFileName); |
||||
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); |
||||
final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG); |
||||
|
||||
if (kafka == null) { |
||||
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); |
||||
Exit.exit(1); |
||||
} |
||||
|
||||
if ("process".equals(command)) { |
||||
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) && |
||||
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { |
||||
|
||||
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " + |
||||
StreamsConfig.EXACTLY_ONCE_V2); |
||||
|
||||
Exit.exit(1); |
||||
} |
||||
} |
||||
|
||||
System.out.println("StreamsTest instance started (StreamsSmokeTest)"); |
||||
System.out.println("command=" + command); |
||||
System.out.println("props=" + streamsProperties); |
||||
System.out.println("disableAutoTerminate=" + disableAutoTerminate); |
||||
|
||||
switch (command) { |
||||
case "run": |
||||
// this starts the driver (data generation and result verification)
||||
final int numKeys = 10; |
||||
final int maxRecordsPerKey = 500; |
||||
if (disableAutoTerminate) { |
||||
generatePerpetually(kafka, numKeys, maxRecordsPerKey); |
||||
} else { |
||||
// slow down data production to span 30 seconds so that system tests have time to
// do their bounces, etc.
||||
final Map<String, Set<Integer>> allData = |
||||
generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30)); |
||||
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); |
||||
} |
||||
break; |
||||
case "process": |
||||
// this starts the stream processing app
||||
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties); |
||||
break; |
||||
default: |
||||
System.out.println("unknown command: " + command); |
||||
} |
||||
} |
||||
|
||||
} |
@@ -0,0 +1,88 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.streams.tests; |
||||
|
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.streams.KafkaStreams; |
||||
import org.apache.kafka.streams.StreamsBuilder; |
||||
import org.apache.kafka.streams.StreamsConfig; |
||||
import org.apache.kafka.streams.kstream.KStream; |
||||
import org.apache.kafka.streams.processor.api.ContextualProcessor; |
||||
import org.apache.kafka.streams.processor.api.ProcessorContext; |
||||
import org.apache.kafka.streams.processor.api.ProcessorSupplier; |
||||
import org.apache.kafka.streams.processor.api.Record; |
||||
|
||||
import java.util.Properties; |
||||
|
||||
|
||||
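// Minimal pass-through application used by the upgrade system tests: it pipes "data"
// to "echo" and logs a version-tagged "[3.1]" marker on startup, presumably so the
// test harness can confirm which release of the client is actually running.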
public class StreamsUpgradeTest { |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
public static void main(final String[] args) throws Exception { |
||||
if (args.length < 1) { |
||||
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); |
||||
} |
||||
final String propFileName = args[0]; |
||||
|
||||
final Properties streamsProperties = Utils.loadProps(propFileName); |
||||
|
||||
System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.1)"); |
||||
System.out.println("props=" + streamsProperties); |
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder(); |
||||
final KStream dataStream = builder.stream("data"); |
||||
dataStream.process(printProcessorSupplier()); |
||||
dataStream.to("echo"); |
||||
|
||||
final Properties config = new Properties(); |
||||
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); |
||||
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); |
||||
config.putAll(streamsProperties); |
||||
|
||||
final KafkaStreams streams = new KafkaStreams(builder.build(), config); |
||||
streams.start(); |
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> { |
||||
streams.close(); |
||||
System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); |
||||
System.out.flush(); |
||||
})); |
||||
} |
||||
|
||||
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() { |
||||
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() { |
||||
private int numRecordsProcessed = 0; |
||||
|
||||
@Override |
||||
public void init(final ProcessorContext<KOut, VOut> context) { |
||||
System.out.println("[3.1] initializing processor: topic=data taskId=" + context.taskId()); |
||||
numRecordsProcessed = 0; |
||||
} |
||||
|
||||
@Override |
||||
public void process(final Record<KIn, VIn> record) { |
||||
numRecordsProcessed++; |
||||
if (numRecordsProcessed % 100 == 0) { |
||||
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void close() {} |
||||
}; |
||||
} |
||||
} |