From b36cf4ef977fb14bc57683630a9f3f3680705550 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 18 Aug 2023 11:06:08 -0700 Subject: [PATCH] HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5 (#14103) KIP-904 introduced a backward incompatible change that requires a 2-bounce rolling upgrade. The new "3.4" upgrade config value is not recognized by `AssignorConfiguration` though and thus crashed Kafka Streams if use. Reviewers: Farooq Qaiser , Bruno Cadonna --- .../apache/kafka/streams/StreamsConfig.java | 86 ++++++++---------- .../streams/internals/UpgradeFromValues.java | 53 +++++++++++ .../kstream/internals/ChangedSerializer.java | 44 +++++----- .../internals/KTableRepartitionMap.java | 44 +++++----- .../SubscriptionWrapperSerde.java | 42 ++++----- .../internals/InternalTopicManager.java | 2 +- .../assignment/AssignorConfiguration.java | 87 ++++++++++--------- .../kafka/streams/StreamsConfigTest.java | 13 +++ .../assignment/AssignorConfigurationTest.java | 43 +++++++++ 9 files changed, 261 insertions(+), 153 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index de70b77f4bf..2ff4eea53c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.StreamsConfigUtils; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -61,6 +62,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.config.ConfigDef.ListSize.atMostOfSize; @@ -289,121 +291,121 @@ public class StreamsConfig extends AbstractConfig { * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0100 = "0.10.0"; + public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0101 = "0.10.1"; + public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0102 = "0.10.2"; + public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_0110 = "0.11.0"; + public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_10 = "1.0"; + public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_11 = "1.1"; + public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_20 = "2.0"; + public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_21 = "2.1"; + public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_22 = "2.2"; + public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_23 = "2.3"; + public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_24 = "2.4"; + public static final String UPGRADE_FROM_24 = UpgradeFromValues.UPGRADE_FROM_24.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.5.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_25 = "2.5"; + public static final String UPGRADE_FROM_25 = UpgradeFromValues.UPGRADE_FROM_25.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.6.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_26 = "2.6"; + public static final String UPGRADE_FROM_26 = UpgradeFromValues.UPGRADE_FROM_26.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.7.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_27 = "2.7"; + public static final String UPGRADE_FROM_27 = UpgradeFromValues.UPGRADE_FROM_27.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.8.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_28 = "2.8"; + public static final String UPGRADE_FROM_28 = UpgradeFromValues.UPGRADE_FROM_28.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.0.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_30 = "3.0"; + public static final String UPGRADE_FROM_30 = UpgradeFromValues.UPGRADE_FROM_30.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.1.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_31 = "3.1"; + public static final String UPGRADE_FROM_31 = UpgradeFromValues.UPGRADE_FROM_31.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.2.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_32 = "3.2"; + public static final String UPGRADE_FROM_32 = UpgradeFromValues.UPGRADE_FROM_32.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.3.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_33 = "3.3"; + public static final String UPGRADE_FROM_33 = UpgradeFromValues.UPGRADE_FROM_33.toString(); /** * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.4.x}. */ @SuppressWarnings("WeakerAccess") - public static final String UPGRADE_FROM_34 = "3.4"; + public static final String UPGRADE_FROM_34 = UpgradeFromValues.UPGRADE_FROM_34.toString(); /** * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees. @@ -993,9 +995,9 @@ public class StreamsConfig extends AbstractConfig { Importance.LOW, REPARTITION_PURGE_INTERVAL_MS_DOC) .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, - ConfigDef.Type.LONG, + Type.LONG, 9 * 60 * 1000L, - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) .define(DEFAULT_DSL_STORE_CONFIG, Type.STRING, @@ -1009,10 +1011,10 @@ public class StreamsConfig extends AbstractConfig { Importance.LOW, DEFAULT_CLIENT_SUPPLIER_DOC) .define(METADATA_MAX_AGE_CONFIG, - ConfigDef.Type.LONG, + Type.LONG, 5 * 60 * 1000L, atLeast(0), - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.METADATA_MAX_AGE_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, @@ -1069,25 +1071,25 @@ public class StreamsConfig extends AbstractConfig { Type.LONG, 1000L, atLeast(0L), - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC) .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.RETRIES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 40 * 1000, atLeast(0), - ConfigDef.Importance.LOW, + Importance.LOW, CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC) .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, Type.CLASS, @@ -1106,29 +1108,13 @@ public class StreamsConfig extends AbstractConfig { Importance.LOW, STATE_CLEANUP_DELAY_MS_DOC) .define(UPGRADE_FROM_CONFIG, - ConfigDef.Type.STRING, + Type.STRING, null, - in(null, - UPGRADE_FROM_0100, - UPGRADE_FROM_0101, - UPGRADE_FROM_0102, - UPGRADE_FROM_0110, - UPGRADE_FROM_10, - UPGRADE_FROM_11, - UPGRADE_FROM_20, - UPGRADE_FROM_21, - UPGRADE_FROM_22, - UPGRADE_FROM_23, - UPGRADE_FROM_24, - UPGRADE_FROM_25, - UPGRADE_FROM_26, - UPGRADE_FROM_27, - UPGRADE_FROM_28, - UPGRADE_FROM_30, - UPGRADE_FROM_31, - UPGRADE_FROM_32, - UPGRADE_FROM_33, - UPGRADE_FROM_34), + in(Stream.concat( + Stream.of((String) null), + Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString) + ).toArray(String[]::new) + ), Importance.LOW, UPGRADE_FROM_DOC) .define(WINDOWED_INNER_CLASS_SERDE, diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java new file mode 100644 index 00000000000..a5ae71a33d4 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.internals; + +public enum UpgradeFromValues { + UPGRADE_FROM_0100("0.10.0"), + UPGRADE_FROM_0101("0.10.1"), + UPGRADE_FROM_0102("0.10.2"), + UPGRADE_FROM_0110("0.11.0"), + UPGRADE_FROM_10("1.0"), + UPGRADE_FROM_11("1.1"), + UPGRADE_FROM_20("2.0"), + UPGRADE_FROM_21("2.1"), + UPGRADE_FROM_22("2.2"), + UPGRADE_FROM_23("2.3"), + UPGRADE_FROM_24("2.4"), + UPGRADE_FROM_25("2.5"), + UPGRADE_FROM_26("2.6"), + UPGRADE_FROM_27("2.7"), + UPGRADE_FROM_28("2.8"), + UPGRADE_FROM_30("3.0"), + UPGRADE_FROM_31("3.1"), + UPGRADE_FROM_32("3.2"), + UPGRADE_FROM_33("3.3"), + UPGRADE_FROM_34("3.4"); + + private final String value; + + UpgradeFromValues(final String value) { + this.value = value; + } + + public static UpgradeFromValues getValueFromString(final String upgradeFrom) { + return UpgradeFromValues.valueOf("UPGRADE_FROM_" + upgradeFrom.replace(".", "")); + } + public String toString() { + return value; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 8a11575392f..7aca5245243 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.internals.SerdeGetter; import java.nio.ByteBuffer; @@ -56,27 +57,28 @@ public class ChangedSerializer implements Serializer>, WrappingNull return false; } - switch ((String) upgradeFrom) { - case StreamsConfig.UPGRADE_FROM_0100: - case StreamsConfig.UPGRADE_FROM_0101: - case StreamsConfig.UPGRADE_FROM_0102: - case StreamsConfig.UPGRADE_FROM_0110: - case StreamsConfig.UPGRADE_FROM_10: - case StreamsConfig.UPGRADE_FROM_11: - case StreamsConfig.UPGRADE_FROM_20: - case StreamsConfig.UPGRADE_FROM_21: - case StreamsConfig.UPGRADE_FROM_22: - case StreamsConfig.UPGRADE_FROM_23: - case StreamsConfig.UPGRADE_FROM_24: - case StreamsConfig.UPGRADE_FROM_25: - case StreamsConfig.UPGRADE_FROM_26: - case StreamsConfig.UPGRADE_FROM_27: - case StreamsConfig.UPGRADE_FROM_28: - case StreamsConfig.UPGRADE_FROM_30: - case StreamsConfig.UPGRADE_FROM_31: - case StreamsConfig.UPGRADE_FROM_32: - case StreamsConfig.UPGRADE_FROM_33: - case StreamsConfig.UPGRADE_FROM_34: + switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) { + case UPGRADE_FROM_0100: + case UPGRADE_FROM_0101: + case UPGRADE_FROM_0102: + case UPGRADE_FROM_0110: + case UPGRADE_FROM_10: + case UPGRADE_FROM_11: + case UPGRADE_FROM_20: + case UPGRADE_FROM_21: + case UPGRADE_FROM_22: + case UPGRADE_FROM_23: + case UPGRADE_FROM_24: + case UPGRADE_FROM_25: + case UPGRADE_FROM_26: + case UPGRADE_FROM_27: + case UPGRADE_FROM_28: + case UPGRADE_FROM_30: + case UPGRADE_FROM_31: + case UPGRADE_FROM_32: + case UPGRADE_FROM_33: + case UPGRADE_FROM_34: + // there is no need to add new version here return true; default: return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 67bf5734590..a08bf978d40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; @@ -98,27 +99,28 @@ public class KTableRepartitionMap implements KTableRepartitionMapS return true; } - switch ((String) upgradeFrom) { - case StreamsConfig.UPGRADE_FROM_0100: - case StreamsConfig.UPGRADE_FROM_0101: - case StreamsConfig.UPGRADE_FROM_0102: - case StreamsConfig.UPGRADE_FROM_0110: - case StreamsConfig.UPGRADE_FROM_10: - case StreamsConfig.UPGRADE_FROM_11: - case StreamsConfig.UPGRADE_FROM_20: - case StreamsConfig.UPGRADE_FROM_21: - case StreamsConfig.UPGRADE_FROM_22: - case StreamsConfig.UPGRADE_FROM_23: - case StreamsConfig.UPGRADE_FROM_24: - case StreamsConfig.UPGRADE_FROM_25: - case StreamsConfig.UPGRADE_FROM_26: - case StreamsConfig.UPGRADE_FROM_27: - case StreamsConfig.UPGRADE_FROM_28: - case StreamsConfig.UPGRADE_FROM_30: - case StreamsConfig.UPGRADE_FROM_31: - case StreamsConfig.UPGRADE_FROM_32: - case StreamsConfig.UPGRADE_FROM_33: - case StreamsConfig.UPGRADE_FROM_34: + switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) { + case UPGRADE_FROM_0100: + case UPGRADE_FROM_0101: + case UPGRADE_FROM_0102: + case UPGRADE_FROM_0110: + case UPGRADE_FROM_10: + case UPGRADE_FROM_11: + case UPGRADE_FROM_20: + case UPGRADE_FROM_21: + case UPGRADE_FROM_22: + case UPGRADE_FROM_23: + case UPGRADE_FROM_24: + case UPGRADE_FROM_25: + case UPGRADE_FROM_26: + case UPGRADE_FROM_27: + case UPGRADE_FROM_28: + case UPGRADE_FROM_30: + case UPGRADE_FROM_31: + case UPGRADE_FROM_32: + case UPGRADE_FROM_33: + case UPGRADE_FROM_34: + // there is no need to add new versions here return false; default: return true; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java index 75074f12b1f..d9700b594d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer; @@ -74,26 +75,27 @@ public class SubscriptionWrapperSerde extends WrappingNullableSerde> getTopicPartitionInfo(final Set topics, - final Set tempUnknownTopics) { + final Set tempUnknownTopics) { final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics); final Map> futures = describeTopicsResult.topicNameValues(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index b45044fe5f2..02a0cb8bab9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.slf4j.Logger; @@ -95,17 +96,17 @@ public final class AssignorConfiguration { public RebalanceProtocol rebalanceProtocol() { final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); if (upgradeFrom != null) { - switch (upgradeFrom) { - case StreamsConfig.UPGRADE_FROM_0100: - case StreamsConfig.UPGRADE_FROM_0101: - case StreamsConfig.UPGRADE_FROM_0102: - case StreamsConfig.UPGRADE_FROM_0110: - case StreamsConfig.UPGRADE_FROM_10: - case StreamsConfig.UPGRADE_FROM_11: - case StreamsConfig.UPGRADE_FROM_20: - case StreamsConfig.UPGRADE_FROM_21: - case StreamsConfig.UPGRADE_FROM_22: - case StreamsConfig.UPGRADE_FROM_23: + switch (UpgradeFromValues.getValueFromString(upgradeFrom)) { + case UPGRADE_FROM_0100: + case UPGRADE_FROM_0101: + case UPGRADE_FROM_0102: + case UPGRADE_FROM_0110: + case UPGRADE_FROM_10: + case UPGRADE_FROM_11: + case UPGRADE_FROM_20: + case UPGRADE_FROM_21: + case UPGRADE_FROM_22: + case UPGRADE_FROM_23: // ATTENTION: The following log messages is used for verification in system test // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance // If you change it, please do also change the system test accordingly and @@ -114,15 +115,18 @@ public final class AssignorConfiguration { log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." + " Please be prepared to remove the 'upgrade.from' config soon."); return RebalanceProtocol.EAGER; - case StreamsConfig.UPGRADE_FROM_24: - case StreamsConfig.UPGRADE_FROM_25: - case StreamsConfig.UPGRADE_FROM_26: - case StreamsConfig.UPGRADE_FROM_27: - case StreamsConfig.UPGRADE_FROM_28: - case StreamsConfig.UPGRADE_FROM_30: - case StreamsConfig.UPGRADE_FROM_31: - case StreamsConfig.UPGRADE_FROM_32: - case StreamsConfig.UPGRADE_FROM_33: + case UPGRADE_FROM_24: + case UPGRADE_FROM_25: + case UPGRADE_FROM_26: + case UPGRADE_FROM_27: + case UPGRADE_FROM_28: + case UPGRADE_FROM_30: + case UPGRADE_FROM_31: + case UPGRADE_FROM_32: + case UPGRADE_FROM_33: + case UPGRADE_FROM_34: + // we need to add new version when new "upgrade.from" values become available + // This config is for explicitly sending FK response to a requested partition // and should not affect the rebalance protocol break; @@ -145,39 +149,42 @@ public final class AssignorConfiguration { public int configuredMetadataVersion(final int priorVersion) { final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); if (upgradeFrom != null) { - switch (upgradeFrom) { - case StreamsConfig.UPGRADE_FROM_0100: + switch (UpgradeFromValues.getValueFromString(upgradeFrom)) { + case UPGRADE_FROM_0100: log.info( "Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", LATEST_SUPPORTED_VERSION ); return 1; - case StreamsConfig.UPGRADE_FROM_0101: - case StreamsConfig.UPGRADE_FROM_0102: - case StreamsConfig.UPGRADE_FROM_0110: - case StreamsConfig.UPGRADE_FROM_10: - case StreamsConfig.UPGRADE_FROM_11: + case UPGRADE_FROM_0101: + case UPGRADE_FROM_0102: + case UPGRADE_FROM_0110: + case UPGRADE_FROM_10: + case UPGRADE_FROM_11: log.info( "Downgrading metadata version from {} to 2 for upgrade from {}.x.", LATEST_SUPPORTED_VERSION, upgradeFrom ); return 2; - case StreamsConfig.UPGRADE_FROM_20: - case StreamsConfig.UPGRADE_FROM_21: - case StreamsConfig.UPGRADE_FROM_22: - case StreamsConfig.UPGRADE_FROM_23: + case UPGRADE_FROM_20: + case UPGRADE_FROM_21: + case UPGRADE_FROM_22: + case UPGRADE_FROM_23: // These configs are for cooperative rebalancing and should not affect the metadata version break; - case StreamsConfig.UPGRADE_FROM_24: - case StreamsConfig.UPGRADE_FROM_25: - case StreamsConfig.UPGRADE_FROM_26: - case StreamsConfig.UPGRADE_FROM_27: - case StreamsConfig.UPGRADE_FROM_28: - case StreamsConfig.UPGRADE_FROM_30: - case StreamsConfig.UPGRADE_FROM_31: - case StreamsConfig.UPGRADE_FROM_32: - case StreamsConfig.UPGRADE_FROM_33: + case UPGRADE_FROM_24: + case UPGRADE_FROM_25: + case UPGRADE_FROM_26: + case UPGRADE_FROM_27: + case UPGRADE_FROM_28: + case UPGRADE_FROM_30: + case UPGRADE_FROM_31: + case UPGRADE_FROM_32: + case UPGRADE_FROM_33: + case UPGRADE_FROM_34: + // we need to add new version when new "upgrade.from" values become available + // This config is for explicitly sending FK response to a requested partition // and should not affect the metadata version break; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 94e474c9975..4223294c68a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.internals.UpgradeFromValues; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -1434,6 +1435,18 @@ public class StreamsConfigTest { assertThrows(ConfigException.class, () -> new StreamsConfig(props)); } + @Test + public void shouldSupportAllUpgradeFromValues() { + for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) { + props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); + try { + new StreamsConfig(props); + } catch (final Exception fatal) { + throw new AssertionError("StreamsConfig did not accept `upgrade.from` config value `" + upgradeFrom + "`"); + } + } + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java index 9ff53b54244..37394e647fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java @@ -17,14 +17,29 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.internals.UpgradeFromValues; +import org.junit.Before; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; public class AssignorConfigurationTest { + private final Map config = new HashMap<>(); + + @Before + public void setup() { + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app.id"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); + config.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, mock(ReferenceContainer.class)); + } @Test public void configsShouldRejectZeroWarmups() { @@ -35,4 +50,32 @@ public class AssignorConfigurationTest { assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas")); } + + @Test + public void rebalanceProtocolShouldSupportAllUpgradeFromVersions() { + for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) { + config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); + final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config); + + try { + assignorConfiguration.rebalanceProtocol(); + } catch (final Exception error) { + throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!"); + } + } + } + + @Test + public void configuredMetadataVersionShouldSupportAllUpgradeFromVersions() { + for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) { + config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); + final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config); + + try { + assignorConfiguration.configuredMetadataVersion(0); + } catch (final Exception error) { + throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!"); + } + } + } }