Browse Source

MINOR: Adding system test for named repartition topics (#5913)

This is a system test for doing a rolling upgrade of a topology with a named repartition topic.

1. An initial Kafka Streams application is started on 3 nodes. The topology has one operation forcing a repartition and the repartition topic is explicitly named.
2. Each node is started and processing of data is validated
3. Then one node is stopped (full stop is verified)
4. A property is set signaling the node to add operations to the topology before the repartition node, which forces a renumbering of all operators (except the repartition node)
5. Restart the node and confirm processing records
6. Repeat the steps for the other 2 nodes completing the rolling upgrade

I ran the system test twice with 25 repeats per run, for a total of 50 test runs.
All test runs passed.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/5995/head
Bill Bejeck 6 years ago committed by Guozhang Wang
parent
commit
ab1fb3fdde
  1. 120
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
  2. 22
      tests/kafkatest/services/streams.py
  3. 119
      tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py

120
streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java

@ -0,0 +1,120 @@ @@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
/**
 * Kafka Streams application used by the named-repartition-topic system test.
 *
 * <p>The application reads string records from an input topic, re-keys each record
 * by {@code value % 9}, groups by the new key under the explicit name
 * {@code "grouped-stream"}, sums the integer values per key and writes the running
 * totals to an aggregation topic.
 *
 * <p>When the {@code add.operations} property is {@code true}, a pass-through filter
 * and a {@code mapValues} step are inserted ahead of the grouping, changing the
 * topology upstream of the named grouping.
 *
 * <p>The lines printed to standard out ("StreamsTest instance started ...",
 * "AGGREGATED ...", "REBALANCING -> RUNNING", "UPDATED Topology",
 * "NAMED_REPARTITION_TEST Streams Stopped") are kept verbatim so that anything
 * matching on this program's output keeps working.
 */
public class StreamsNamedRepartitionTest {

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the path to a properties file. It must contain
     *             {@code input.topic}, {@code aggregation.topic} and
     *             {@code add.operations}; the remaining entries are passed to
     *             Kafka Streams as configuration.
     * @throws Exception if the properties file cannot be loaded
     */
    public static void main(final String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsNamedRepartitionTest requires one argument (properties-file) but none provided: ");
            // Stop here: falling through would fail on args[0] with an
            // ArrayIndexOutOfBoundsException instead of a clean usage error.
            System.exit(1);
        }

        final String propFileName = args[0];
        final Properties streamsProperties = Utils.loadProps(propFileName);

        System.out.println("StreamsTest instance started NAMED_REPARTITION_TEST");
        System.out.println("props=" + streamsProperties);

        // These three entries drive the test itself; they are removed so that only
        // genuine Streams settings are merged into the config below.
        final String inputTopic = requiredProperty(streamsProperties, "input.topic");
        final String aggregationTopic = requiredProperty(streamsProperties, "aggregation.topic");
        final boolean addOperators = Boolean.parseBoolean(requiredProperty(streamsProperties, "add.operations"));

        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + Integer.parseInt(v);
        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> sourceStream =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));

        final KStream<String, String> maybeUpdatedStream;
        if (addOperators) {
            // Extra operators placed before the grouping; the filter keeps every record.
            maybeUpdatedStream = mappedStream
                .filter((k, v) -> true)
                .mapValues(v -> Integer.toString(Integer.parseInt(v) + 1));
        } else {
            maybeUpdatedStream = mappedStream;
        }

        maybeUpdatedStream
            .groupByKey(Grouped.with("grouped-stream", Serdes.String(), Serdes.String()))
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsNamedRepartitionTest");
        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Entries from the properties file are applied last, so they override the defaults above.
        config.putAll(streamsProperties);

        final Topology topology = builder.build(config);
        final KafkaStreams streams = new KafkaStreams(topology, config);

        streams.setStateListener((oldState, newState) -> {
            if (oldState == State.REBALANCING && newState == State.RUNNING) {
                if (addOperators) {
                    System.out.println("UPDATED Topology");
                } else {
                    System.out.println("REBALANCING -> RUNNING");
                }
                System.out.flush();
            }
        });

        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close(Duration.ofMillis(5000));
            System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
            System.out.flush();
        }));
    }

    /**
     * Removes {@code name} from {@code properties} and returns its value.
     *
     * @throws NullPointerException if the property is absent; the message names the missing key
     */
    private static String requiredProperty(final Properties properties, final String name) {
        return (String) Objects.requireNonNull(properties.remove(name), "missing required property: " + name);
    }
}

22
tests/kafkatest/services/streams.py

