From c015169aa6ae835978cadbcb1d17a123e8c475f8 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Thu, 24 Oct 2019 10:28:29 -0400 Subject: [PATCH] MINOR: Streams upgrade system test cleanup (#7571) Reviewers: Guozhang Wang , Sophie Blee-Goldman , Boyang Chen , --- .../kafka/streams/tests/StreamsUpgradeTest.java | 4 +++- .../kafka/streams/tests/StreamsUpgradeTest.java | 14 +++++--------- .../kafka/streams/tests/StreamsUpgradeTest.java | 14 +++++--------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- .../kafka/streams/tests/StreamsUpgradeTest.java | 10 +++------- tests/kafkatest/services/streams.py | 9 ++------- .../tests/streams/streams_upgrade_test.py | 4 ++-- 13 files changed, 41 insertions(+), 84 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 185fa7c3bb1..318154889ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -240,7 +240,9 @@ public class StreamsUpgradeTest { info.prevTasks(), info.standbyTasks(), info.userEndPoint()) - .encode())); + .encode(), + subscription.ownedPartitions() + )); } assignment = super.assign(metadata, new GroupSubscription(downgradedSubscriptions)).groupAssignment(); bumpUsedVersion = true; diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index c2d8c4ddb37..3c1d99fde25 100644 --- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,19 +32,16 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 3) { - System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] + " " : "") - + (args.length > 1 ? args[1] : "")); + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "")); } - final String kafka = args[0]; - final String zookeeper = args[1]; - final String propFileName = args.length > 2 ? args[2] : null; + final String zookeeper = args[0]; + final String propFileName = args[1]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)"); - System.out.println("kafka=" + kafka); System.out.println("zookeeper=" + zookeeper); System.out.println("props=" + streamsProperties); @@ -55,7 +52,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index e525658f6cf..53ee0dcc42b 100644 --- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -35,19 +35,16 @@ public class StreamsUpgradeTest { */ @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 3) { - System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] + " " : "") - + (args.length > 1 ? args[1] : "")); + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two arguments (zookeeper-url, properties-file) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "")); } - final String kafka = args[0]; - final String zookeeper = args[1]; - final String propFileName = args.length > 2 ? args[2] : null; + final String zookeeper = args[0]; + final String propFileName = args[1]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)"); - System.out.println("kafka=" + kafka); System.out.println("zookeeper=" + zookeeper); System.out.println("props=" + streamsProperties); @@ -58,7 +55,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index fa855521a64..d8e355b2e98 100644 --- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final KStreamBuilder builder = new KStreamBuilder(); @@ -52,7 +49,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index efb96ff5f0d..a1187578a62 100644 --- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.11.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final KStreamBuilder builder = new KStreamBuilder(); @@ -52,7 +49,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6f63f8dd755..f69162c7d80 100644 --- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -52,7 +49,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index b5759f56a5a..ca284f22bfb 100644 --- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,17 +32,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -52,7 +49,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index d963e4a1f65..f66c7a42d71 100644 --- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.0)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6e409a00658..d467df099b6 100644 --- a/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-21/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.1)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 7ff4d815406..83b68cc1460 100644 --- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.2)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index f9182c4b347..6428ec6e4de 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -31,17 +31,14 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { - if (args.length < 2) { - System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: " - + (args.length > 0 ? args[0] : "")); + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } - final String kafka = args[0]; - final String propFileName = args.length > 1 ? args[1] : null; + final String propFileName = args[0]; final Properties streamsProperties = Utils.loadProps(propFileName); System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.3)"); - System.out.println("kafka=" + kafka); System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); @@ -51,7 +48,6 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); - config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 63efb4f662f..80c2ee73673 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -487,12 +487,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): def start_cmd(self, node): args = self.args.copy() - if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), - str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), - str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]: - args['kafka'] = self.kafka.bootstrap_servers() - else: - args['kafka'] = "" + if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1): args['zk'] = self.kafka.zk.connect_setting() else: @@ -507,7 +502,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \ - " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \ + " %(kafka_run_class)s %(streams_class_name)s %(zk)s %(config_file)s " \ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args self.logger.info("Executing: " + cmd) diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 00dbe355f80..25fe065f8e5 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -527,11 +527,11 @@ class StreamsUpgradeTest(Test): monitors[second_other_processor] = second_other_monitor if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 5 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 6 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",