Browse Source

KAFKA-8155: Add 2.2.0 release to system tests (#6597)

Reviewers: Bill Bejeck <bill@confluent.io>, Boyang Chen <boyang@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confuent.io>
pull/6879/head
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
ba3dc49437
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      build.gradle
  2. 2
      gradle/dependencies.gradle
  3. 1
      settings.gradle
  4. 5
      streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  5. 5
      streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  6. 5
      streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  7. 5
      streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  8. 3
      streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  9. 3
      streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  10. 90
      streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  11. 1
      tests/docker/Dockerfile
  12. 5
      tests/kafkatest/services/streams.py
  13. 6
      tests/kafkatest/tests/streams/streams_upgrade_test.py

12
build.gradle

@ -1363,6 +1363,18 @@ project(':streams:upgrade-system-tests-21') { @@ -1363,6 +1363,18 @@ project(':streams:upgrade-system-tests-21') {
}
}
project(':streams:upgrade-system-tests-22') {
archivesBaseName = "kafka-streams-upgrade-system-tests-22"
dependencies {
testCompile libs.kafkaStreams_22
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'

2
gradle/dependencies.gradle

@ -79,6 +79,7 @@ versions += [ @@ -79,6 +79,7 @@ versions += [
kafka_11: "1.1.1",
kafka_20: "2.0.1",
kafka_21: "2.1.1",
kafka_22: "2.2.0",
lz4: "1.6.0",
mavenArtifact: "3.6.1",
metrics: "2.2.0",
@ -143,6 +144,7 @@ libs += [ @@ -143,6 +144,7 @@ libs += [
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
kafkaStreams_20: "org.apache.kafka:kafka-streams:$versions.kafka_20",
kafkaStreams_21: "org.apache.kafka:kafka-streams:$versions.kafka_21",
kafkaStreams_22: "org.apache.kafka:kafka-streams:$versions.kafka_22",
log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",

1
settings.gradle

@ -37,4 +37,5 @@ include 'clients', @@ -37,4 +37,5 @@ include 'clients',
'streams:upgrade-system-tests-11',
'streams:upgrade-system-tests-20',
'streams:upgrade-system-tests-21',
'streams:upgrade-system-tests-22',
'tools'

5
streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -30,13 +30,10 @@ import java.util.Properties; @@ -30,13 +30,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be executed, as long as Kafka 0.10.2.2 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

5
streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -30,13 +30,10 @@ import java.util.Properties; @@ -30,13 +30,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be executed, as long as Kafka 0.11.0.3 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

5
streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -30,13 +30,10 @@ import java.util.Properties; @@ -30,13 +30,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be executed, as long as Kafka 1.0.2 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

5
streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -30,13 +30,10 @@ import java.util.Properties; @@ -30,13 +30,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
/**
* This test cannot be executed, as long as Kafka 1.1.1 is not released
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

3
streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -29,11 +29,10 @@ import java.util.Properties; @@ -29,11 +29,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

3
streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -29,11 +29,10 @@ import java.util.Properties; @@ -29,11 +29,10 @@ import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];

90
streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -0,0 +1,90 @@ @@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.2)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("[2.2] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void close() {}
};
}
}

1
tests/docker/Dockerfile

@ -65,6 +65,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/lib @@ -65,6 +65,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o /opt/kafka-1.0.2/lib
RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o /opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o /opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.0-test.jar" -o /opt/kafka-2.2.0/libs/kafka-streams-2.2.0-test.jar
# The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sy

5
tests/kafkatest/services/streams.py

@ -21,7 +21,8 @@ from ducktape.utils.util import wait_until @@ -21,7 +21,8 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import KafkaConfig
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1,\
LATEST_2_0, LATEST_2_1, LATEST_2_2
STATE_DIR = "state.dir"
@ -487,7 +488,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): @@ -487,7 +488,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
args = self.args.copy()
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1)]:
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2)]:
args['kafka'] = self.kafka.bootstrap_servers()
else:
args['kafka'] = ""

6
tests/kafkatest/tests/streams/streams_upgrade_test.py

@ -24,7 +24,7 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo @@ -24,7 +24,7 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
StreamsUpgradeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(DEV_BRANCH)]
@ -34,7 +34,9 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0 @@ -34,7 +34,9 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0
# once 0.10.1.2 is available backward_compatible_metadata_2_versions
# can be replaced with metadata_2_versions
backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(DEV_VERSION)]
# If we add a new version below, we also need to add this version to `streams.py`:
# -> class `StreamsUpgradeTestJobRunnerService`, method `start_cmd`, variable `KAFKA_STREAMS_VERSION`
metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(DEV_VERSION)]
"""
After each release one should first check that the released version has been uploaded to

Loading…
Cancel
Save