Browse Source

KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409)

This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR:

--describe:
Describe supported and finalized features.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config <path_to_java_properties_file>]
Optionally, use the --from-controller option to get features from the controller.
--upgrade-all:
Upgrades all features known to the tool to their highest max version levels.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config <path_to_java_properties_file>]
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.
--downgrade-all:
Downgrades existing finalized features to the highest max version levels known to this tool.
Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config <path_to_java_properties_file>].
Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them.

Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
pull/9450/head
Kowshik Prakasam 4 years ago committed by GitHub
parent
commit
d99fe49234
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      bin/kafka-features.sh
  2. 408
      core/src/main/scala/kafka/admin/FeatureCommand.scala
  3. 244
      core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala

17
bin/kafka-features.sh

@ -0,0 +1,17 @@ @@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@"

408
core/src/main/scala/kafka/admin/FeatureCommand.scala

@ -0,0 +1,408 @@ @@ -0,0 +1,408 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.server.BrokerFeatures
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.utils.Utils
import java.util.Properties
import scala.collection.Seq
import scala.collection.immutable.ListMap
import scala.jdk.CollectionConverters._
import joptsimple.OptionSpec
import scala.concurrent.ExecutionException
object FeatureCommand {
def main(args: Array[String]): Unit = {
val opts = new FeatureCommandOptions(args)
val featureApis = new FeatureApis(opts)
var exitCode = 0
try {
featureApis.execute()
} catch {
case e: IllegalArgumentException =>
printException(e)
opts.parser.printHelpOn(System.err)
exitCode = 1
case _: UpdateFeaturesException =>
exitCode = 1
case e: ExecutionException =>
val cause = if (e.getCause == null) e else e.getCause
printException(cause)
exitCode = 1
case e: Throwable =>
printException(e)
exitCode = 1
} finally {
featureApis.close()
Exit.exit(exitCode)
}
}
private def printException(exception: Throwable): Unit = {
System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
}
}
class UpdateFeaturesException(message: String) extends RuntimeException(message)
/**
* A class that provides necessary APIs to bridge feature APIs provided by the the Admin client with
* the requirements of the CLI tool.
*
* @param opts the CLI options
*/
class FeatureApis(private var opts: FeatureCommandOptions) {
private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
private var adminClient = FeatureApis.createAdminClient(opts)
private def pad(op: String): String = {
f"$op%11s"
}
private val addOp = pad("[Add]")
private val upgradeOp = pad("[Upgrade]")
private val deleteOp = pad("[Delete]")
private val downgradeOp = pad("[Downgrade]")
// For testing only.
private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
supportedFeatures = newFeatures
}
// For testing only.
private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = {
adminClient.close()
adminClient = FeatureApis.createAdminClient(newOpts)
opts = newOpts
}
private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
adminClient.describeFeatures(options).featureMetadata().get()
}
/**
* Describes the supported and finalized features. If the --from-controller CLI option
* is provided, then the request is issued only to the controller, otherwise the request is issued
* to any of the provided bootstrap servers.
*/
def describeFeatures(): Unit = {
val result = describeFeatures(opts.hasFromControllerOption)
val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
features.toList.sorted.foreach {
feature =>
val output = new StringBuilder()
output.append(s"Feature: $feature")
val (supportedMinVersion, supportedMaxVersion) = {
val supportedVersionRange = result.supportedFeatures.get(feature)
if (supportedVersionRange == null) {
("-", "-")
} else {
(supportedVersionRange.minVersion, supportedVersionRange.maxVersion)
}
}
output.append(s"\tSupportedMinVersion: $supportedMinVersion")
output.append(s"\tSupportedMaxVersion: $supportedMaxVersion")
val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = {
val finalizedVersionRange = result.finalizedFeatures.get(feature)
if (finalizedVersionRange == null) {
("-", "-")
} else {
(finalizedVersionRange.minVersionLevel, finalizedVersionRange.maxVersionLevel)
}
}
output.append(s"\tFinalizedMinVersionLevel: $finalizedMinVersionLevel")
output.append(s"\tFinalizedMaxVersionLevel: $finalizedMaxVersionLevel")
val epoch = {
if (result.finalizedFeaturesEpoch.isPresent) {
result.finalizedFeaturesEpoch.get.toString
} else {
"-"
}
}
output.append(s"\tEpoch: $epoch")
println(output)
}
}
/**
* Upgrades all features known to this tool to their highest max version levels. The method may
* add new finalized features if they were not finalized previously, but it does not delete
* any existing finalized feature. The results of the feature updates are written to STDOUT.
*
* NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
* updates to STDOUT, without applying them.
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def upgradeAllFeatures(): Unit = {
val metadata = describeFeatures(true)
val existingFinalizedFeatures = metadata.finalizedFeatures
val updates = supportedFeatures.features.asScala.map {
case (feature, targetVersionRange) =>
val existingVersionRange = existingFinalizedFeatures.get(feature)
if (existingVersionRange == null) {
val updateStr =
addOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: -" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
} else {
if (targetVersionRange.max > existingVersionRange.maxVersionLevel) {
val updateStr =
upgradeOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some((updateStr, new FeatureUpdate(targetVersionRange.max, false))))
} else {
(feature, Option.empty)
}
}
}.filter {
case(_, updateInfo) => updateInfo.isDefined
}.map {
case(feature, updateInfo) => (feature, updateInfo.get)
}.toMap
if (updates.nonEmpty) {
maybeApplyFeatureUpdates(updates)
}
}
/**
* Downgrades existing finalized features to the highest max version levels known to this tool.
* The method may delete existing finalized features if they are no longer seen to be supported,
* but it does not add a feature that was not finalized previously. The results of the feature
* updates are written to STDOUT.
*
* NOTE: if the --dry-run CLI option is provided, this method only prints the expected feature
* updates to STDOUT, without applying them.
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def downgradeAllFeatures(): Unit = {
val metadata = describeFeatures(true)
val existingFinalizedFeatures = metadata.finalizedFeatures
val supportedFeaturesMap = supportedFeatures.features
val updates = existingFinalizedFeatures.asScala.map {
case (feature, existingVersionRange) =>
val targetVersionRange = supportedFeaturesMap.get(feature)
if (targetVersionRange == null) {
val updateStr =
deleteOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: -"
(feature, Some(updateStr, new FeatureUpdate(0, true)))
} else {
if (targetVersionRange.max < existingVersionRange.maxVersionLevel) {
val updateStr =
downgradeOp +
s"\tFeature: $feature" +
s"\tExistingFinalizedMaxVersion: ${existingVersionRange.maxVersionLevel}" +
s"\tNewFinalizedMaxVersion: ${targetVersionRange.max}"
(feature, Some(updateStr, new FeatureUpdate(targetVersionRange.max, true)))
} else {
(feature, Option.empty)
}
}
}.filter {
case(_, updateInfo) => updateInfo.isDefined
}.map {
case(feature, updateInfo) => (feature, updateInfo.get)
}.toMap
if (updates.nonEmpty) {
maybeApplyFeatureUpdates(updates)
}
}
/**
* Applies the provided feature updates. If the --dry-run CLI option is provided, the method
* only prints the expected feature updates to STDOUT without applying them.
*
* @param updates the feature updates to be applied via the admin client
*
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
private def maybeApplyFeatureUpdates(updates: Map[String, (String, FeatureUpdate)]): Unit = {
if (opts.hasDryRunOption) {
println("Expected feature updates:" + ListMap(
updates
.toSeq
.sortBy { case(feature, _) => feature} :_*)
.map { case(_, (updateStr, _)) => updateStr}
.mkString("\n"))
} else {
val result = adminClient.updateFeatures(
updates
.map { case(feature, (_, update)) => (feature, update)}
.asJava,
new UpdateFeaturesOptions())
val resultSortedByFeature = ListMap(
result
.values
.asScala
.toSeq
.sortBy { case(feature, _) => feature} :_*)
val failures = resultSortedByFeature.map {
case (feature, updateFuture) =>
val (updateStr, _) = updates(feature)
try {
updateFuture.get
println(updateStr + "\tResult: OK")
0
} catch {
case e: ExecutionException =>
val cause = if (e.getCause == null) e else e.getCause
println(updateStr + "\tResult: FAILED due to " + cause)
1
case e: Throwable =>
println(updateStr + "\tResult: FAILED due to " + e)
1
}
}.sum
if (failures > 0) {
throw new UpdateFeaturesException(s"$failures feature updates failed!")
}
}
}
def execute(): Unit = {
if (opts.hasDescribeOption) {
describeFeatures()
} else if (opts.hasUpgradeAllOption) {
upgradeAllFeatures()
} else if (opts.hasDowngradeAllOption) {
downgradeAllFeatures()
} else {
throw new IllegalStateException("Unexpected state: no CLI command could be executed.")
}
}
def close(): Unit = {
adminClient.close()
}
}
class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val bootstrapServerOpt = parser.accepts(
"bootstrap-server",
"REQUIRED: A comma-separated list of host:port pairs to use for establishing the connection" +
" to the Kafka cluster.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
private val commandConfigOpt = parser.accepts(
"command-config",
"Property file containing configs to be passed to Admin Client." +
" This is used with --bootstrap-server option when required.")
.withOptionalArg
.describedAs("command config property file")
.ofType(classOf[String])
private val describeOpt = parser.accepts(
"describe",
"Describe supported and finalized features. By default, the features are described from a" +
" random broker. The request can be optionally directed only to the controller using the" +
" --from-controller option.")
private val fromControllerOpt = parser.accepts(
"from-controller",
"Describe supported and finalized features from the controller.")
private val upgradeAllOpt = parser.accepts(
"upgrade-all",
"Upgrades all finalized features to the maximum version levels known to the tool." +
" This command finalizes new features known to the tool that were never finalized" +
" previously in the cluster, but it is guaranteed to not delete any existing feature.")
private val downgradeAllOpt = parser.accepts(
"downgrade-all",
"Downgrades all finalized features to the maximum version levels known to the tool." +
" This command deletes unknown features from the list of finalized features in the" +
" cluster, but it is guaranteed to not add a new feature.")
private val dryRunOpt = parser.accepts(
"dry-run",
"Performs a dry-run of upgrade/downgrade mutations to finalized feature without applying them.")
options = parser.parse(args : _*)
checkArgs()
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def hasDescribeOption: Boolean = has(describeOpt)
def hasFromControllerOption: Boolean = has(fromControllerOpt)
def hasDryRunOption: Boolean = has(dryRunOpt)
def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
def hasDowngradeAllOption: Boolean = has(downgradeAllOpt)
def commandConfig: Properties = {
if (has(commandConfigOpt))
Utils.loadProps(options.valueOf(commandConfigOpt))
else
new Properties()
}
def bootstrapServers: String = options.valueOf(bootstrapServerOpt)
def checkArgs(): Unit = {
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool describes and updates finalized features.")
val numActions = Seq(describeOpt, upgradeAllOpt, downgradeAllOpt).count(has)
if (numActions != 1) {
CommandLineUtils.printUsageAndDie(
parser,
"Command must include exactly one action: --describe, --upgrade-all, --downgrade-all.")
}
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (hasDryRunOption && !hasUpgradeAllOption && !hasDowngradeAllOption) {
CommandLineUtils.printUsageAndDie(
parser,
"Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
}
if (hasFromControllerOption && !hasDescribeOption) {
CommandLineUtils.printUsageAndDie(
parser,
"Command can contain --from-controller option only when --describe action is provided.")
}
}
}
object FeatureApis {
private def createAdminClient(opts: FeatureCommandOptions): Admin = {
val props = new Properties()
props.putAll(opts.commandConfig)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServers)
Admin.create(props)
}
}

244
core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala

@ -0,0 +1,244 @@ @@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.api.KAFKA_2_7_IV0
import kafka.server.{BaseRequestTest, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.utils.Utils
import java.util.Properties
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test
import org.scalatest.Assertions.intercept
class FeatureCommandTest extends BaseRequestTest {
override def brokerCount: Int = 3
override def brokerPropertyOverrides(props: Properties): Unit = {
props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString)
}
private val defaultSupportedFeatures: Features[SupportedVersionRange] =
Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3)),
Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5))))
private def updateSupportedFeatures(features: Features[SupportedVersionRange],
targetServers: Set[KafkaServer]): Unit = {
targetServers.foreach(s => {
s.brokerFeatures.setSupportedFeatures(features)
s.zkClient.updateBrokerInfo(s.createBrokerInfo)
})
// Wait until updates to all BrokerZNode supported features propagate to the controller.
val brokerIds = targetServers.map(s => s.config.brokerId)
waitUntilTrue(
() => servers.exists(s => {
if (s.kafkaController.isActive) {
s.kafkaController.controllerContext.liveOrShuttingDownBrokers
.filter(b => brokerIds.contains(b.id))
.forall(b => {
b.features.equals(features)
})
} else {
false
}
}),
"Controller did not get broker updates")
}
private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = {
updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
}
/**
* Tests if the FeatureApis#describeFeatures API works as expected when describing features before and
* after upgrading features.
*/
@Test
def testDescribeFeaturesSuccess(): Unit = {
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller")))
featureApis.setSupportedFeatures(defaultSupportedFeatures)
try {
val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
val expectedInitialDescribeOutput =
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n" +
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: -\tFinalizedMaxVersionLevel: -\tEpoch: 0\n"
assertEquals(expectedInitialDescribeOutput, initialDescribeOutput)
featureApis.upgradeAllFeatures()
val finalDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
val expectedFinalDescribeOutput =
"Feature: feature_1\tSupportedMinVersion: 1\tSupportedMaxVersion: 3\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 3\tEpoch: 1\n" +
"Feature: feature_2\tSupportedMinVersion: 1\tSupportedMaxVersion: 5\tFinalizedMinVersionLevel: 1\tFinalizedMaxVersionLevel: 5\tEpoch: 1\n"
assertEquals(expectedFinalDescribeOutput, finalDescribeOutput)
} finally {
featureApis.close()
}
}
/**
* Tests if the FeatureApis#upgradeAllFeatures API works as expected during a success case.
*/
@Test
def testUpgradeAllFeaturesSuccess(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
// - Update the supported features across all brokers.
// - Upgrade non-existing feature_1 to maxVersionLevel: 2.
// - Verify results.
val initialSupportedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
updateSupportedFeaturesInAllBrokers(initialSupportedFeatures)
featureApis.setSupportedFeatures(initialSupportedFeatures)
var output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
var expected =
" [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 2\tResult: OK\n"
assertEquals(expected, output)
// Step (2):
// - Update the supported features across all brokers.
// - Upgrade existing feature_1 to maxVersionLevel: 3.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5.
// - Verify results.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
featureApis.setSupportedFeatures(defaultSupportedFeatures)
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
expected =
" [Upgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: 3\tResult: OK\n" +
" [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -\tNewFinalizedMaxVersion: 5\tResult: OK\n"
assertEquals(expected, output)
// Step (3):
// - Perform an upgrade of all features again.
// - Since supported features have not changed, expect that the above action does not yield
// any results.
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
assertTrue(output.isEmpty)
featureApis.setOptions(upgradeOpts)
output = TestUtils.grabConsoleOutput(featureApis.upgradeAllFeatures())
assertTrue(output.isEmpty)
} finally {
featureApis.close()
}
}
/**
* Tests if the FeatureApis#downgradeAllFeatures API works as expected during a success case.
*/
@Test
def testDowngradeFeaturesSuccess(): Unit = {
val downgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--downgrade-all"))
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1):
// - Update the supported features across all brokers.
// - Upgrade non-existing feature_1 to maxVersionLevel: 3.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
featureApis.setSupportedFeatures(defaultSupportedFeatures)
featureApis.upgradeAllFeatures()
// Step (2):
// - Downgrade existing feature_1 to maxVersionLevel: 2.
// - Delete feature_2 since it is no longer supported by the FeatureApis object.
// - Verify results.
val downgradedFeatures = Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 2))))
featureApis.setSupportedFeatures(downgradedFeatures)
featureApis.setOptions(downgradeOpts)
var output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
var expected =
"[Downgrade]\tFeature: feature_1\tExistingFinalizedMaxVersion: 3\tNewFinalizedMaxVersion: 2\tResult: OK\n" +
" [Delete]\tFeature: feature_2\tExistingFinalizedMaxVersion: 5\tNewFinalizedMaxVersion: -\tResult: OK\n"
assertEquals(expected, output)
// Step (3):
// - Perform a downgrade of all features again.
// - Since supported features have not changed, expect that the above action does not yield
// any results.
updateSupportedFeaturesInAllBrokers(downgradedFeatures)
output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
assertTrue(output.isEmpty)
// Step (4):
// - Delete feature_1 since it is no longer supported by the FeatureApis object.
// - Verify results.
featureApis.setSupportedFeatures(Features.emptySupportedFeatures())
output = TestUtils.grabConsoleOutput(featureApis.downgradeAllFeatures())
expected =
" [Delete]\tFeature: feature_1\tExistingFinalizedMaxVersion: 2\tNewFinalizedMaxVersion: -\tResult: OK\n"
assertEquals(expected, output)
} finally {
featureApis.close()
}
}
/**
* Tests if the FeatureApis#upgradeAllFeatures API works as expected during a partial failure case.
*/
@Test
def testUpgradeFeaturesFailure(): Unit = {
val upgradeOpts = new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--upgrade-all"))
val featureApis = new FeatureApis(upgradeOpts)
try {
// Step (1): Update the supported features across all brokers.
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
// Step (2):
// - Intentionally setup the FeatureApis object such that it contains incompatible target
// features (viz. feature_2 and feature_3).
// - Upgrade non-existing feature_1 to maxVersionLevel: 4. Expect the operation to fail with
// an incompatibility failure.
// - Upgrade non-existing feature_2 to maxVersionLevel: 5. Expect the operation to succeed.
// - Upgrade non-existing feature_3 to maxVersionLevel: 3. Expect the operation to fail
// since the feature is not supported.
val targetFeaturesWithIncompatibilities =
Features.supportedFeatures(
Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 4)),
Utils.mkEntry("feature_2", new SupportedVersionRange(1, 5)),
Utils.mkEntry("feature_3", new SupportedVersionRange(1, 3))))
featureApis.setSupportedFeatures(targetFeaturesWithIncompatibilities)
val output = TestUtils.grabConsoleOutput({
val exception = intercept[UpdateFeaturesException] {
featureApis.upgradeAllFeatures()
}
assertEquals("2 feature updates failed!", exception.getMessage)
})
val expected =
" [Add]\tFeature: feature_1\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 4\tResult: FAILED due to" +
" org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
" feature update because brokers were found to have incompatible versions for the" +
" feature.\n" +
" [Add]\tFeature: feature_2\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 5\tResult: OK\n" +
" [Add]\tFeature: feature_3\tExistingFinalizedMaxVersion: -" +
"\tNewFinalizedMaxVersion: 3\tResult: FAILED due to" +
" org.apache.kafka.common.errors.InvalidRequestException: Could not apply finalized" +
" feature update because the provided feature is not supported.\n"
assertEquals(expected, output)
} finally {
featureApis.close()
}
}
}
Loading…
Cancel
Save