@ -506,3 +506,25 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): @@ -506,3 +506,25 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
self.logger.info("Executing: " + cmd)
return cmd
class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
    """Service wrapper that runs the Java StreamsNamedRepartitionTest application.

    Callers are expected to assign INPUT_TOPIC and AGGREGATION_TOPIC before the
    properties file is rendered, and may set ADD_ADDITIONAL_OPS to 'true' to have
    the application add operators to its topology.
    """

    def __init__(self, test_context, kafka):
        # NOTE(review): the trailing "" is handed to the base class unchanged;
        # its meaning is defined by StreamsTestBaseService - confirm there.
        super(StreamsNamedRepartitionTopicService, self).__init__(
            test_context,
            kafka,
            "org.apache.kafka.streams.tests.StreamsNamedRepartitionTest",
            "")
        # Kept as the string 'false'/'true' because it is written verbatim into the properties file.
        self.ADD_ADDITIONAL_OPS = 'false'
        self.INPUT_TOPIC = None
        self.AGGREGATION_TOPIC = None

    def prop_file(self):
        """Render the properties consumed by the Java application.

        Contains the state directory, the bootstrap servers and the three
        test-specific keys: 'input.topic', 'aggregation.topic', 'add.operations'.
        """
        settings = {
            streams_property.STATE_DIR: self.PERSISTENT_ROOT,
            streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
            'input.topic': self.INPUT_TOPIC,
            'aggregation.topic': self.AGGREGATION_TOPIC,
            'add.operations': self.ADD_ADDITIONAL_OPS,
        }
        return KafkaConfig(**settings).render()

119
tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py

@ -0,0 +1,119 @@ @@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsNamedRepartitionTopicService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
class StreamsNamedRepartitionTopicTest(Test):
    """
    Tests using a named repartition topic by starting
    application then doing a rolling upgrade with added
    operations and the application still runs
    """

    input_topic = 'inputTopic'
    aggregation_topic = 'aggregationTopic'
    # Prefix the application prints for every aggregated record.
    pattern = 'AGGREGATED'

    def __init__(self, test_context):
        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
        self.topics = {
            self.input_topic: {'partitions': 6},
            self.aggregation_topic: {'partitions': 6}
        }

        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
        self.kafka = KafkaService(self.test_context, num_nodes=1,
                                  zk=self.zookeeper, topics=self.topics)

        self.producer = VerifiableProducer(self.test_context,
                                           1,
                                           self.kafka,
                                           self.input_topic,
                                           throughput=1000,
                                           acks=1)

    def test_upgrade_topology_with_named_repartition_topic(self):
        """Start three instances, then restart each in turn with added operators.

        After the initial start and again after the rolling restart, every
        instance must print at least one new AGGREGATED line.
        """
        self.zookeeper.start()
        self.kafka.start()

        processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
        processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
        processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)

        processors = [processor1, processor2, processor3]

        self.producer.start()

        for processor in processors:
            processor.CLEAN_NODE_ENABLED = False
            self.set_topics(processor)
            self._start_and_verify_running(processor, 'REBALANCING -> RUNNING')

        self.verify_processing(processors)

        # do rolling upgrade
        for processor in processors:
            self.verify_stopped(processor)
            # will tell app to add operations before repartition topic
            processor.ADD_ADDITIONAL_OPS = 'true'
            self._start_and_verify_running(processor, 'UPDATED Topology')

        self.verify_processing(processors)

        self.stop_processors(processors)

        self.producer.stop()
        self.kafka.stop()
        self.zookeeper.stop()

    @staticmethod
    def _start_and_verify_running(processor, message):
        """Start `processor` and wait until `message` shows up in its stdout file.

        The monitor is opened before start() is called so that a message printed
        right after startup cannot land before the monitor's starting offset;
        this mirrors how verify_stopped wraps processor.stop().
        """
        node = processor.node
        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.start()
            monitor.wait_until(message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % message + str(node.account))

    @staticmethod
    def verify_running(processor, message):
        """Wait until `message` is appended to the stdout file of an already started processor.

        Only output written after this call begins is considered; prefer
        _start_and_verify_running when the processor still has to be started.
        """
        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            monitor.wait_until(message,
                               timeout_sec=60,
                               err_msg="Never saw '%s' message " % message + str(processor.node.account))

    @staticmethod
    def verify_stopped(processor):
        """Stop `processor` and wait for the application's shutdown message."""
        node = processor.node
        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.stop()
            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
                               timeout_sec=60,
                               err_msg="Never saw 'NAMED_REPARTITION_TEST Streams Stopped' message " + str(processor.node.account))

    def verify_processing(self, processors):
        """Wait, for each processor in turn, until a new line matching `pattern` is logged."""
        for processor in processors:
            with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
                monitor.wait_until(self.pattern,
                                   timeout_sec=60,
                                   err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))

    def stop_processors(self, processors):
        """Stop every processor, verifying each shutdown message."""
        for processor in processors:
            self.verify_stopped(processor)

    def set_topics(self, processor):
        """Point `processor` at this test's input and aggregation topics."""
        processor.INPUT_TOPIC = self.input_topic
        processor.AGGREGATION_TOPIC = self.aggregation_topic
Loading…
Cancel
Save