Browse Source

HOTIFX: fix Kafka Streams upgrade path from 3.4 to 3.5 (#14103)

KIP-904 introduced a backward incompatible change that requires a 2-bounce rolling upgrade.
The new "3.4" upgrade config value is not recognized by `AssignorConfiguration` though and thus crashed Kafka Streams if use.

Reviewers: Farooq Qaiser <fqaiser94@gmail.com>, Bruno Cadonna <bruno@confluent.io>
pull/13932/merge
Matthias J. Sax 1 year ago committed by GitHub
parent
commit
b36cf4ef97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 86
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
  2. 53
      streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
  3. 44
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
  4. 44
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
  5. 42
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
  6. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
  7. 87
      streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
  8. 13
      streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
  9. 43
      streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java

86
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; @@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
@ -61,6 +62,7 @@ import java.util.Objects; @@ -61,6 +62,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.config.ConfigDef.ListSize.atMostOfSize;
@ -289,121 +291,121 @@ public class StreamsConfig extends AbstractConfig { @@ -289,121 +291,121 @@ public class StreamsConfig extends AbstractConfig {
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0100 = "0.10.0";
public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0101 = "0.10.1";
public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0102 = "0.10.2";
public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0110 = "0.11.0";
public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_10 = "1.0";
public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_11 = "1.1";
public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_20 = "2.0";
public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_21 = "2.1";
public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_22 = "2.2";
public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_23 = "2.3";
public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_24 = "2.4";
public static final String UPGRADE_FROM_24 = UpgradeFromValues.UPGRADE_FROM_24.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.5.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_25 = "2.5";
public static final String UPGRADE_FROM_25 = UpgradeFromValues.UPGRADE_FROM_25.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.6.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_26 = "2.6";
public static final String UPGRADE_FROM_26 = UpgradeFromValues.UPGRADE_FROM_26.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.7.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_27 = "2.7";
public static final String UPGRADE_FROM_27 = UpgradeFromValues.UPGRADE_FROM_27.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.8.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_28 = "2.8";
public static final String UPGRADE_FROM_28 = UpgradeFromValues.UPGRADE_FROM_28.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_30 = "3.0";
public static final String UPGRADE_FROM_30 = UpgradeFromValues.UPGRADE_FROM_30.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_31 = "3.1";
public static final String UPGRADE_FROM_31 = UpgradeFromValues.UPGRADE_FROM_31.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_32 = "3.2";
public static final String UPGRADE_FROM_32 = UpgradeFromValues.UPGRADE_FROM_32.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.3.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_33 = "3.3";
public static final String UPGRADE_FROM_33 = UpgradeFromValues.UPGRADE_FROM_33.toString();
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.4.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_34 = "3.4";
public static final String UPGRADE_FROM_34 = UpgradeFromValues.UPGRADE_FROM_34.toString();
/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
@ -993,9 +995,9 @@ public class StreamsConfig extends AbstractConfig { @@ -993,9 +995,9 @@ public class StreamsConfig extends AbstractConfig {
Importance.LOW,
REPARTITION_PURGE_INTERVAL_MS_DOC)
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
ConfigDef.Type.LONG,
Type.LONG,
9 * 60 * 1000L,
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(DEFAULT_DSL_STORE_CONFIG,
Type.STRING,
@ -1009,10 +1011,10 @@ public class StreamsConfig extends AbstractConfig { @@ -1009,10 +1011,10 @@ public class StreamsConfig extends AbstractConfig {
Importance.LOW,
DEFAULT_CLIENT_SUPPLIER_DOC)
.define(METADATA_MAX_AGE_CONFIG,
ConfigDef.Type.LONG,
Type.LONG,
5 * 60 * 1000L,
atLeast(0),
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG,
Type.INT,
@ -1069,25 +1071,25 @@ public class StreamsConfig extends AbstractConfig { @@ -1069,25 +1071,25 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
1000L,
atLeast(0L),
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
.define(RETRIES_CONFIG,
Type.INT,
0,
between(0, Integer.MAX_VALUE),
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.RETRIES_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
atLeast(0L),
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
atLeast(0),
ConfigDef.Importance.LOW,
Importance.LOW,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
@ -1106,29 +1108,13 @@ public class StreamsConfig extends AbstractConfig { @@ -1106,29 +1108,13 @@ public class StreamsConfig extends AbstractConfig {
Importance.LOW,
STATE_CLEANUP_DELAY_MS_DOC)
.define(UPGRADE_FROM_CONFIG,
ConfigDef.Type.STRING,
Type.STRING,
null,
in(null,
UPGRADE_FROM_0100,
UPGRADE_FROM_0101,
UPGRADE_FROM_0102,
UPGRADE_FROM_0110,
UPGRADE_FROM_10,
UPGRADE_FROM_11,
UPGRADE_FROM_20,
UPGRADE_FROM_21,
UPGRADE_FROM_22,
UPGRADE_FROM_23,
UPGRADE_FROM_24,
UPGRADE_FROM_25,
UPGRADE_FROM_26,
UPGRADE_FROM_27,
UPGRADE_FROM_28,
UPGRADE_FROM_30,
UPGRADE_FROM_31,
UPGRADE_FROM_32,
UPGRADE_FROM_33,
UPGRADE_FROM_34),
in(Stream.concat(
Stream.of((String) null),
Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString)
).toArray(String[]::new)
),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOWED_INNER_CLASS_SERDE,

53
streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
public enum UpgradeFromValues {
UPGRADE_FROM_0100("0.10.0"),
UPGRADE_FROM_0101("0.10.1"),
UPGRADE_FROM_0102("0.10.2"),
UPGRADE_FROM_0110("0.11.0"),
UPGRADE_FROM_10("1.0"),
UPGRADE_FROM_11("1.1"),
UPGRADE_FROM_20("2.0"),
UPGRADE_FROM_21("2.1"),
UPGRADE_FROM_22("2.2"),
UPGRADE_FROM_23("2.3"),
UPGRADE_FROM_24("2.4"),
UPGRADE_FROM_25("2.5"),
UPGRADE_FROM_26("2.6"),
UPGRADE_FROM_27("2.7"),
UPGRADE_FROM_28("2.8"),
UPGRADE_FROM_30("3.0"),
UPGRADE_FROM_31("3.1"),
UPGRADE_FROM_32("3.2"),
UPGRADE_FROM_33("3.3"),
UPGRADE_FROM_34("3.4");
private final String value;
UpgradeFromValues(final String value) {
this.value = value;
}
public static UpgradeFromValues getValueFromString(final String upgradeFrom) {
return UpgradeFromValues.valueOf("UPGRADE_FROM_" + upgradeFrom.replace(".", ""));
}
public String toString() {
return value;
}
}

44
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java

@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serializer; @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import java.nio.ByteBuffer;
@ -56,27 +57,28 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull @@ -56,27 +57,28 @@ public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNull
return false;
}
switch ((String) upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
case StreamsConfig.UPGRADE_FROM_34:
switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
// there is no need to add new version here
return true;
default:
return false;

44
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
@ -98,27 +99,28 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS @@ -98,27 +99,28 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS
return true;
}
switch ((String) upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
case StreamsConfig.UPGRADE_FROM_34:
switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
// there is no need to add new versions here
return false;
default:
return true;

42
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java

@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
@ -74,26 +75,27 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript @@ -74,26 +75,27 @@ public class SubscriptionWrapperSerde<K> extends WrappingNullableSerde<Subscript
return false;
}
switch ((String) upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
switch (UpgradeFromValues.getValueFromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
// there is no need to add new versions here
return true;
default:
return false;

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java

@ -584,7 +584,7 @@ public class InternalTopicManager { @@ -584,7 +584,7 @@ public class InternalTopicManager {
*/
// visible for testing
protected Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics,
final Set<String> tempUnknownTopics) {
final Set<String> tempUnknownTopics) {
final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.topicNameValues();

87
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java

@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext; @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.slf4j.Logger;
@ -95,17 +96,17 @@ public final class AssignorConfiguration { @@ -95,17 +96,17 @@ public final class AssignorConfiguration {
public RebalanceProtocol rebalanceProtocol() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
switch (UpgradeFromValues.getValueFromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// ATTENTION: The following log messages is used for verification in system test
// streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
// If you change it, please do also change the system test accordingly and
@ -114,15 +115,18 @@ public final class AssignorConfiguration { @@ -114,15 +115,18 @@ public final class AssignorConfiguration {
log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
" Please be prepared to remove the 'upgrade.from' config soon.");
return RebalanceProtocol.EAGER;
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
// we need to add new version when new "upgrade.from" values become available
// This config is for explicitly sending FK response to a requested partition
// and should not affect the rebalance protocol
break;
@ -145,39 +149,42 @@ public final class AssignorConfiguration { @@ -145,39 +149,42 @@ public final class AssignorConfiguration {
public int configuredMetadataVersion(final int priorVersion) {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
switch (UpgradeFromValues.getValueFromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
log.info(
"Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.",
LATEST_SUPPORTED_VERSION
);
return 1;
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
log.info(
"Downgrading metadata version from {} to 2 for upgrade from {}.x.",
LATEST_SUPPORTED_VERSION,
upgradeFrom
);
return 2;
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// These configs are for cooperative rebalancing and should not affect the metadata version
break;
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
// we need to add new version when new "upgrade.from" values become available
// This config is for explicitly sending FK response to a requested partition
// and should not affect the metadata version
break;

13
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes; @@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
@ -1434,6 +1435,18 @@ public class StreamsConfigTest { @@ -1434,6 +1435,18 @@ public class StreamsConfigTest {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
public void shouldSupportAllUpgradeFromValues() {
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
try {
new StreamsConfig(props);
} catch (final Exception fatal) {
throw new AssertionError("StreamsConfig did not accept `upgrade.from` config value `" + upgradeFrom + "`");
}
}
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

43
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java

@ -17,14 +17,29 @@ @@ -17,14 +17,29 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
public class AssignorConfigurationTest {
private final Map<String, Object> config = new HashMap<>();
@Before
public void setup() {
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app.id");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
config.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, mock(ReferenceContainer.class));
}
@Test
public void configsShouldRejectZeroWarmups() {
@ -35,4 +50,32 @@ public class AssignorConfigurationTest { @@ -35,4 +50,32 @@ public class AssignorConfigurationTest {
assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));
}
@Test
public void rebalanceProtocolShouldSupportAllUpgradeFromVersions() {
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {
config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config);
try {
assignorConfiguration.rebalanceProtocol();
} catch (final Exception error) {
throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!");
}
}
}
@Test
public void configuredMetadataVersionShouldSupportAllUpgradeFromVersions() {
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {
config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config);
try {
assignorConfiguration.configuredMetadataVersion(0);
} catch (final Exception error) {
throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!");
}
}
}
}

Loading…
Cancel
Save