Browse Source

MINOR: Streams upgrade system test cleanup (#7571)

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io>,
pull/7591/merge
Bill Bejeck 5 years ago committed by GitHub
parent
commit
c015169aa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  2. 14
      streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  3. 14
      streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  4. 10
      streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  5. 10
      streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  6. 10
      streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  7. 10
      streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  8. 10
      streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  9. 10
      streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  10. 10
      streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  11. 10
      streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
  12. 9
      tests/kafkatest/services/streams.py
  13. 4
      tests/kafkatest/tests/streams/streams_upgrade_test.py

4
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -240,7 +240,9 @@ public class StreamsUpgradeTest { @@ -240,7 +240,9 @@ public class StreamsUpgradeTest {
info.prevTasks(),
info.standbyTasks(),
info.userEndPoint())
.encode()));
.encode(),
subscription.ownedPartitions()
));
}
assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment();
bumpUsedVersion = true;

14
streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -32,19 +32,16 @@ public class StreamsUpgradeTest { @@ -32,19 +32,16 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final String zookeeper = args[0];
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + streamsProperties);
@ -55,7 +52,6 @@ public class StreamsUpgradeTest { @@ -55,7 +52,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

14
streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -35,19 +35,16 @@ public class StreamsUpgradeTest { @@ -35,19 +35,16 @@ public class StreamsUpgradeTest {
*/
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final String zookeeper = args[0];
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + streamsProperties);
@ -58,7 +55,6 @@ public class StreamsUpgradeTest { @@ -58,7 +55,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final KStreamBuilder builder = new KStreamBuilder();
@ -52,7 +49,6 @@ public class StreamsUpgradeTest { @@ -52,7 +49,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final KStreamBuilder builder = new KStreamBuilder();
@ -52,7 +49,6 @@ public class StreamsUpgradeTest { @@ -52,7 +49,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -52,7 +49,6 @@ public class StreamsUpgradeTest { @@ -52,7 +49,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @@ -32,17 +32,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -52,7 +49,6 @@ public class StreamsUpgradeTest { @@ -52,7 +49,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -51,7 +48,6 @@ public class StreamsUpgradeTest { @@ -51,7 +48,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -51,7 +48,6 @@ public class StreamsUpgradeTest { @@ -51,7 +48,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.2)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -51,7 +48,6 @@ public class StreamsUpgradeTest { @@ -51,7 +48,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

10
streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java

@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @@ -31,17 +31,14 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String kafka = args[0];
final String propFileName = args.length > 1 ? args[1] : null;
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.3)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
@ -51,7 +48,6 @@ public class StreamsUpgradeTest { @@ -51,7 +48,6 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

9
tests/kafkatest/services/streams.py

@ -487,12 +487,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): @@ -487,12 +487,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def start_cmd(self, node):
args = self.args.copy()
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]:
args['kafka'] = self.kafka.bootstrap_servers()
else:
args['kafka'] = ""
if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
args['zk'] = self.kafka.zk.connect_setting()
else:
@ -507,7 +502,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): @@ -507,7 +502,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \
" %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)

4
tests/kafkatest/tests/streams/streams_upgrade_test.py

@ -527,11 +527,11 @@ class StreamsUpgradeTest(Test): @@ -527,11 +527,11 @@ class StreamsUpgradeTest(Test):
monitors[second_other_processor] = second_other_monitor
if len(self.old_processors) > 0:
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 5 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
else:
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 6 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",

Loading…
Cancel
Save