
KAFKA-14596: Move TopicCommand to tools (#13201)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
pull/13762/head
Omnia G.H Ibrahim authored 1 year ago, committed by GitHub
commit 9af1e74b5e
No known key found for this signature in database. GPG Key ID: 4AEE18F83AFDEB23
  1. bin/kafka-topics.sh (2 changed lines)
  2. bin/windows/kafka-topics.bat (2 changed lines)
  3. build.gradle (5 changed lines)
  4. checkstyle/import-control.xml (3 changed lines)
  5. core/src/main/scala/kafka/admin/TopicCommand.scala (649 changed lines)
  6. core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala (831 changed lines)
  7. core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala (246 changed lines)
  8. tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java (17 changed lines)
  9. tools/src/main/java/org/apache/kafka/tools/TopicCommand.java (1003 changed lines)
  10. tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java (143 changed lines)
  11. tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java (1065 changed lines)
  12. tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java (283 changed lines)

bin/kafka-topics.sh (2 changed lines)

@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$@"

bin/windows/kafka-topics.bat (2 changed lines)

@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 rem See the License for the specific language governing permissions and
 rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.TopicCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.TopicCommand %*

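Both wrapper scripts keep delegating to kafka-run-class; the only change is the main class they launch, from the Scala kafka.admin.TopicCommand (removed below) to the new Java org.apache.kafka.tools.TopicCommand. For illustration only, the same invocation can be reproduced by calling that main method directly. This is a hypothetical snippet, and the bootstrap server address is a placeholder, not part of this change:

// Hypothetical equivalent of `kafka-topics.sh --bootstrap-server localhost:9092 --list`,
// assuming the kafka-tools jar and its dependencies are on the classpath.
public class RunTopicCommand {
    public static void main(String[] args) throws Exception {
        // kafka-run-class ultimately just runs this main method with the script's arguments.
        org.apache.kafka.tools.TopicCommand.main(
            new String[]{"--bootstrap-server", "localhost:9092", "--list"});
    }
}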
build.gradle (5 changed lines)

@@ -1882,17 +1882,14 @@ project(':tools:tools-api') {
 project(':tools') {
   archivesBaseName = "kafka-tools"
   dependencies {
     implementation project(':clients')
+    implementation project(':storage')
     implementation project(':server-common')
-    implementation project(':connect:api')
     implementation project(':connect:runtime')
     implementation project(':log4j-appender')
     implementation project(':tools:tools-api')
     implementation libs.argparse4j
-    implementation libs.jacksonDatabind
-    implementation libs.jacksonJDK8Datatypes
     implementation libs.slf4jApi
     implementation libs.log4j
     implementation libs.joptSimple

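The new ':storage' dependency lines up with the removed Scala code below, which uses LogConfig from org.apache.kafka.storage.internals.log to validate --config overrides and to list the valid topic configs in the --config help text. A minimal sketch of that usage, relying only on the two calls already visible in the removed file (LogConfig.validate and LogConfig.configNames); the class name and the example override are illustrative:

// Minimal sketch: validate a topic config override the way parseTopicConfigsToBeAdded
// does in the removed Scala code, using the classes the ':storage' dependency provides.
import java.util.Properties;
import org.apache.kafka.storage.internals.log.LogConfig;

public class ConfigOverrideSketch {
    public static void main(String[] args) {
        Properties overrides = new Properties();
        overrides.setProperty("delete.retention.ms", "1000"); // e.g. --config delete.retention.ms=1000
        LogConfig.validate(overrides); // rejects invalid overrides (see testInvalidTopicLevelConfig below)
        System.out.println("Valid topic configs: " + LogConfig.configNames());
    }
}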
checkstyle/import-control.xml (3 changed lines)

@@ -273,6 +273,9 @@
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.server.util" />
+    <allow pkg="kafka.admin" />
+    <allow pkg="kafka.server" />
+    <allow pkg="org.apache.kafka.storage.internals" />
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.clients.admin" />

core/src/main/scala/kafka/admin/TopicCommand.scala (649 changed lines)

@@ -1,649 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util
import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.DeleteTopicsOptions
import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
import org.apache.kafka.common.{TopicCollection, TopicPartition, TopicPartitionInfo, Uuid}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.server.util.TopicFilter.IncludeList
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
object TopicCommand extends Logging {
def main(args: Array[String]): Unit = {
val opts = new TopicCommandOptions(args)
opts.checkArgs()
val topicService = TopicService(opts.commandConfig, opts.bootstrapServer)
var exitCode = 0
try {
if (opts.hasCreateOption)
topicService.createTopic(opts)
else if (opts.hasAlterOption)
topicService.alterTopic(opts)
else if (opts.hasListOption)
topicService.listTopics(opts)
else if (opts.hasDescribeOption)
topicService.describeTopic(opts)
else if (opts.hasDeleteOption)
topicService.deleteTopic(opts)
} catch {
case e: ExecutionException =>
if (e.getCause != null)
printException(e.getCause)
else
printException(e)
exitCode = 1
case e: Throwable =>
printException(e)
exitCode = 1
} finally {
topicService.close()
Exit.exit(exitCode)
}
}
private def printException(e: Throwable): Unit = {
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
}
class CommandTopicPartition(opts: TopicCommandOptions) {
val name = opts.topic.get
val partitions = opts.partitions
val replicationFactor = opts.replicationFactor
val replicaAssignment = opts.replicaAssignment
val configsToAdd = parseTopicConfigsToBeAdded(opts)
def hasReplicaAssignment: Boolean = replicaAssignment.isDefined
def ifTopicDoesntExist(): Boolean = opts.ifNotExists
}
case class TopicDescription(topic: String,
topicId: Uuid,
numPartitions: Int,
replicationFactor: Int,
config: JConfig,
markedForDeletion: Boolean) {
def printDescription(): Unit = {
val configsAsString = config.entries.asScala.filterNot(_.isDefault).map { ce => s"${ce.name}=${ce.value}" }.mkString(",")
print(s"Topic: $topic")
if(topicId != Uuid.ZERO_UUID) print(s"\tTopicId: $topicId")
print(s"\tPartitionCount: $numPartitions")
print(s"\tReplicationFactor: $replicationFactor")
print(s"\tConfigs: $configsAsString")
print(if (markedForDeletion) "\tMarkedForDeletion: true" else "")
println()
}
}
case class PartitionDescription(topic: String,
info: TopicPartitionInfo,
config: Option[JConfig],
markedForDeletion: Boolean,
reassignment: Option[PartitionReassignment]) {
private def minIsrCount: Option[Int] = {
config.map(_.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value.toInt)
}
def isUnderReplicated: Boolean = {
getReplicationFactor(info, reassignment) - info.isr.size > 0
}
private def hasLeader: Boolean = {
info.leader != null
}
def isUnderMinIsr: Boolean = {
!hasLeader || minIsrCount.exists(info.isr.size < _)
}
def isAtMinIsrPartitions: Boolean = {
minIsrCount.contains(info.isr.size)
}
def hasUnavailablePartitions(liveBrokers: Set[Int]): Boolean = {
!hasLeader || !liveBrokers.contains(info.leader.id)
}
def printDescription(): Unit = {
print("\tTopic: " + topic)
print("\tPartition: " + info.partition)
print("\tLeader: " + (if (hasLeader) info.leader.id else "none"))
print("\tReplicas: " + info.replicas.asScala.map(_.id).mkString(","))
print("\tIsr: " + info.isr.asScala.map(_.id).mkString(","))
if (reassignment.nonEmpty) {
print("\tAdding Replicas: " + reassignment.get.addingReplicas().asScala.mkString(","))
print("\tRemoving Replicas: " + reassignment.get.removingReplicas().asScala.mkString(","))
}
print(if (markedForDeletion) "\tMarkedForDeletion: true" else "")
println()
}
}
class DescribeOptions(opts: TopicCommandOptions, liveBrokers: Set[Int]) {
val describeConfigs =
!opts.reportUnavailablePartitions &&
!opts.reportUnderReplicatedPartitions &&
!opts.reportUnderMinIsrPartitions &&
!opts.reportAtMinIsrPartitions
val describePartitions = !opts.reportOverriddenConfigs
private def shouldPrintUnderReplicatedPartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportUnderReplicatedPartitions && partitionDescription.isUnderReplicated
}
private def shouldPrintUnavailablePartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportUnavailablePartitions && partitionDescription.hasUnavailablePartitions(liveBrokers)
}
private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportUnderMinIsrPartitions && partitionDescription.isUnderMinIsr
}
private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription): Boolean = {
opts.reportAtMinIsrPartitions && partitionDescription.isAtMinIsrPartitions
}
private def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
describeConfigs ||
shouldPrintUnderReplicatedPartitions(partitionDesc) ||
shouldPrintUnavailablePartitions(partitionDesc) ||
shouldPrintUnderMinIsrPartitions(partitionDesc) ||
shouldPrintAtMinIsrPartitions(partitionDesc)
}
def maybePrintPartitionDescription(desc: PartitionDescription): Unit = {
if (shouldPrintTopicPartition(desc))
desc.printDescription()
}
}
object TopicService {
def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
bootstrapServer match {
case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
case None =>
}
Admin.create(commandConfig)
}
def apply(commandConfig: Properties, bootstrapServer: Option[String]): TopicService =
new TopicService(createAdminClient(commandConfig, bootstrapServer))
}
case class TopicService private (adminClient: Admin) extends AutoCloseable {
def createTopic(opts: TopicCommandOptions): Unit = {
val topic = new CommandTopicPartition(opts)
if (Topic.hasCollisionChars(topic.name))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " +
"collide. To avoid issues it is best to use either, but not both.")
createTopic(topic)
}
def createTopic(topic: CommandTopicPartition): Unit = {
if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
if (topic.partitions.exists(partitions => partitions < 1))
throw new IllegalArgumentException(s"The partitions must be greater than 0")
try {
val newTopic = if (topic.hasReplicaAssignment)
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else {
new NewTopic(
topic.name,
topic.partitions.asJava,
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
}
val configsMap = topic.configsToAdd.stringPropertyNames()
.asScala
.map(name => name -> topic.configsToAdd.getProperty(name))
.toMap.asJava
newTopic.configs(configsMap)
val createResult = adminClient.createTopics(Collections.singleton(newTopic),
new CreateTopicsOptions().retryOnQuotaViolation(false))
createResult.all().get()
println(s"Created topic ${topic.name}.")
} catch {
case e : ExecutionException =>
if (e.getCause == null)
throw e
if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
throw e.getCause
}
}
def listTopics(opts: TopicCommandOptions): Unit = {
println(getTopics(opts.topic, opts.excludeInternalTopics).mkString("\n"))
}
def alterTopic(opts: TopicCommandOptions): Unit = {
val topic = new CommandTopicPartition(opts)
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
if (topics.nonEmpty) {
val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).topicNameValues()
val newPartitions = topics.map { topicName =>
if (topic.hasReplicaAssignment) {
val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
val newAssignment = {
val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
}
topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment)
} else {
topicName -> NewPartitions.increaseTo(topic.partitions.get)
}
}.toMap
adminClient.createPartitions(newPartitions.asJava,
new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get()
}
}
def listAllReassignments(topicPartitions: util.Set[TopicPartition]): Map[TopicPartition, PartitionReassignment] = {
try {
adminClient.listPartitionReassignments(topicPartitions).reassignments().get().asScala
} catch {
case e: ExecutionException =>
e.getCause match {
case ex @ (_: UnsupportedVersionException | _: ClusterAuthorizationException) =>
logger.debug(s"Couldn't query reassignments through the AdminClient API: ${ex.getMessage}", ex)
Map()
case t => throw t
}
}
}
def describeTopic(opts: TopicCommandOptions): Unit = {
// If topicId is provided and not zero, will use topicId regardless of topic name
val inputTopicId = opts.topicId.map(Uuid.fromString).filter(uuid => uuid != Uuid.ZERO_UUID)
val useTopicId = inputTopicId.nonEmpty
val (topicIds, topics) = if (useTopicId)
(getTopicIds(inputTopicId, opts.excludeInternalTopics), Seq())
else
(Seq(), getTopics(opts.topic, opts.excludeInternalTopics))
// Only check topic name when topicId is not provided
if (useTopicId)
ensureTopicIdExists(topicIds, inputTopicId, !opts.ifExists)
else
ensureTopicExists(topics, opts.topic, !opts.ifExists)
val topicDescriptions = if (topicIds.nonEmpty) {
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds.toSeq.asJavaCollection)).allTopicIds().get().values().asScala
} else if (topics.nonEmpty) {
adminClient.describeTopics(TopicCollection.ofTopicNames(topics.asJavaCollection)).allTopicNames().get().values().asScala
} else {
Seq()
}
val topicNames = topicDescriptions.map(_.name())
val allConfigs = adminClient.describeConfigs(topicNames.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id())
val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
val topicPartitions = topicDescriptions
.flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition())))
.toSet.asJava
val reassignments = listAllReassignments(topicPartitions)
for (td <- topicDescriptions) {
val topicName = td.name
val topicId = td.topicId()
val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get()
val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
if (describeOptions.describeConfigs) {
val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
if (!opts.reportOverriddenConfigs || hasNonDefault) {
val numPartitions = td.partitions().size
val firstPartition = td.partitions.iterator.next()
val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition))
val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false)
topicDesc.printDescription()
}
}
if (describeOptions.describePartitions) {
for (partition <- sortedPartitions) {
val reassignment = reassignments.get(new TopicPartition(td.name, partition.partition))
val partitionDesc = PartitionDescription(topicName, partition, Some(config), markedForDeletion = false, reassignment)
describeOptions.maybePrintPartitionDescription(partitionDesc)
}
}
}
}
def deleteTopic(opts: TopicCommandOptions): Unit = {
val topics = getTopics(opts.topic, opts.excludeInternalTopics)
ensureTopicExists(topics, opts.topic, !opts.ifExists)
adminClient.deleteTopics(topics.asJavaCollection, new DeleteTopicsOptions().retryOnQuotaViolation(false))
.all().get()
}
def getTopics(topicIncludeList: Option[String], excludeInternalTopics: Boolean = false): Seq[String] = {
val allTopics = if (excludeInternalTopics) {
adminClient.listTopics()
} else {
adminClient.listTopics(new ListTopicsOptions().listInternal(true))
}
doGetTopics(allTopics.names().get().asScala.toSeq.sorted, topicIncludeList, excludeInternalTopics)
}
def getTopicIds(topicIdIncludeList: Option[Uuid], excludeInternalTopics: Boolean = false): Seq[Uuid] = {
val allTopics = if (excludeInternalTopics) {
adminClient.listTopics()
} else {
adminClient.listTopics(new ListTopicsOptions().listInternal(true))
}
val allTopicIds = allTopics.listings().get().asScala.map(_.topicId()).toSeq.sorted
topicIdIncludeList.filter(allTopicIds.contains).toSeq
}
def close(): Unit = adminClient.close()
}
/**
* ensures topic existence and throws exception if topic doesn't exist
*
* @param foundTopics Topics that were found to match the requested topic name.
* @param requestedTopic Name of the topic that was requested.
* @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful.
* If set to true, the command will throw an exception if the topic with the
* requested name does not exist.
*/
private def ensureTopicExists(foundTopics: Seq[String], requestedTopic: Option[String], requireTopicExists: Boolean): Unit = {
// If no topic name was mentioned, do not need to throw exception.
if (requestedTopic.isDefined && requireTopicExists && foundTopics.isEmpty) {
// If given topic doesn't exist then throw exception
throw new IllegalArgumentException(s"Topic '${requestedTopic.get}' does not exist as expected")
}
}
/**
* ensures topic existence and throws exception if topic doesn't exist
*
* @param foundTopicIds Topics that were found to match the requested topic id.
* @param requestedTopicId Id of the topic that was requested.
* @param requireTopicIdExists Indicates if the topic needs to exist for the operation to be successful.
* If set to true, the command will throw an exception if the topic with the
* requested id does not exist.
*/
private def ensureTopicIdExists(foundTopicIds: Seq[Uuid], requestedTopicId: Option[Uuid], requireTopicIdExists: Boolean): Unit = {
// If no topic id was mentioned, do not need to throw exception.
if (requestedTopicId.isDefined && requireTopicIdExists && foundTopicIds.isEmpty) {
// If given topicId doesn't exist then throw exception
throw new IllegalArgumentException(s"TopicId '${requestedTopicId.get}' does not exist as expected")
}
}
private def doGetTopics(allTopics: Seq[String], topicIncludeList: Option[String], excludeInternalTopics: Boolean): Seq[String] = {
if (topicIncludeList.isDefined) {
val topicsFilter = new IncludeList(topicIncludeList.get)
allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics))
} else
allTopics.filterNot(Topic.isInternal(_) && excludeInternalTopics)
}
@nowarn("cat=deprecation")
def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
val configsToBeAdded = opts.topicConfig.getOrElse(Collections.emptyList()).asScala.map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
"Invalid topic config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
LogConfig.validate(props)
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.")
}
props
}
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.LinkedHashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
val duplicateBrokers = CoreUtils.duplicates(brokerList)
if (duplicateBrokers.nonEmpty)
throw new AdminCommandFailedException(s"Partition replica lists may not contain duplicate entries: ${duplicateBrokers.mkString(",")}")
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
}
ret
}
def asJavaReplicaReassignment(original: Map[Int, List[Int]]): util.Map[Integer, util.List[Integer]] = {
original.map(f => Integer.valueOf(f._1) -> f._2.map(e => Integer.valueOf(e)).asJava).asJava
}
private def getReplicationFactor(tpi: TopicPartitionInfo, reassignment: Option[PartitionReassignment]): Int = {
// It is possible for a reassignment to complete between the time we have fetched its state and the time
// we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor.
def isReassignmentInProgress(ra: PartitionReassignment): Boolean = {
// Reassignment is still in progress as long as the removing and adding replicas are still present
val allReplicaIds = tpi.replicas.asScala.map(_.id).toSet
val changingReplicaIds = ra.removingReplicas.asScala.map(_.intValue).toSet ++ ra.addingReplicas.asScala.map(_.intValue).toSet
allReplicaIds.exists(changingReplicaIds.contains)
}
reassignment match {
case Some(ra) if isReassignmentInProgress(ra) => ra.replicas.asScala.diff(ra.addingReplicas.asScala).size
case _=> tpi.replicas.size
}
}
class TopicCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
private val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
private val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
"This is used only with --bootstrap-server option for describing and altering broker configs.")
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
private val kafkaConfigsCanAlterTopicConfigsViaBootstrapServer =
" (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option)"
private val listOpt = parser.accepts("list", "List all available topics.")
private val createOpt = parser.accepts("create", "Create a new topic.")
private val deleteOpt = parser.accepts("delete", "Delete a topic")
private val alterOpt = parser.accepts("alter", "Alter the number of partitions and replica assignment. " +
"Update the configuration of an existing topic via --alter is no longer supported here" + kafkaConfigsCanAlterTopicConfigsViaBootstrapServer + ".")
private val describeOpt = parser.accepts("describe", "List details for the given topics.")
private val topicOpt = parser.accepts("topic", "The topic to create, alter, describe or delete. It also accepts a regular " +
"expression, except for --create option. Put topic name in double quotes and use the '\\' prefix " +
"to escape regular expression symbols; e.g. \"test\\.topic\".")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
private val topicIdOpt = parser.accepts("topic-id", "The topic-id to describe." +
"This is used only with --bootstrap-server option for describing topics.")
.withRequiredArg
.describedAs("topic-id")
.ofType(classOf[String])
private val nl = System.getProperty("line.separator")
private val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
" The following is a list of valid configurations: " + nl + LogConfig.configNames.asScala.map("\t" + _).mkString(nl) + nl +
"See the Kafka documentation for full details on the topic configs." +
" It is supported only in combination with --create if --bootstrap-server option is used" +
kafkaConfigsCanAlterTopicConfigsViaBootstrapServer + ".")
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
private val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). " +
"Not supported with the --bootstrap-server option.")
.withRequiredArg
.describedAs("name")
.ofType(classOf[String])
private val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
private val replicationFactorOpt = parser.accepts("replication-factor", "The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.")
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
private val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created or altered.")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
"broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
.ofType(classOf[String])
private val reportUnderReplicatedPartitionsOpt = parser.accepts("under-replicated-partitions",
"if set when describing topics, only show under replicated partitions")
private val reportUnavailablePartitionsOpt = parser.accepts("unavailable-partitions",
"if set when describing topics, only show partitions whose leader is not available")
private val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is less than the configured minimum.")
private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is equal to the configured minimum.")
private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
private val ifExistsOpt = parser.accepts("if-exists",
"if set when altering or deleting or describing topics, the action will only execute if the topic exists.")
private val ifNotExistsOpt = parser.accepts("if-not-exists",
"if set when creating topics, the action will only execute if the topic does not already exist.")
private val excludeInternalTopicOpt = parser.accepts("exclude-internal",
"exclude internal topics when running list or describe command. The internal topics will be listed by default")
options = parser.parse(args : _*)
private val allTopicLevelOpts = immutable.Set[OptionSpec[_]](alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
private val allReplicationReportOpts = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
def valuesAsOption[A](option: OptionSpec[A], defaultValue: Option[util.List[A]] = None): Option[util.List[A]] = if (has(option)) Some(options.valuesOf(option)) else defaultValue
def hasCreateOption: Boolean = has(createOpt)
def hasAlterOption: Boolean = has(alterOpt)
def hasListOption: Boolean = has(listOpt)
def hasDescribeOption: Boolean = has(describeOpt)
def hasDeleteOption: Boolean = has(deleteOpt)
def bootstrapServer: Option[String] = valueAsOption(bootstrapServerOpt)
def commandConfig: Properties = if (has(commandConfigOpt)) Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties()
def topic: Option[String] = valueAsOption(topicOpt)
def topicId: Option[String] = valueAsOption(topicIdOpt)
def partitions: Option[Integer] = valueAsOption(partitionsOpt)
def replicationFactor: Option[Integer] = valueAsOption(replicationFactorOpt)
def replicaAssignment: Option[Map[Int, List[Int]]] =
if (has(replicaAssignmentOpt) && Option(options.valueOf(replicaAssignmentOpt)).getOrElse("").nonEmpty)
Some(parseReplicaAssignment(options.valueOf(replicaAssignmentOpt)))
else
None
def reportUnderReplicatedPartitions: Boolean = has(reportUnderReplicatedPartitionsOpt)
def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
def ifExists: Boolean = has(ifExistsOpt)
def ifNotExists: Boolean = has(ifNotExistsOpt)
def excludeInternalTopics: Boolean = has(excludeInternalTopicOpt)
def topicConfig: Option[util.List[String]] = valuesAsOption(configOpt)
def configsToDelete: Option[util.List[String]] = valuesAsOption(deleteConfigOpt)
def checkArgs(): Unit = {
if (args.isEmpty)
CommandLineUtils.printUsageAndExit(parser, "Create, delete, describe, or change a topic.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to create, delete, describe, or change a topic.")
// should have exactly one action
val actions = Seq(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).count(options.has)
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
// check required args
if (!has(bootstrapServerOpt))
throw new IllegalArgumentException("--bootstrap-server must be specified")
if (has(describeOpt) && has(ifExistsOpt)) {
if (!has(topicOpt) && !has(topicIdOpt))
CommandLineUtils.printUsageAndExit(parser, "--topic or --topic-id is required to describe a topic")
if (has(topicOpt) && has(topicIdOpt))
println("Only topic id will be used when both --topic and --topic-id are specified and topicId is not Uuid.ZERO_UUID")
}
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(alterOpt)) {
val usedOptions = immutable.Set[OptionSpec[_]](bootstrapServerOpt, configOpt)
val invalidOptions = immutable.Set[OptionSpec[_]](alterOpt)
CommandLineUtils.checkInvalidArgsSet(parser, options, usedOptions.asJava, invalidOptions.asJava, Optional.of(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, (allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, (allTopicLevelOpts -- Set(createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, (allTopicLevelOpts -- Set(createOpt,alterOpt)).asJava)
if (options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, (allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, (allTopicLevelOpts -- Set(createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, (allTopicLevelOpts -- Set(listOpt, describeOpt)).asJava)
}
}
}

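The --replica-assignment format handled by parseReplicaAssignment above is positional: entries are comma-separated per partition, with colon-separated broker ids per replica, so "5:4,3:2,1:0" assigns partition 0 to brokers [5, 4], partition 1 to [3, 2], and partition 2 to [1, 0] (the same value used in testCreateWithReplicaAssignment below). A standalone illustration of that parsing and its two sanity checks, not the code that shipped in the new Java TopicCommand:

// Standalone illustration of the --replica-assignment format; exception types differ
// from the real command (which uses AdminCommandFailedException/AdminOperationException).
import java.util.*;

public class ReplicaAssignmentExample {
    static Map<Integer, List<Integer>> parse(String assignment) {
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        String[] partitions = assignment.split(",");
        for (int i = 0; i < partitions.length; i++) {
            List<Integer> brokers = new ArrayList<>();
            for (String id : partitions[i].split(":")) {
                int broker = Integer.parseInt(id.trim());
                if (brokers.contains(broker)) // same replica listed twice for one partition
                    throw new IllegalArgumentException(
                        "Partition replica lists may not contain duplicate entries: " + broker);
                brokers.add(broker);
            }
            if (!result.isEmpty() && brokers.size() != result.get(0).size()) // uneven replication factor
                throw new IllegalArgumentException(
                    "Partition " + i + " has different replication factor: " + brokers);
            result.put(i, brokers);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same assignment used in testCreateWithReplicaAssignment below.
        System.out.println(parse("5:4,3:2,1:0")); // {0=[5, 4], 1=[3, 2], 2=[1, 0]}
    }
}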
core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala (831 changed lines)

@@ -1,831 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.{Collections, Optional, Properties}
import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito._
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
import scala.util.Random
class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging with RackAwareTest {
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*
* Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test
* `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
*/
override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
numConfigs = numBrokers,
zkConnect = zkConnectOrNull,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor,
).map { props =>
props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1")
KafkaConfig.fromProps(props)
}
private val numPartitions = 1
private val defaultReplicationFactor = 1.toShort
private val numBrokers = 6
private val lineSeparator = System.lineSeparator()
private var topicService: TopicService = _
private var adminClient: Admin = _
private var testTopicName: String = _
private[this] def createAndWaitTopic(opts: TopicCommandOptions): Unit = {
topicService.createTopic(opts)
waitForTopicCreated(opts.topic.get)
}
private[this] def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
TestUtils.waitForPartitionMetadata(brokers, topicName, partition = 0, timeout)
}
@BeforeEach
override def setUp(info: TestInfo): Unit = {
super.setUp(info)
// create adminClient
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
adminClient = Admin.create(props)
topicService = TopicService(adminClient)
testTopicName = s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}
@AfterEach
def close(): Unit = {
// adminClient is closed by topicService
if (topicService != null)
topicService.close()
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreate(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "1", "--topic", testTopicName)))
adminClient.listTopics().names().get().contains(testTopicName)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithDefaults(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
.get()
.get(testTopicName)
.partitions()
assertEquals(partitions.size(), numPartitions)
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithDefaultReplication(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "2")))
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
.get()
.get(testTopicName)
.partitions()
assertEquals(partitions.size(), 2)
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithDefaultPartitions(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replication-factor", "2")))
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
.get()
.get(testTopicName)
.partitions()
assertEquals(partitions.size(), numPartitions)
assertEquals(partitions.get(0).replicas().size(), 2)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithConfigs(quorum: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", "delete.retention.ms=1000")))
val configs = adminClient
.describeConfigs(Collections.singleton(configResource))
.all().get().get(configResource)
assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWhenAlreadyExists(quorum: String): Unit = {
val numPartitions = 1
// create the topic
val createOpts = new TopicCommandOptions(
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", testTopicName))
createAndWaitTopic(createOpts)
// try to re-create the topic
assertThrows(classOf[TopicExistsException], () => topicService.createTopic(createOpts))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWhenAlreadyExistsWithIfNotExists(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--if-not-exists"))
createAndWaitTopic(createOpts)
topicService.createTopic(createOpts)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithReplicaAssignment(quorum: String): Unit = {
// create the topic
val createOpts = new TopicCommandOptions(
Array("--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName))
createAndWaitTopic(createOpts)
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames()
.get()
.get(testTopicName)
.partitions()
assertEquals(3, partitions.size())
assertEquals(List(5, 4), partitions.get(0).replicas().asScala.map(_.id()))
assertEquals(List(3, 2), partitions.get(1).replicas().asScala.map(_.id()))
assertEquals(List(1, 0), partitions.get(2).replicas().asScala.map(_.id()))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithInvalidReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", (Short.MaxValue+1).toString, "--topic", testTopicName))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithNegativeReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithNegativePartitionCount(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidTopicLevelConfig(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName,
"--config", "message.timestamp.type=boom"))
assertThrows(classOf[ConfigException], () => topicService.createTopic(createOpts))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testListTopics(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array())))
assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testListTopicsWithIncludeList(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--topic", "kafka.*"))))
assertTrue(output.contains(topic1), s"Unexpected output: $output")
assertTrue(output.contains(topic2), s"Unexpected output: $output")
assertFalse(output.contains(topic3), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.listTopics(new TopicCommandOptions(Array("--exclude-internal"))))
assertTrue(output.contains(topic1), s"Unexpected output: $output")
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterPartitionCount(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
TestUtils.waitUntilTrue(
() => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
"Timeout waiting for new assignment propagating to broker")
val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignment(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")))
TestUtils.waitUntilTrue(
() => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
"Timeout waiting for new assignment propagating to broker")
val topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
assertEquals(List(4,2), topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterWithInvalidPartitionCount(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(
Array("--partitions", "-1", "--topic", testTopicName))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterWhenTopicDoesntExist(quorum: String): Unit = {
// alter a topic that does not exist without --if-exists
val alterOpts = new TopicCommandOptions(Array("--topic", testTopicName, "--partitions", "1"))
val topicService = TopicService(adminClient)
assertThrows(classOf[IllegalArgumentException], () => topicService.alterTopic(alterOpts))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testAlterWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateAlterTopicWithRackAware(quorum: String): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
val numPartitions = 18
val replicationFactor = 3
val createOpts = new TopicCommandOptions(Array(
"--partitions", numPartitions.toString,
"--replication-factor", replicationFactor.toString,
"--topic", testTopicName))
createAndWaitTopic(createOpts)
var assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions()
.asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
val alteredNumPartitions = 36
// verify that adding partitions will also be rack aware
val alterOpts = new TopicCommandOptions(Array(
"--partitions", alteredNumPartitions.toString,
"--topic", testTopicName))
topicService.alterTopic(alterOpts)
TestUtils.waitUntilTrue(
() => brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == alteredNumPartitions),
"Timeout waiting for new assignment propagating to broker")
assignment = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions()
.asScala.map(info => info.partition() -> info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testConfigPreservationAcrossPartitionAlteration(quorum: String): Unit = {
val numPartitionsOriginal = 1
val cleanupKey = "cleanup.policy"
val cleanupVal = "compact"
// create the topic
val createOpts = new TopicCommandOptions(Array(
"--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--config", cleanupKey + "=" + cleanupVal,
"--topic", testTopicName))
createAndWaitTopic(createOpts)
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName)
val props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
// val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName)
assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey)
assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value")
// pre-create the topic config changes path to avoid a NoNodeException
if (!isKRaftTest()) {
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
}
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(
Array("--partitions", numPartitionsModified.toString, "--topic", testTopicName))
topicService.alterTopic(alterOpts)
val newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey)
assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTopicDeletion(quorum: String): Unit = {
// create the NormalTopic
val createOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
"--topic", testTopicName))
createAndWaitTopic(createOpts)
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
if (!isKRaftTest()) {
val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
}
topicService.deleteTopic(deleteOpts)
TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTopicWithCollidingCharDeletionAndCreateAgain(quorum: String): Unit = {
// create the topic with colliding chars
val topicWithCollidingChar = "test.a"
val createOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
"--topic", topicWithCollidingChar))
createAndWaitTopic(createOpts)
// delete the topic
val deleteOpts = new TopicCommandOptions(Array("--topic", topicWithCollidingChar))
if (!isKRaftTest()) {
val deletePath = DeleteTopicsTopicZNode.path(topicWithCollidingChar)
assertFalse(zkClient.pathExists(deletePath), "Delete path for topic shouldn't exist before deletion.")
}
topicService.deleteTopic(deleteOpts)
TestUtils.verifyTopicDeletion(zkClientOrNull, topicWithCollidingChar, 1, brokers)
val createTopic: Executable = () => createAndWaitTopic(createOpts)
assertDoesNotThrow(createTopic)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteInternalTopic(quorum: String): Unit = {
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
"--topic", Topic.GROUP_METADATA_TOPIC_NAME))
createAndWaitTopic(createOffsetTopicOpts)
// Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed by default.
// This is a difference between the new and the old command as the old one didn't allow internal topic deletion.
// If deleting internal topics is not desired, ACLS should be used to control it.
val deleteOffsetTopicOpts = new TopicCommandOptions(
Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
if (!isKRaftTest()) {
assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion.")
}
topicService.deleteTopic(deleteOffsetTopicOpts)
TestUtils.verifyTopicDeletion(zkClientOrNull, Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteWhenTopicDoesntExist(quorum: String): Unit = {
// delete a topic that does not exist
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
assertThrows(classOf[IllegalArgumentException], () => topicService.deleteTopic(deleteOpts))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDeleteWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.deleteTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribe(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
val rows = output.split(lineSeparator)
assertEquals(3, rows.size, s"Unexpected output: $output")
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected output: ${rows(0)}")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeWhenTopicDoesntExist(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--if-exists")))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnavailablePartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, numBrokers, 1)
try {
// check which partition is on broker 0 which we'll kill
val testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().asScala(testTopicName)
val partitionOnBroker0 = testTopicDescription.partitions().asScala.find(_.leader().id() == 0).get.partition()
killBroker(0)
// wait until the topic metadata for the test topic is propagated to each alive broker
TestUtils.waitUntilTrue(() => {
brokers
.filterNot(_.config.brokerId == 0)
.foldLeft(true) {
(result, server) => {
val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache
.getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val testPartitionMetadata = topicMetadatas.find(_.name.equals(testTopicName)).get.partitions.asScala.find(_.partitionIndex == partitionOnBroker0)
testPartitionMetadata match {
case None => throw new AssertionError(s"Partition metadata is not found in metadata cache")
case Some(metadata) => result && metadata.errorCode == Errors.LEADER_NOT_AVAILABLE.code
}
}
}
}, s"Partition metadata for $testTopicName is not propagated")
// grab the console output and assert
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--unavailable-partitions"))))
val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected output: ${rows(0)}")
assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"), s"Unexpected output: ${rows(0)}")
} finally {
restartDeadBrokers()
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers)
try {
killBroker(0)
if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
} else {
TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected output: ${rows(0)}")
} finally {
restartDeadBrokers()
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
val topicProps = new Properties()
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)
// create topic
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
try {
killBroker(0)
if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
} else {
TestUtils.waitUntilTrue(
() => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5),
s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected output: ${rows(0)}")
} finally {
restartDeadBrokers()
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: String): Unit = {
val tp = new TopicPartition(testTopicName, 0)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
// Produce multiple batches.
TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 10, acks = -1)
    // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` to minimize replication
// throughput so the reassignment doesn't complete quickly.
val brokerIds = brokers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
val firstPartition = testTopicDesc.partitions().asScala.head
val replicasOfFirstPartition = firstPartition.replicas().asScala.map(_.id())
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head
adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get()
    // let's wait until the LeaderAndIsr (LAIR) update for the reassignment is propagated
TestUtils.waitUntilTrue(() => {
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
!reassignments.get(tp).addingReplicas().isEmpty
}, "Reassignment didn't add the second node")
// describe the topic and test if it's under-replicated
val simpleDescribeOutput = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
val simpleDescribeOutputRows = simpleDescribeOutput.split(lineSeparator)
assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected output: ${simpleDescribeOutputRows(0)}")
assertEquals(2, simpleDescribeOutputRows.size, s"Unexpected output: $simpleDescribeOutput")
val underReplicatedOutput = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
assertEquals("", underReplicatedOutput, s"--under-replicated-partitions shouldn't return anything: '$underReplicatedOutput'")
// Verify reassignment is still ongoing.
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp)
assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty))
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
val topicProps = new Properties()
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
// create topic
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, numBrokers, topicConfig = topicProps)
try {
killBroker(0)
killBroker(1)
if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
} else {
TestUtils.waitUntilTrue(
() => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 4),
s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected output: ${rows(0)}")
assertEquals(1, rows.length, s"Unexpected output: $output")
} finally {
restartDeadBrokers()
}
}
/**
* Test describe --under-min-isr-partitions option with four topics:
* (1) topic with partition under the configured min ISR count
* (2) topic with under-replicated partition (but not under min ISR count)
* (3) topic with offline partition
* (4) topic with fully replicated partition
*
* Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeUnderMinIsrPartitionsMixed(quorum: String): Unit = {
val underMinIsrTopic = "under-min-isr-topic"
val notUnderMinIsrTopic = "not-under-min-isr-topic"
val offlineTopic = "offline-topic"
val fullyReplicatedTopic = "fully-replicated-topic"
val topicProps = new Properties()
topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numBrokers.toString)
// create topic
TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1, numBrokers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, 1, numBrokers)
TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(0)))
TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, 1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
try {
killBroker(0)
if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
} else {
TestUtils.waitUntilTrue(
() => aliveBrokers.forall(
broker =>
broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < numBrokers &&
broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID),
"Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic"
)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split(lineSeparator)
assertTrue(rows(0).startsWith(s"\tTopic: $underMinIsrTopic"), s"Unexpected output: ${rows(0)}")
assertTrue(rows(1).startsWith(s"\tTopic: $offlineTopic"), s"Unexpected output: ${rows(1)}")
assertEquals(2, rows.length, s"Unexpected output: $output")
} finally {
restartDeadBrokers()
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeReportOverriddenConfigs(quorum: String): Unit = {
val config = "file.delete.delay.ms=1000"
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", config)))
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array())))
assertTrue(output.contains(config), s"Describe output should have contained $config")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeAndListTopicsWithoutInternalTopics(quorum: String): Unit = {
createAndWaitTopic(
new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
    // create an internal topic
createAndWaitTopic(
new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", Topic.GROUP_METADATA_TOPIC_NAME)))
// test describe
var output = TestUtils.grabConsoleOutput(topicService.describeTopic(new TopicCommandOptions(
Array("--describe", "--exclude-internal"))))
assertTrue(output.contains(testTopicName), s"Output should have contained $testTopicName")
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected output: $output")
// test list
output = TestUtils.grabConsoleOutput(topicService.listTopics(new TopicCommandOptions(Array("--list", "--exclude-internal"))))
assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected output: $output")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(quorum: String): Unit = {
adminClient = spy(adminClient)
topicService = TopicService(adminClient)
val result = AdminClientTestUtils.listPartitionReassignmentsResult(
new ClusterAuthorizationException("Unauthorized"))
// Passing `null` here to help the compiler disambiguate the `doReturn` methods,
// compilation for scala 2.12 fails otherwise.
doReturn(result, null).when(adminClient).listPartitionReassignments(
Set(new TopicPartition(testTopicName, 0)).asJava
)
TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
val rows = output.split(lineSeparator)
assertEquals(2, rows.size, s"Unexpected output: $output")
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected output: ${rows(0)}")
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCreateWithTopicNameCollision(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1, numBrokers)
assertThrows(classOf[InvalidTopicException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic", "foo.bar"))))
}
}
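The collision test at the end works because periods and underscores map to the same metric name, so a broker that already hosts foo_bar rejects foo.bar. A minimal sketch of hitting the same check directly through the Admin client (the broker address is a placeholder):
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.InvalidTopicException;
public class TopicNameCollisionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic("foo_bar", 1, (short) 1))).all().get();
            try {
                // "foo.bar" normalises to the same metric name as "foo_bar", so the broker rejects it.
                admin.createTopics(Collections.singleton(new NewTopic("foo.bar", 1, (short) 1))).all().get();
            } catch (ExecutionException e) {
                // The collision surfaces as an InvalidTopicException, as the test asserts.
                System.out.println(e.getCause() instanceof InvalidTopicException);
            }
        }
    }
}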

246
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

@@ -1,246 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, TopicService}
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartitionInfo
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.{any, argThat, eq => eqThat}
import org.mockito.Mockito.{mock, times, verify, when}
import java.util.{Collection, Collections, Optional}
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
class TopicCommandTest {
private[this] val brokerList = "localhost:9092"
private[this] val topicName = "topicName"
@Test
def testIsNotUnderReplicatedWhenAdding(): Unit = {
val replicaIds = List(1, 2)
val replicas = replicaIds.map { id =>
new Node(id, "localhost", 9090 + id)
}
val partitionDescription = PartitionDescription(
"test-topic",
new TopicPartitionInfo(
0,
new Node(1, "localhost", 9091),
replicas.asJava,
List(new Node(1, "localhost", 9091)).asJava
),
None,
markedForDeletion = false,
Some(
new PartitionReassignment(
replicaIds.map(id => id: java.lang.Integer).asJava,
List(2: java.lang.Integer).asJava,
List.empty.asJava
)
)
)
assertFalse(partitionDescription.isUnderReplicated)
}
@Test
def testAlterWithUnspecifiedPartitionCount(): Unit = {
assertCheckArgsExitCode(1, new TopicCommandOptions(
Array("--bootstrap-server", brokerList ,"--alter", "--topic", topicName)))
}
@Test
def testConfigOptWithBootstrapServers(): Unit = {
assertCheckArgsExitCode(1,
new TopicCommandOptions(Array("--bootstrap-server", brokerList ,"--alter", "--topic", topicName, "--partitions", "3", "--config", "cleanup.policy=compact")))
assertCheckArgsExitCode(1,
new TopicCommandOptions(Array("--bootstrap-server", brokerList ,"--alter", "--topic", topicName, "--partitions", "3", "--delete-config", "cleanup.policy")))
val opts =
new TopicCommandOptions(Array("--bootstrap-server", brokerList ,"--create", "--topic", topicName, "--partitions", "3", "--replication-factor", "3", "--config", "cleanup.policy=compact"))
opts.checkArgs()
assertTrue(opts.hasCreateOption)
assertEquals(brokerList, opts.bootstrapServer.get)
assertEquals("cleanup.policy=compact", opts.topicConfig.get.get(0))
}
@Test
def testCreateWithPartitionCountWithoutReplicationFactorShouldSucceed(): Unit = {
val opts = new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--create",
"--partitions", "2",
"--topic", topicName))
opts.checkArgs()
}
@Test
def testCreateWithReplicationFactorWithoutPartitionCountShouldSucceed(): Unit = {
val opts = new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--create",
"--replication-factor", "3",
"--topic", topicName))
opts.checkArgs()
}
@Test
def testCreateWithAssignmentAndPartitionCount(): Unit = {
assertCheckArgsExitCode(1,
new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--create",
"--replica-assignment", "3:0,5:1",
"--partitions", "2",
"--topic", topicName)))
}
@Test
def testCreateWithAssignmentAndReplicationFactor(): Unit = {
assertCheckArgsExitCode(1,
new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--create",
"--replica-assignment", "3:0,5:1",
"--replication-factor", "2",
"--topic", topicName)))
}
@Test
def testCreateWithoutPartitionCountAndReplicationFactorShouldSucceed(): Unit = {
val opts = new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--create",
"--topic", topicName))
opts.checkArgs()
}
@Test
def testDescribeShouldSucceed(): Unit = {
val opts = new TopicCommandOptions(
Array("--bootstrap-server", brokerList,
"--describe",
"--topic", topicName))
opts.checkArgs()
}
@Test
def testParseAssignmentDuplicateEntries(): Unit = {
assertThrows(classOf[AdminCommandFailedException], () => TopicCommand.parseReplicaAssignment("5:5"))
}
@Test
def testParseAssignmentPartitionsOfDifferentSize(): Unit = {
assertThrows(classOf[AdminOperationException], () => TopicCommand.parseReplicaAssignment("5:4:3,2:1"))
}
@Test
def testParseAssignment(): Unit = {
val actualAssignment = TopicCommand.parseReplicaAssignment("5:4,3:2,1:0")
val expectedAssignment = Map(0 -> List(5, 4), 1 -> List(3, 2), 2 -> List(1, 0))
assertEquals(expectedAssignment, actualAssignment)
}
@Test
def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
val result = AdminClientTestUtils.createTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
when(adminClient.createTopics(any(), any())).thenReturn(result)
assertThrows(classOf[ThrottlingQuotaExceededException],
() => topicService.createTopic(new TopicCommandOptions(Array("--topic", topicName))))
val expectedNewTopic = new NewTopic(topicName, Optional.empty[Integer](), Optional.empty[java.lang.Short]())
.configs(Map.empty[String, String].asJava)
verify(adminClient, times(1)).createTopics(
eqThat(Set(expectedNewTopic).asJava),
argThat((_.shouldRetryOnQuotaViolation() == false): ArgumentMatcher[CreateTopicsOptions])
)
}
@Test
def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
val listResult = AdminClientTestUtils.listTopicsResult(topicName)
when(adminClient.listTopics(any())).thenReturn(listResult)
val result = AdminClientTestUtils.deleteTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
when(adminClient.deleteTopics(any[Collection[String]](), any())).thenReturn(result)
val exception = assertThrows(classOf[ExecutionException],
() => topicService.deleteTopic(new TopicCommandOptions(Array("--topic", topicName))))
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
verify(adminClient).deleteTopics(
argThat((topics: java.util.Collection[String]) => topics.asScala.toBuffer.equals(Seq(topicName))),
argThat((options: DeleteTopicsOptions) => !options.shouldRetryOnQuotaViolation)
)
}
@Test
def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
val listResult = AdminClientTestUtils.listTopicsResult(topicName)
when(adminClient.listTopics(any())).thenReturn(listResult)
val topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
Collections.emptyList(), Collections.emptyList())
val describeResult = AdminClientTestUtils.describeTopicsResult(topicName, new TopicDescription(
topicName, false, Collections.singletonList(topicPartitionInfo)))
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
val result = AdminClientTestUtils.createPartitionsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
when(adminClient.createPartitions(any(), any())).thenReturn(result)
val exception = assertThrows(classOf[ExecutionException],
() => topicService.alterTopic(new TopicCommandOptions(Array("--topic", topicName, "--partitions", "3"))))
assertTrue(exception.getCause.isInstanceOf[ThrottlingQuotaExceededException])
verify(adminClient, times(1)).createPartitions(
argThat((_.get(topicName).totalCount() == 3): ArgumentMatcher[java.util.Map[String, NewPartitions]]),
argThat((_.shouldRetryOnQuotaViolation() == false): ArgumentMatcher[CreatePartitionsOptions])
)
}
private[this] def assertCheckArgsExitCode(expected: Int, options: TopicCommandOptions): Unit = {
Exit.setExitProcedure {
(exitCode: Int, _: Option[String]) =>
assertEquals(expected, exitCode)
throw new RuntimeException
}
try assertThrows(classOf[RuntimeException], () => options.checkArgs()) finally Exit.resetExitProcedure()
}
}

17
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java

@@ -22,8 +22,10 @@ import org.apache.kafka.common.utils.Utils;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -123,4 +125,19 @@ public class ToolsUtils {
throw new IllegalArgumentException("Please provide valid host:port like host1:9091,host2:9092\n");
}
}
/**
* Return all duplicates in a list. A duplicated element will appear only once.
*/
public static <T> Set<T> duplicates(List<T> s) {
Set<T> set = new HashSet<>();
Set<T> duplicates = new HashSet<>();
s.forEach(element -> {
if (!set.add(element)) {
duplicates.add(element);
}
});
return duplicates;
}
}
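The new helper reports each repeated element exactly once, no matter how often it recurs; a quick usage sketch:
import java.util.Arrays;
import java.util.Set;
import org.apache.kafka.tools.ToolsUtils;
public class DuplicatesSketch {
    public static void main(String[] args) {
        // 2 and 3 are repeated, 1 is not; each duplicate appears once in the result.
        Set<Integer> dups = ToolsUtils.duplicates(Arrays.asList(1, 2, 2, 3, 3, 3));
        System.out.println(dups); // e.g. [2, 3]
    }
}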

1003
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java

File diff suppressed because it is too large

143
tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java

@@ -17,15 +17,35 @@
package org.apache.kafka.tools;
import kafka.utils.TestInfoUtils;
import kafka.server.DynamicConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.storage.internals.log.LogConfig;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class ToolsTestUtils {
/** @see TestInfoUtils#TestWithParameterizedQuorumName() */
public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.quorum={0}";
private static int randomPort = 0;
public static String captureStandardOut(Runnable runnable) {
return captureStandardStream(false, runnable);
}
@@ -55,6 +75,129 @@ public class ToolsTestUtils {
}
}
public static List<Properties> createBrokerProperties(int numConfigs, String zkConnect,
Map<Integer, String> rackInfo,
int numPartitions,
short defaultReplicationFactor) {
return createBrokerProperties(numConfigs, zkConnect, rackInfo, 1, false, numPartitions,
defaultReplicationFactor, 0);
}
/**
* Create a test config for the provided parameters.
*
* Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled.
*/
public static List<Properties> createBrokerProperties(int numConfigs, String zkConnect,
Map<Integer, String> rackInfo, int logDirCount,
boolean enableToken, int numPartitions, short defaultReplicationFactor,
int startingIdNumber) {
List<Properties> result = new ArrayList<>();
int endingIdNumber = startingIdNumber + numConfigs - 1;
for (int node = startingIdNumber; node <= endingIdNumber; node++) {
result.add(TestUtils.createBrokerConfig(node, zkConnect, true, true, randomPort,
scala.Option.empty(),
scala.Option.empty(),
scala.Option.empty(),
true, false, randomPort, false, randomPort, false, randomPort,
scala.Option.apply(rackInfo.get(node)),
logDirCount, enableToken, numPartitions, defaultReplicationFactor, false));
}
return result;
}
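For reference, the shorter overload above can be called as follows from a ZK-mode test; the connect string and counts here are placeholders, and the sketch assumes it sits in the tools test sources next to ToolsTestUtils:
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class BrokerPropertiesSketch {
    public static void main(String[] args) {
        // Three brokers, no rack info, one partition and replication factor 1 by default.
        List<Properties> brokerConfigs = ToolsTestUtils.createBrokerProperties(
            3, "localhost:2181", Collections.<Integer, String>emptyMap(), 1, (short) 1);
        brokerConfigs.forEach(System.out::println);
    }
}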
/**
* Set broker replication quotas and enable throttling for a set of partitions. This
* will override any previous replication quotas, but will leave the throttling status
* of other partitions unaffected.
*/
public static void setReplicationThrottleForPartitions(Admin admin,
List<Integer> brokerIds,
Set<TopicPartition> partitions,
int throttleBytes) throws ExecutionException, InterruptedException {
throttleAllBrokersReplication(admin, brokerIds, throttleBytes);
assignThrottledPartitionReplicas(admin, partitions.stream().collect(Collectors.toMap(p -> p, p -> brokerIds)));
}
/**
* Throttles all replication across the cluster.
* @param adminClient is the adminClient to use for making connection with the cluster
* @param brokerIds all broker ids in the cluster
* @param throttleBytes is the target throttle
*/
public static void throttleAllBrokersReplication(Admin adminClient, List<Integer> brokerIds, int throttleBytes) throws ExecutionException, InterruptedException {
List<AlterConfigOp> throttleConfigs = new ArrayList<>();
throttleConfigs.add(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker$.MODULE$.LeaderReplicationThrottledRateProp(),
Integer.toString(throttleBytes)), AlterConfigOp.OpType.SET));
throttleConfigs.add(new AlterConfigOp(new ConfigEntry(DynamicConfig.Broker$.MODULE$.FollowerReplicationThrottledRateProp(),
Integer.toString(throttleBytes)), AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
for (int brokerId : brokerIds) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId));
configs.put(configResource, throttleConfigs);
}
adminClient.incrementalAlterConfigs(configs).all().get();
}
/**
* Remove a set of throttled partitions and reset the overall replication quota.
*/
public static void removeReplicationThrottleForPartitions(Admin admin, List<Integer> brokerIds, Set<TopicPartition> partitions) throws ExecutionException, InterruptedException {
removePartitionReplicaThrottles(admin, partitions);
resetBrokersThrottle(admin, brokerIds);
}
public static void assignThrottledPartitionReplicas(Admin adminClient, Map<TopicPartition, List<Integer>> allReplicasByPartition) throws InterruptedException, ExecutionException {
Map<ConfigResource, List<Map.Entry<TopicPartition, List<Integer>>>> configResourceToPartitionReplicas =
allReplicasByPartition.entrySet().stream()
.collect(Collectors.groupingBy(
topicPartitionListEntry -> new ConfigResource(ConfigResource.Type.TOPIC, topicPartitionListEntry.getKey().topic()))
);
Map<ConfigResource, List<AlterConfigOp>> throttles = configResourceToPartitionReplicas.entrySet().stream()
.collect(
Collectors.toMap(Map.Entry::getKey, entry -> {
List<AlterConfigOp> alterConfigOps = new ArrayList<>();
Map<TopicPartition, List<Integer>> replicaThrottle =
entry.getValue().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
alterConfigOps.add(new AlterConfigOp(
new ConfigEntry(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, formatReplicaThrottles(replicaThrottle)),
AlterConfigOp.OpType.SET));
alterConfigOps.add(new AlterConfigOp(
new ConfigEntry(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, formatReplicaThrottles(replicaThrottle)),
AlterConfigOp.OpType.SET));
return alterConfigOps;
}
));
adminClient.incrementalAlterConfigs(new HashMap<>(throttles)).all().get();
}
public static void resetBrokersThrottle(Admin adminClient, List<Integer> brokerIds) throws ExecutionException, InterruptedException {
throttleAllBrokersReplication(adminClient, brokerIds, Integer.MAX_VALUE);
}
public static void removePartitionReplicaThrottles(Admin adminClient, Set<TopicPartition> partitions) throws ExecutionException, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> throttles = partitions.stream().collect(Collectors.toMap(
tp -> new ConfigResource(ConfigResource.Type.TOPIC, tp.topic()),
tp -> Arrays.asList(
new AlterConfigOp(new ConfigEntry(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, ""),
AlterConfigOp.OpType.DELETE),
new AlterConfigOp(new ConfigEntry(LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, ""),
AlterConfigOp.OpType.DELETE))
));
adminClient.incrementalAlterConfigs(throttles).all().get();
}
public static String formatReplicaThrottles(Map<TopicPartition, List<Integer>> moves) {
return moves.entrySet().stream()
.flatMap(entry -> entry.getValue().stream().map(replicaId -> entry.getKey().partition() + ":" + replicaId))
.collect(Collectors.joining(","));
}
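Taken together, the throttle helpers are meant to be used in pairs around a scenario; a minimal sketch, assuming a broker at localhost:9092 and a topic named test with replicas on brokers 0-2 (all placeholders), placed next to ToolsTestUtils in the tools test sources:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;
public class ThrottleSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            List<Integer> brokerIds = Arrays.asList(0, 1, 2);
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("test", 0));
            // Cap leader/follower replication at 1 byte/s and mark the partition's replicas
            // as throttled (formatReplicaThrottles renders them as "0:0,0:1,0:2").
            ToolsTestUtils.setReplicationThrottleForPartitions(admin, brokerIds, partitions, 1);
            // ... exercise the slow-reassignment scenario here ...
            ToolsTestUtils.removeReplicationThrottleForPartitions(admin, brokerIds, partitions);
        }
    }
}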
public static class MockExitProcedure implements Exit.Procedure {
private boolean hasExited = false;
private int statusCode;

1065
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java

File diff suppressed because it is too large

283
tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java

@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import kafka.utils.Exit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
@Timeout(value = 60)
public class TopicCommandTest {
private String bootstrapServer = "localhost:9092";
private String topicName = "topicName";
@Test
public void testIsNotUnderReplicatedWhenAdding() {
List<Integer> replicaIds = Arrays.asList(1, 2);
List<Node> replicas = new ArrayList<>();
for (int id : replicaIds) {
replicas.add(new Node(id, "localhost", 9090 + id));
}
TopicCommand.PartitionDescription partitionDescription = new TopicCommand.PartitionDescription("test-topic",
new TopicPartitionInfo(0, new Node(1, "localhost", 9091), replicas,
Collections.singletonList(new Node(1, "localhost", 9091))),
null, false,
new PartitionReassignment(replicaIds, Arrays.asList(2), Collections.emptyList())
);
assertFalse(partitionDescription.isUnderReplicated());
}
@Test
public void testAlterWithUnspecifiedPartitionCount() {
String[] options = new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--topic", topicName};
assertInitializeInvalidOptionsExitCode(1, options);
}
@Test
public void testConfigOptWithBootstrapServers() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--topic", topicName,
"--partitions", "3", "--config", "cleanup.policy=compact"});
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--topic", topicName,
"--partitions", "3", "--delete-config", "cleanup.policy"});
TopicCommand.TopicCommandOptions opts =
new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--create", "--topic", topicName, "--partitions", "3",
"--replication-factor", "3", "--config", "cleanup.policy=compact"});
assertTrue(opts.hasCreateOption());
assertEquals(bootstrapServer, opts.bootstrapServer().get());
assertEquals("cleanup.policy=compact", opts.topicConfig().get().get(0));
}
@Test
public void testCreateWithPartitionCountWithoutReplicationFactorShouldSucceed() {
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer,
"--create",
"--partitions", "2",
"--topic", topicName});
assertTrue(opts.hasCreateOption());
assertEquals(topicName, opts.topic().get());
assertEquals(2, opts.partitions().get());
}
@Test
public void testCreateWithReplicationFactorWithoutPartitionCountShouldSucceed() {
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer,
"--create",
"--replication-factor", "3",
"--topic", topicName});
assertTrue(opts.hasCreateOption());
assertEquals(topicName, opts.topic().get());
assertEquals(3, opts.replicationFactor().get());
}
@Test
public void testCreateWithAssignmentAndPartitionCount() {
assertInitializeInvalidOptionsExitCode(1,
new String[]{"--bootstrap-server", bootstrapServer,
"--create",
"--replica-assignment", "3:0,5:1",
"--partitions", "2",
"--topic", topicName});
}
@Test
public void testCreateWithAssignmentAndReplicationFactor() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer,
"--create",
"--replica-assignment", "3:0,5:1",
"--replication-factor", "2",
"--topic", topicName});
}
@Test
public void testCreateWithoutPartitionCountAndReplicationFactorShouldSucceed() {
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer,
"--create",
"--topic", topicName});
assertTrue(opts.hasCreateOption());
assertEquals(topicName, opts.topic().get());
assertFalse(opts.partitions().isPresent());
}
@Test
public void testDescribeShouldSucceed() {
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer,
"--describe",
"--topic", topicName});
assertTrue(opts.hasDescribeOption());
assertEquals(topicName, opts.topic().get());
}
@Test
public void testParseAssignmentDuplicateEntries() {
assertThrows(AdminCommandFailedException.class, () -> TopicCommand.parseReplicaAssignment("5:5"));
}
@Test
public void testParseAssignmentPartitionsOfDifferentSize() {
assertThrows(AdminOperationException.class, () -> TopicCommand.parseReplicaAssignment("5:4:3,2:1"));
}
@Test
public void testParseAssignment() {
Map<Integer, List<Integer>> actualAssignment = TopicCommand.parseReplicaAssignment("5:4,3:2,1:0");
Map<Integer, List<Integer>> expectedAssignment = new HashMap<>();
expectedAssignment.put(0, Arrays.asList(5, 4));
expectedAssignment.put(1, Arrays.asList(3, 2));
expectedAssignment.put(2, Arrays.asList(1, 0));
assertEquals(expectedAssignment, actualAssignment);
}
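To spell out the assignment syntax the test exercises: each comma-separated entry is one partition (numbered from 0, in order) and each colon-separated id inside it is a replica broker. A small sketch, placed in the org.apache.kafka.tools package since the unit test calls the method the same way:
import java.util.List;
import java.util.Map;
public class ParseAssignmentSketch {
    public static void main(String[] args) {
        // "5:4,3:2,1:0" => partition 0 on brokers 5,4; partition 1 on 3,2; partition 2 on 1,0.
        Map<Integer, List<Integer>> assignment = TopicCommand.parseReplicaAssignment("5:4,3:2,1:0");
        System.out.println(assignment); // {0=[5, 4], 1=[3, 2], 2=[1, 0]}
    }
}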
@Test
public void testCreateTopicDoesNotRetryThrottlingQuotaExceededException() {
Admin adminClient = mock(Admin.class);
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
CreateTopicsResult result = AdminClientTestUtils.createTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception());
when(adminClient.createTopics(any(), any())).thenReturn(result);
assertThrows(ThrottlingQuotaExceededException.class,
() -> topicService.createTopic(new TopicCommand.TopicCommandOptions(new String[]{
"--bootstrap-server", bootstrapServer,
"--create", "--topic", topicName
})));
NewTopic expectedNewTopic = new NewTopic(topicName, Optional.empty(), Optional.empty())
.configs(Collections.emptyMap());
verify(adminClient, times(1)).createTopics(
eq(new HashSet<>(Arrays.asList(expectedNewTopic))),
argThat(exception -> !exception.shouldRetryOnQuotaViolation())
);
}
@Test
public void testDeleteTopicDoesNotRetryThrottlingQuotaExceededException() {
Admin adminClient = mock(Admin.class);
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
ListTopicsResult listResult = AdminClientTestUtils.listTopicsResult(topicName);
when(adminClient.listTopics(any())).thenReturn(listResult);
DeleteTopicsResult result = AdminClientTestUtils.deleteTopicsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception());
when(adminClient.deleteTopics(anyCollection(), any())).thenReturn(result);
ExecutionException exception = assertThrows(ExecutionException.class,
() -> topicService.deleteTopic(new TopicCommand.TopicCommandOptions(new String[]{
"--bootstrap-server", bootstrapServer,
"--delete", "--topic", topicName
})));
assertTrue(exception.getCause() instanceof ThrottlingQuotaExceededException);
verify(adminClient).deleteTopics(
argThat((Collection<String> topics) -> topics.equals(Arrays.asList(topicName))),
argThat((DeleteTopicsOptions options) -> !options.shouldRetryOnQuotaViolation()));
}
@Test
public void testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException() {
Admin adminClient = mock(Admin.class);
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
ListTopicsResult listResult = AdminClientTestUtils.listTopicsResult(topicName);
when(adminClient.listTopics(any())).thenReturn(listResult);
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, new Node(0, "", 0),
Collections.emptyList(), Collections.emptyList());
DescribeTopicsResult describeResult = AdminClientTestUtils.describeTopicsResult(topicName,
new TopicDescription(topicName, false, Collections.singletonList(topicPartitionInfo)));
when(adminClient.describeTopics(anyCollection())).thenReturn(describeResult);
CreatePartitionsResult result = AdminClientTestUtils.createPartitionsResult(topicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception());
when(adminClient.createPartitions(any(), any())).thenReturn(result);
Exception exception = assertThrows(ExecutionException.class,
() -> topicService.alterTopic(new TopicCommand.TopicCommandOptions(new String[]{
"--alter", "--topic", topicName, "--partitions", "3",
"--bootstrap-server", bootstrapServer
})));
assertTrue(exception.getCause() instanceof ThrottlingQuotaExceededException);
verify(adminClient, times(1)).createPartitions(
argThat(newPartitions -> newPartitions.get(topicName).totalCount() == 3),
argThat(createPartitionOption -> !createPartitionOption.shouldRetryOnQuotaViolation()));
}
public void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(expected, exitCode);
throw new RuntimeException();
});
try {
assertThrows(RuntimeException.class, () -> new TopicCommand.TopicCommandOptions(options));
} finally {
Exit.resetExitProcedure();
}
}
}