Browse Source

KAFKA-14581: Moving GetOffsetShell to tools (#13562)

This PR moves GetOffsetShell from core module to tools module with rewriting from Scala to Java.

Reviewers: Federico Valeri fedevaleri@gmail.com, Ziming Deng dengziming1993@gmail.com, Mickael Maison mimaison@apache.org.
pull/14272/head
Ruslan Krivoshein 1 year ago committed by GitHub
parent
commit
b72d92919f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-get-offsets.sh
  2. 2
      bin/windows/kafka-get-offsets.bat
  3. 283
      core/src/main/scala/kafka/tools/GetOffsetShell.scala
  4. 264
      core/src/test/scala/kafka/tools/GetOffsetShellParsingTest.scala
  5. 278
      core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
  6. 3
      tests/kafkatest/services/kafka/kafka.py
  7. 396
      tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
  8. 28
      tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
  9. 24
      tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
  10. 248
      tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
  11. 376
      tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java

2
bin/kafka-get-offsets.sh

@ -14,4 +14,4 @@ @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.GetOffsetShell "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.GetOffsetShell "$@"

2
bin/windows/kafka-get-offsets.bat

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.tools.GetOffsetShell %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GetOffsetShell %*

283
core/src/main/scala/kafka/tools/GetOffsetShell.scala

@ -1,283 +0,0 @@ @@ -1,283 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package kafka.tools
import joptsimple._
import kafka.utils.{Exit, ToolsUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.server.util.TopicFilter.IncludeList
import org.apache.kafka.server.util.TopicPartitionFilter
import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter
import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter
import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter
import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter
import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter
import java.util.Properties
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import scala.collection.Seq
import scala.jdk.CollectionConverters._
import scala.math.Ordering.Implicits.infixOrderingOps
object GetOffsetShell {
private val TopicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?")
def main(args: Array[String]): Unit = {
try {
fetchOffsets(args)
} catch {
case e: Exception =>
println(s"Error occurred: ${e.getMessage}")
Exit.exit(1, Some(e.getMessage))
}
}
private[tools] def fetchOffsets(args: Array[String]): Unit = {
val parser = new OptionParser(false)
val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.requiredUnless("broker-list")
.withRequiredArg
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(classOf[String])
val topicPartitionsOpt = parser.accepts("topic-partitions", s"Comma separated list of topic-partition patterns to get the offsets for, with the format of '$TopicPartitionPattern'." +
" The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
" The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
.withRequiredArg
.describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg
.describedAs("partition ids")
.ofType(classOf[String])
val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
.withRequiredArg
.describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
.ofType(classOf[String])
.defaultsTo("latest")
val commandConfigOpt = parser.accepts("command-config", s"Property file containing configs to be passed to Admin Client.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.")
if (args.isEmpty)
CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.")
val options = parser.parse(args : _*)
val effectiveBrokerListOpt = if (options.has(bootstrapServerOpt))
bootstrapServerOpt
else
brokerListOpt
CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt)
val clientId = "GetOffsetShell"
val brokerList = options.valueOf(effectiveBrokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerList)
val excludeInternalTopics = options.has(excludeInternalTopicsOpt)
if (options.has(topicPartitionsOpt) && (options.has(topicOpt) || options.has(partitionsOpt))) {
throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions")
}
val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
} else {
createTopicPartitionFilterWithTopicAndPartitionPattern(
if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
options.valueOf(partitionsOpt)
)
}
val config = if (options.has(commandConfigOpt))
Utils.loadProps(options.valueOf(commandConfigOpt))
else
new Properties
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
val adminClient = Admin.create(config)
try {
val partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics)
if (partitionInfos.isEmpty) {
throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters")
}
val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava
val listOffsetsResult = adminClient.listOffsets(timestampsToSearch)
val partitionOffsets = partitionInfos.flatMap { tp =>
try {
val partitionInfo = listOffsetsResult.partitionResult(tp).get
if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
Some((tp, partitionInfo.offset))
} else {
None
}
} catch {
case e: ExecutionException =>
e.getCause match {
case cause: KafkaException =>
System.err.println(s"Skip getting offsets for topic-partition ${tp.topic}:${tp.partition} due to error: ${cause.getMessage}")
case _ =>
throw e
}
None
}
}
partitionOffsets.sortWith((tp1, tp2) => compareTopicPartitions(tp1._1, tp2._1)).foreach {
case (tp, offset) => println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}")
}
} finally {
adminClient.close()
}
}
private def parseOffsetSpec(listOffsetsTimestamp: String): OffsetSpec = {
listOffsetsTimestamp match {
case "earliest" => OffsetSpec.earliest()
case "latest" => OffsetSpec.latest()
case "max-timestamp" => OffsetSpec.maxTimestamp()
case _ =>
try {
listOffsetsTimestamp.toLong match {
case ListOffsetsRequest.EARLIEST_TIMESTAMP => OffsetSpec.earliest()
case ListOffsetsRequest.LATEST_TIMESTAMP => OffsetSpec.latest()
case ListOffsetsRequest.MAX_TIMESTAMP => OffsetSpec.maxTimestamp()
case value => OffsetSpec.forTimestamp(value)
}
} catch {
case e: NumberFormatException =>
throw new IllegalArgumentException(s"Malformed time argument $listOffsetsTimestamp, please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp", e)
}
}
}
def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = {
(a.topic(), a.partition()) < (b.topic(), b.partition())
}
/**
* Creates a topic-partition filter based on a list of patterns.
* Expected format:
* List: TopicPartitionPattern(, TopicPartitionPattern)*
* TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
* TopicPattern: REGEX
* PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
def createTopicPartitionFilterWithPatternList(
topicPartitions: String
): TopicPartitionFilter = {
val ruleSpecs = topicPartitions.split(",")
val rules = ruleSpecs.toSeq.map(ruleSpec => parseRuleSpec(ruleSpec))
new CompositeTopicPartitionFilter(rules.asJava)
}
def parseRuleSpec(ruleSpec: String): TopicPartitionFilter = {
val matcher = TopicPartitionPattern.matcher(ruleSpec)
if (!matcher.matches())
throw new IllegalArgumentException(s"Invalid rule specification: $ruleSpec")
def group(group: Int): Option[String] = {
Option(matcher.group(group)).filter(s => s != null && s.nonEmpty)
}
val topicFilter = new IncludeList(group(1).getOrElse(".*"))
val partitionFilter = group(2).map(_.toInt) match {
case Some(partition) =>
new UniquePartitionFilter(partition)
case None =>
val lowerRange = group(3).map(_.toInt).getOrElse(0)
val upperRange = group(4).map(_.toInt).getOrElse(Int.MaxValue)
new PartitionRangeFilter(lowerRange, upperRange)
}
new TopicFilterAndPartitionFilter(
topicFilter,
partitionFilter
)
}
/**
* Creates a topic-partition filter based on a topic pattern and a set of partition ids.
*/
def createTopicPartitionFilterWithTopicAndPartitionPattern(
topicOpt: Option[String],
partitionIds: String
): TopicFilterAndPartitionFilter = {
new TopicFilterAndPartitionFilter(
new IncludeList(topicOpt.getOrElse(".*")),
new PartitionsSetFilter(createPartitionSet(partitionIds).asJava)
)
}
def createPartitionSet(partitionsString: String): Set[Integer] = {
if (partitionsString == null || partitionsString.isEmpty)
Set.empty
else
partitionsString.split(",").map { partitionString =>
try Integer.valueOf(partitionString)
catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"--partitions expects a comma separated list of numeric " +
s"partition ids, but received: $partitionsString")
}
}.toSet
}
/**
* Return the partition infos. Filter them with topicPartitionFilter.
*/
private def listPartitionInfos(
client: Admin,
topicPartitionFilter: TopicPartitionFilter,
excludeInternalTopics: Boolean
): Seq[TopicPartition] = {
val listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics)
val topics = client.listTopics(listTopicsOptions).names.get
val filteredTopics = topics.asScala.filter(topicPartitionFilter.isTopicAllowed)
client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap { case (topic, description) =>
description
.partitions
.asScala
.map(tp => new TopicPartition(topic, tp.partition))
.filter(topicPartitionFilter.isTopicPartitionAllowed)
}.toBuffer
}
}

264
core/src/test/scala/kafka/tools/GetOffsetShellParsingTest.scala

@ -1,264 +0,0 @@ @@ -1,264 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import org.apache.kafka.common.TopicPartition
import org.junit.jupiter.api.Assertions.{assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
class GetOffsetShellParsingTest {
@Test
def testTopicPartitionFilterForTopicName(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertFalse(topicPartitionFilter.isTopicAllowed("test1"))
assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
}
@Test
def testTopicPartitionFilterForInternalTopicName(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("__consumer_offsets")
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertFalse(topicPartitionFilter.isTopicAllowed("test1"))
assertFalse(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0)))
}
@Test
def testTopicPartitionFilterForTopicNameList(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test,test1,__consumer_offsets")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertFalse(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0)))
}
@Test
def testTopicPartitionFilterForRegex(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test.*")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("test2"))
assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
}
@Test
def testTopicPartitionFilterForPartitionIndexSpec(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":0")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1)))
}
@Test
def testTopicPartitionFilterForPartitionRangeSpec(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-3")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 2)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3)))
}
@Test
def testTopicPartitionFilterForPartitionLowerBoundSpec(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 2)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
}
@Test
def testTopicPartitionFilterForPartitionUpperBoundSpec(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":-3")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("test2"))
assertTrue(topicPartitionFilter.isTopicAllowed("test3"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 2)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test3", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3)))
}
@Test
def testTopicPartitionFilterComplex(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test.*:0,__consumer_offsets:1-2,.*:3")
assertTrue(topicPartitionFilter.isTopicAllowed("test"))
assertTrue(topicPartitionFilter.isTopicAllowed("test1"))
assertTrue(topicPartitionFilter.isTopicAllowed("custom"))
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("custom", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("custom", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2)))
}
@Test
def testPartitionFilterForSingleIndex(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1")
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2)))
}
@Test
def testPartitionFilterForRange(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-3")
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5)))
}
@Test
def testPartitionFilterForLowerBound(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":3-")
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5)))
}
@Test
def testPartitionFilterForUpperBound(): Unit = {
val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":-3")
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1)))
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4)))
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5)))
}
@Test
def testPartitionsSetFilter(): Unit = {
val partitionsSetFilter = GetOffsetShell.createTopicPartitionFilterWithTopicAndPartitionPattern(Some("topic"), "1,3,5")
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 0)))
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 2)))
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 4)))
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic1", 1)))
assertFalse(partitionsSetFilter.isTopicAllowed("topic1"))
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 1)))
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 3)))
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 5)))
assertTrue(partitionsSetFilter.isTopicAllowed("topic"))
}
@Test
def testPartitionFilterForInvalidSingleIndex(): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a"))
}
@Test
def testPartitionFilterForInvalidRange(): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a-b"))
}
@Test
def testPartitionFilterForInvalidLowerBound(): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a-"))
}
@Test
def testPartitionFilterForInvalidUpperBound(): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => GetOffsetShell.createTopicPartitionFilterWithPatternList(":-b"))
}
@Test
def testInvalidTimeValue(): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => GetOffsetShell.fetchOffsets(Array("--bootstrap-server", "localhost:9092", "--time", "invalid")))
}
private def topicPartition(topic: String, partition: Int): TopicPartition = {
new TopicPartition(topic, partition)
}
}

278
core/src/test/scala/kafka/tools/GetOffsetShellTest.scala

@ -1,278 +0,0 @@ @@ -1,278 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{Exit, Logging, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
private val topicCount = 4
private val offsetTopicPartitionCount = 4
override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
.map { p =>
p.put(KafkaConfig.OffsetsTopicPartitionsProp, Int.box(offsetTopicPartitionCount))
p
}.map(KafkaConfig.fromProps)
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i))
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// Send X messages to each partition of topicX
val producer = new KafkaProducer[String, String](props)
Range(1, topicCount + 1).foreach(i => Range(0, i*i)
.foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount))))
producer.close()
TestUtils.createOffsetsTopic(zkClient, servers)
}
@Test
def testNoFilterOptions(): Unit = {
val offsets = executeAndParse(Array())
assertEquals(expectedOffsetsWithInternal(), offsets)
}
@Test
def testInternalExcluded(): Unit = {
val offsets = executeAndParse(Array("--exclude-internal-topics"))
assertEquals(expectedTestTopicOffsets(), offsets)
}
@Test
def testTopicNameArg(): Unit = {
Range(1, topicCount + 1).foreach(i => {
val offsets = executeAndParse(Array("--topic", topicName(i)))
assertEquals(expectedOffsetsForTopic(i), offsets, () => "Offset output did not match for " + topicName(i))
})
}
@Test
def testTopicPatternArg(): Unit = {
val offsets = executeAndParse(Array("--topic", "topic.*"))
assertEquals(expectedTestTopicOffsets(), offsets)
}
@Test
def testPartitionsArg(): Unit = {
val offsets = executeAndParse(Array("--partitions", "0,1"))
assertEquals(expectedOffsetsWithInternal().filter { case (_, partition, _) => partition <= 1 }, offsets)
}
@Test
def testTopicPatternArgWithPartitionsArg(): Unit = {
val offsets = executeAndParse(Array("--topic", "topic.*", "--partitions", "0,1"))
assertEquals(expectedTestTopicOffsets().filter { case (_, partition, _) => partition <= 1 }, offsets)
}
@Test
def testTopicPartitionsArg(): Unit = {
val offsets = executeAndParse(Array("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3"))
assertEquals(
List(
("__consumer_offsets", 3, Some(0)),
("topic1", 0, Some(1)),
("topic2", 1, Some(2)),
("topic3", 2, Some(3)),
("topic4", 2, Some(4))
),
offsets
)
}
@ParameterizedTest
@ValueSource(strings = Array("-1", "latest"))
def testGetLatestOffsets(time: String): Unit = {
val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time))
assertEquals(
List(
("topic1", 0, Some(1)),
("topic2", 0, Some(2)),
("topic3", 0, Some(3)),
("topic4", 0, Some(4))
),
offsets
)
}
@ParameterizedTest
@ValueSource(strings = Array("-2", "earliest"))
def testGetEarliestOffsets(time: String): Unit = {
val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time))
assertEquals(
List(
("topic1", 0, Some(0)),
("topic2", 0, Some(0)),
("topic3", 0, Some(0)),
("topic4", 0, Some(0))
),
offsets
)
}
@ParameterizedTest
@ValueSource(strings = Array("-3", "max-timestamp"))
def testGetOffsetsByMaxTimestamp(time: String): Unit = {
val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time))
offsets.foreach { case (topic, _, timestampOpt) =>
// We can't know the exact offsets with max timestamp
assertTrue(timestampOpt.get >= 0 && timestampOpt.get <= topic.replace("topic", "").toInt)
}
}
@Test
def testGetOffsetsByTimestamp(): Unit = {
val time = (System.currentTimeMillis() / 2).toString
val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time))
assertEquals(
List(
("topic1", 0, Some(0)),
("topic2", 0, Some(0)),
("topic3", 0, Some(0)),
("topic4", 0, Some(0))
),
offsets
)
}
@Test
def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {
val time = (System.currentTimeMillis() * 2).toString
val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time))
assertEquals(List.empty, offsets)
}
@Test
def testTopicPartitionsArgWithInternalExcluded(): Unit = {
val offsets = executeAndParse(Array("--topic-partitions",
"topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"))
assertEquals(
List(
("topic1", 0, Some(1)),
("topic2", 1, Some(2)),
("topic3", 2, Some(3)),
("topic4", 2, Some(4))
),
offsets
)
}
@Test
def testTopicPartitionsArgWithInternalIncluded(): Unit = {
val offsets = executeAndParse(Array("--topic-partitions", "__.*:0"))
assertEquals(List(("__consumer_offsets", 0, Some(0))), offsets)
}
@Test
def testTopicPartitionsNotFoundForNonExistentTopic(): Unit = {
assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic"))
}
@Test
def testTopicPartitionsNotFoundForExcludedInternalTopic(): Unit = {
assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic:*"))
}
@Test
def testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern(): Unit = {
assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics"))
}
@Test
def testTopicPartitionsFlagWithTopicFlagCauseExit(): Unit = {
assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--topic", "topic1"))
}
@Test
def testTopicPartitionsFlagWithPartitionsFlagCauseExit(): Unit = {
assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--partitions", "0"))
}
private def expectedOffsetsWithInternal(): List[(String, Int, Option[Long])] = {
Range(0, offsetTopicPartitionCount).map(i => ("__consumer_offsets", i, Some(0L))).toList ++ expectedTestTopicOffsets()
}
private def expectedTestTopicOffsets(): List[(String, Int, Option[Long])] = {
Range(1, topicCount + 1).flatMap(i => expectedOffsetsForTopic(i)).toList
}
private def expectedOffsetsForTopic(i: Int): List[(String, Int, Option[Long])] = {
val name = topicName(i)
Range(0, i).map(p => (name, p, Some(i.toLong))).toList
}
private def topicName(i: Int): String = "topic" + i
private def assertExitCodeIsOne(args: Array[String]): Unit = {
var exitStatus: Option[Int] = None
Exit.setExitProcedure { (status, _) =>
exitStatus = Some(status)
throw new RuntimeException
}
try {
GetOffsetShell.main(addBootstrapServer(args))
} catch {
case e: RuntimeException =>
} finally {
Exit.resetExitProcedure()
}
assertEquals(Some(1), exitStatus)
}
private def executeAndParse(args: Array[String]): List[(String, Int, Option[Long])] = {
val output = executeAndGrabOutput(args)
output.split(System.lineSeparator())
.map(_.split(":"))
.filter(_.length >= 2)
.map { line =>
val topic = line(0)
val partition = line(1).toInt
val timestamp = if (line.length == 2 || line(2).isEmpty) None else Some(line(2).toLong)
(topic, partition, timestamp)
}
.toList
}
private def executeAndGrabOutput(args: Array[String]): String = {
TestUtils.grabConsoleOutput(GetOffsetShell.main(addBootstrapServer(args)))
}
private def addBootstrapServer(args: Array[String]): Array[String] = {
args ++ Array("--bootstrap-server", bootstrapServers())
}
}

3
tests/kafkatest/services/kafka/kafka.py

@ -1777,8 +1777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -1777,8 +1777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
cmd += self.path.script("kafka-run-class.sh", node)
cmd += " kafka.tools.GetOffsetShell"
cmd += self.path.script("kafka-get-offsets.sh", node)
cmd += " --bootstrap-server %s" % self.bootstrap_servers(self.security_protocol)
if time:

396
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java

@ -0,0 +1,396 @@ @@ -0,0 +1,396 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.PartitionFilter;
import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter;
import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter;
import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter;
import org.apache.kafka.server.util.TopicFilter.IncludeList;
import org.apache.kafka.server.util.TopicPartitionFilter;
import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter;
import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.function.IntFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class GetOffsetShell {
private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?");
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (TerseException e) {
System.err.println("Error occurred: " + e.getMessage());
return 1;
} catch (Throwable e) {
System.err.println("Error occurred: " + e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException {
GetOffsetShell getOffsetShell = new GetOffsetShell();
GetOffsetShellOptions options = new GetOffsetShellOptions(args);
Map<TopicPartition, Long> partitionOffsets = getOffsetShell.fetchOffsets(options);
for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
TopicPartition topic = entry.getKey();
System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()}));
}
}
private static class GetOffsetShellOptions extends CommandDefaultOptions {
private final OptionSpec<String> brokerListOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicPartitionsOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> partitionsOpt;
private final OptionSpec<String> timeOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<String> effectiveBrokerListOpt;
private final OptionSpecBuilder excludeInternalTopicsOpt;
public GetOffsetShellOptions(String[] args) throws TerseException {
super(args);
brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg()
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(String.class);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.requiredUnless("broker-list")
.withRequiredArg()
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(String.class);
topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." +
" The first group is an optional regex for the topic name, if omitted, it matches any topic name." +
" The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.")
.withRequiredArg()
.describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3")
.ofType(String.class);
topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.")
.withRequiredArg()
.describedAs("partition ids")
.ofType(String.class);
timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
.withRequiredArg()
.describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
.ofType(String.class)
.defaultsTo("latest");
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded.");
if (args.length == 0) {
CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.");
}
try {
options = parser.parse(args);
} catch (OptionException e) {
throw new TerseException(e.getMessage());
}
if (options.has(bootstrapServerOpt)) {
effectiveBrokerListOpt = bootstrapServerOpt;
} else {
effectiveBrokerListOpt = brokerListOpt;
}
CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt);
String brokerList = options.valueOf(effectiveBrokerListOpt);
try {
ToolsUtils.validateBootstrapServer(brokerList);
} catch (IllegalArgumentException e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
}
public boolean hasTopicPartitionsOpt() {
return options.has(topicPartitionsOpt);
}
public String topicPartitionsOpt() {
return options.valueOf(topicPartitionsOpt);
}
public boolean hasTopicOpt() {
return options.has(topicOpt);
}
public String topicOpt() {
return options.valueOf(topicOpt);
}
public boolean hasPartitionsOpt() {
return options.has(partitionsOpt);
}
public String partitionsOpt() {
return options.valueOf(partitionsOpt);
}
public String timeOpt() {
return options.valueOf(timeOpt);
}
public boolean hasCommandConfigOpt() {
return options.has(commandConfigOpt);
}
public String commandConfigOpt() {
return options.valueOf(commandConfigOpt);
}
public String effectiveBrokerListOpt() {
return options.valueOf(effectiveBrokerListOpt);
}
public boolean hasExcludeInternalTopicsOpt() {
return options.has(excludeInternalTopicsOpt);
}
}
public Map<TopicPartition, Long> fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException {
String clientId = "GetOffsetShell";
String brokerList = options.effectiveBrokerListOpt();
if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) {
throw new TerseException("--topic-partitions cannot be used with --topic or --partitions");
}
boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt();
OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt());
TopicPartitionFilter topicPartitionFilter;
if (options.hasTopicPartitionsOpt()) {
topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt());
} else {
topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(options.topicOpt(), options.partitionsOpt());
}
Properties config = options.hasCommandConfigOpt() ? Utils.loadProps(options.commandConfigOpt()) : new Properties();
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId);
try (Admin adminClient = Admin.create(config)) {
List<TopicPartition> partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics);
if (partitionInfos.isEmpty()) {
throw new TerseException("Could not match any topic-partitions with the specified filters");
}
Map<TopicPartition, OffsetSpec> timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch);
TreeMap<TopicPartition, Long> partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString));
for (TopicPartition partition : partitionInfos) {
ListOffsetsResultInfo partitionInfo;
try {
partitionInfo = listOffsetsResult.partitionResult(partition).get();
} catch (ExecutionException e) {
if (e.getCause() instanceof KafkaException) {
System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage());
} else {
throw e;
}
continue;
}
if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
partitionOffsets.put(partition, partitionInfo.offset());
}
}
return partitionOffsets;
}
}
private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
switch (listOffsetsTimestamp) {
case "earliest":
return OffsetSpec.earliest();
case "latest":
return OffsetSpec.latest();
case "max-timestamp":
return OffsetSpec.maxTimestamp();
default:
long timestamp;
try {
timestamp = Long.parseLong(listOffsetsTimestamp);
} catch (NumberFormatException e) {
throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
"Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
}
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
return OffsetSpec.earliest();
} else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
return OffsetSpec.latest();
} else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
return OffsetSpec.maxTimestamp();
} else {
return OffsetSpec.forTimestamp(timestamp);
}
}
}
/**
* Creates a topic-partition filter based on a list of patterns.
* Expected format:
* List: TopicPartitionPattern(, TopicPartitionPattern)*
* TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern
* TopicPattern: REGEX
* PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
try {
return parseRuleSpec(ruleSpec);
} catch (TerseException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
return new CompositeTopicPartitionFilter(rules);
}
/**
* Creates a topic-partition filter based on a topic pattern and a set of partition ids.
*/
public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException {
return new TopicFilterAndPartitionFilter(
new IncludeList(topicOpt != null ? topicOpt : ".*"),
new PartitionsSetFilter(createPartitionSet(partitionIds))
);
}
private Set<Integer> createPartitionSet(String partitionsString) throws TerseException {
Set<Integer> partitions;
if (partitionsString == null || partitionsString.isEmpty()) {
partitions = Collections.emptySet();
} else {
try {
partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
} catch (NumberFormatException e) {
throw new TerseException("--partitions expects a comma separated list of numeric " +
"partition ids, but received: " + partitionsString);
}
}
return partitions;
}
/**
* Return the partition infos. Filter them with topicPartitionFilter.
*/
private List<TopicPartition> listPartitionInfos(
Admin client,
TopicPartitionFilter topicPartitionFilter,
boolean excludeInternalTopics
) throws ExecutionException, InterruptedException {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics);
Set<String> topics = client.listTopics(listTopicsOptions).names().get();
Set<String> filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet());
return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap(
topic -> topic.getValue().partitions().stream().map(
tp -> new TopicPartition(topic.getKey(), tp.partition())
).filter(topicPartitionFilter::isTopicPartitionAllowed)
).collect(Collectors.toList());
}
private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException {
Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec);
if (!matcher.matches())
throw new TerseException("Invalid rule specification: " + ruleSpec);
IntFunction<String> group = (int g) -> (matcher.group(g) != null && !matcher.group(g).isEmpty()) ? matcher.group(g) : null;
IncludeList topicFilter = group.apply(1) != null ? new IncludeList(group.apply(1)) : new IncludeList(".*");
PartitionFilter partitionFilter;
if (group.apply(2) != null) {
partitionFilter = new UniquePartitionFilter(Integer.parseInt(group.apply(2)));
} else {
int lowerRange = group.apply(3) != null ? Integer.parseInt(group.apply(3)) : 0;
int upperRange = group.apply(4) != null ? Integer.parseInt(group.apply(4)) : Integer.MAX_VALUE;
partitionFilter = new PartitionRangeFilter(lowerRange, upperRange);
}
return new TopicPartitionFilter.TopicFilterAndPartitionFilter(topicFilter, partitionFilter);
}
}

28
tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools;
import joptsimple.OptionParser;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
@ -62,7 +61,6 @@ import org.slf4j.LoggerFactory; @@ -62,7 +61,6 @@ import org.slf4j.LoggerFactory;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@ -312,33 +310,19 @@ public class ReplicaVerificationTool { @@ -312,33 +310,19 @@ public class ReplicaVerificationTool {
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt);
CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt);
}
String brokerHostsAndPorts() {
String brokerList = options.valueOf(brokerListOpt);
validateBrokerList(parser, brokerList);
return brokerList;
}
void validateBrokerList(OptionParser parser, String brokerList) {
if (parser == null || brokerList == null) {
throw new RuntimeException("No option parser or broker list found");
}
if (brokerList.isEmpty()) {
CommandLineUtils.printUsageAndExit(parser, "Empty broker list option");
try {
ToolsUtils.validateBootstrapServer(brokerList);
} catch (IllegalArgumentException e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
String[] hostPorts;
if (brokerList.contains(",")) hostPorts = brokerList.split(",");
else hostPorts = new String[]{brokerList};
String[] validHostPort = Arrays.stream(hostPorts)
.filter(hostPortData -> Utils.getPort(hostPortData) != null)
.toArray(String[]::new);
if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option");
}
return brokerList;
}
TopicFilter.IncludeList topicsIncludeFilter() {

24
tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java

@ -18,8 +18,10 @@ package org.apache.kafka.tools; @@ -18,8 +18,10 @@ package org.apache.kafka.tools;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Utils;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -99,4 +101,26 @@ public class ToolsUtils { @@ -99,4 +101,26 @@ public class ToolsUtils {
printRow(columnLengths, headers, out);
rows.forEach(row -> printRow(columnLengths, row, out));
}
public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException {
if (hostPort == null || hostPort.trim().isEmpty()) {
throw new IllegalArgumentException("Error while validating the bootstrap address\n");
}
String[] hostPorts;
if (hostPort.contains(",")) {
hostPorts = hostPort.split(",");
} else {
hostPorts = new String[] {hostPort};
}
String[] validHostPort = Arrays.stream(hostPorts)
.filter(hostPortData -> Utils.getPort(hostPortData) != null)
.toArray(String[]::new);
if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) {
throw new IllegalArgumentException("Please provide valid host:port like host1:9091,host2:9092\n");
}
}
}

248
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java

@ -0,0 +1,248 @@ @@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.util.TopicPartitionFilter;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GetOffsetShellParsingTest {
GetOffsetShell getOffsetShell = new GetOffsetShell();
@Test
public void testTopicPartitionFilterForTopicName() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertFalse(topicPartitionFilter.isTopicAllowed("test1"));
assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
}
@Test
public void testTopicPartitionFilterForInternalTopicName() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("__consumer_offsets");
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertFalse(topicPartitionFilter.isTopicAllowed("test1"));
assertFalse(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0)));
}
@Test
public void testTopicPartitionFilterForTopicNameList() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test,test1,__consumer_offsets");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertFalse(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0)));
}
@Test
public void testTopicPartitionFilterForRegex() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test.*");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("test2"));
assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
}
@Test
public void testTopicPartitionFilterForPartitionIndexSpec() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":0");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1)));
}
@Test
public void testTopicPartitionFilterForPartitionRangeSpec() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-3");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 2)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3)));
}
@Test
public void testTopicPartitionFilterForPartitionLowerBoundSpec() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 2)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
}
@Test
public void testTopicPartitionFilterForPartitionUpperBoundSpec() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":-3");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("test2"));
assertTrue(topicPartitionFilter.isTopicAllowed("test3"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 2)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test3", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3)));
}
@Test
public void testTopicPartitionFilterComplex() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test.*:0,__consumer_offsets:1-2,.*:3");
assertTrue(topicPartitionFilter.isTopicAllowed("test"));
assertTrue(topicPartitionFilter.isTopicAllowed("test1"));
assertTrue(topicPartitionFilter.isTopicAllowed("custom"));
assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets"));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("custom", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("custom", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2)));
}
@Test
public void testPartitionFilterForSingleIndex() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1");
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2)));
}
@Test
public void testPartitionFilterForRange() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-3");
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5)));
}
@Test
public void testPartitionFilterForLowerBound() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":3-");
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5)));
}
@Test
public void testPartitionFilterForUpperBound() {
TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":-3");
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1)));
assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4)));
assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5)));
}
@Test
public void testPartitionsSetFilter() throws TerseException {
TopicPartitionFilter partitionsSetFilter = getOffsetShell.createTopicPartitionFilterWithTopicAndPartitionPattern("topic", "1,3,5");
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 0)));
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 2)));
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 4)));
assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic1", 1)));
assertFalse(partitionsSetFilter.isTopicAllowed("topic1"));
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 1)));
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 3)));
assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 5)));
assertTrue(partitionsSetFilter.isTopicAllowed("topic"));
}
@Test
public void testInvalidTimeValue() {
assertThrows(TerseException.class, () -> GetOffsetShell.execute("--bootstrap-server", "localhost:9092", "--time", "invalid"));
}
private TopicPartition getTopicPartition(String topic, Integer partition) {
return new TopicPartition(topic, partition);
}
}

376
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java

@ -0,0 +1,376 @@ @@ -0,0 +1,376 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ZK)
@Tag("integration")
public class GetOffsetShellTest {
private final int topicCount = 4;
private final int offsetTopicPartitionCount = 4;
private final ClusterInstance cluster;
public GetOffsetShellTest(ClusterInstance cluster) {
this.cluster = cluster;
}
private String getTopicName(int i) {
return "topic" + i;
}
public void setUp() {
cluster.config().serverProperties().put("auto.create.topics.enable", false);
cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
List<NewTopic> topics = new ArrayList<>();
IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
admin.createTopics(topics);
}
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
IntStream.range(0, topicCount + 1)
.forEach(i -> IntStream.range(0, i * i)
.forEach(msgCount -> producer.send(
new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
);
}
}
static class Row {
private String name;
private int partition;
private Long timestamp;
public Row(String name, int partition, Long timestamp) {
this.name = name;
this.partition = partition;
this.timestamp = timestamp;
}
@Override
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof Row)) return false;
Row r = (Row) o;
return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp);
}
@Override
public int hashCode() {
return Objects.hash(name, partition, timestamp);
}
}
@ClusterTest
public void testNoFilterOptions() {
setUp();
List<Row> output = executeAndParse();
assertEquals(expectedOffsetsWithInternal(), output);
}
@ClusterTest
public void testInternalExcluded() {
setUp();
List<Row> output = executeAndParse("--exclude-internal-topics");
assertEquals(expectedTestTopicOffsets(), output);
}
@ClusterTest
public void testTopicNameArg() {
setUp();
IntStream.range(1, topicCount + 1).forEach(i -> {
List<Row> offsets = executeAndParse("--topic", getTopicName(i));
assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + getTopicName(i));
});
}
@ClusterTest
public void testTopicPatternArg() {
setUp();
List<Row> offsets = executeAndParse("--topic", "topic.*");
assertEquals(expectedTestTopicOffsets(), offsets);
}
@ClusterTest
public void testPartitionsArg() {
setUp();
List<Row> offsets = executeAndParse("--partitions", "0,1");
assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
}
@ClusterTest
public void testTopicPatternArgWithPartitionsArg() {
setUp();
List<Row> offsets = executeAndParse("--topic", "topic.*", "--partitions", "0,1");
assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
}
@ClusterTest
public void testTopicPartitionsArg() {
setUp();
List<Row> offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
List<Row> expected = Arrays.asList(
new Row("__consumer_offsets", 3, 0L),
new Row("topic1", 0, 1L),
new Row("topic2", 1, 2L),
new Row("topic3", 2, 3L),
new Row("topic4", 2, 4L)
);
assertEquals(expected, offsets);
}
@ClusterTest
public void testGetLatestOffsets() {
setUp();
for (String time : new String[] {"-1", "latest"}) {
List<Row> offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
List<Row> expected = Arrays.asList(
new Row("topic1", 0, 1L),
new Row("topic2", 0, 2L),
new Row("topic3", 0, 3L),
new Row("topic4", 0, 4L)
);
assertEquals(expected, offsets);
}
}
@ClusterTest
public void testGetEarliestOffsets() {
setUp();
for (String time : new String[] {"-2", "earliest"}) {
List<Row> offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
List<Row> expected = Arrays.asList(
new Row("topic1", 0, 0L),
new Row("topic2", 0, 0L),
new Row("topic3", 0, 0L),
new Row("topic4", 0, 0L)
);
assertEquals(expected, offsets);
}
}
@ClusterTest
public void testGetOffsetsByMaxTimestamp() {
setUp();
for (String time : new String[] {"-3", "max-timestamp"}) {
List<Row> offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time);
offsets.forEach(
row -> assertTrue(row.timestamp >= 0 && row.timestamp <= Integer.parseInt(row.name.replace("topic", "")))
);
}
}
@ClusterTest
public void testGetOffsetsByTimestamp() {
setUp();
String time = String.valueOf(System.currentTimeMillis() / 2);
List<Row> offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
List<Row> expected = Arrays.asList(
new Row("topic1", 0, 0L),
new Row("topic2", 0, 0L),
new Row("topic3", 0, 0L),
new Row("topic4", 0, 0L)
);
assertEquals(expected, offsets);
}
@ClusterTest
public void testNoOffsetIfTimestampGreaterThanLatestRecord() {
setUp();
String time = String.valueOf(System.currentTimeMillis() * 2);
List<Row> offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time);
assertEquals(new ArrayList<Row>(), offsets);
}
@ClusterTest
public void testTopicPartitionsArgWithInternalExcluded() {
setUp();
List<Row> offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics");
List<Row> expected = Arrays.asList(
new Row("topic1", 0, 1L),
new Row("topic2", 1, 2L),
new Row("topic3", 2, 3L),
new Row("topic4", 2, 4L)
);
assertEquals(expected, offsets);
}
@ClusterTest
public void testTopicPartitionsArgWithInternalIncluded() {
setUp();
List<Row> offsets = executeAndParse("--topic-partitions", "__.*:0");
assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets);
}
@ClusterTest
public void testTopicPartitionsNotFoundForNonExistentTopic() {
assertExitCodeIsOne("--topic", "some_nonexistent_topic");
}
@ClusterTest
public void testTopicPartitionsNotFoundForExcludedInternalTopic() {
assertExitCodeIsOne("--topic", "some_nonexistent_topic:*");
}
@ClusterTest
public void testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern() {
assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics");
}
@ClusterTest
public void testTopicPartitionsFlagWithTopicFlagCauseExit() {
assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--topic", "topic1");
}
@ClusterTest
public void testTopicPartitionsFlagWithPartitionsFlagCauseExit() {
assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--partitions", "0");
}
private void assertExitCodeIsOne(String... args) {
final int[] exitStatus = new int[1];
Exit.setExitProcedure((statusCode, message) -> {
exitStatus[0] = statusCode;
throw new RuntimeException();
});
try {
GetOffsetShell.main(addBootstrapServer(args));
} catch (RuntimeException ignored) {
} finally {
Exit.resetExitProcedure();
}
assertEquals(1, exitStatus[0]);
}
private List<Row> expectedOffsetsWithInternal() {
List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1)
.mapToObj(i -> new Row("__consumer_offsets", i, 0L))
.collect(Collectors.toList());
return Stream.concat(consOffsets.stream(), expectedTestTopicOffsets().stream()).collect(Collectors.toList());
}
private List<Row> expectedTestTopicOffsets() {
List<Row> offsets = new ArrayList<>(topicCount + 1);
for (int i = 0; i < topicCount + 1; i++) {
offsets.addAll(expectedOffsetsForTopic(i));
}
return offsets;
}
private List<Row> expectedOffsetsForTopic(int i) {
String name = getTopicName(i);
return IntStream.range(0, i).mapToObj(p -> new Row(name, p, (long) i)).collect(Collectors.toList());
}
private List<Row> executeAndParse(String... args) {
String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit(addBootstrapServer(args)));
return Arrays.stream(out.split(System.lineSeparator()))
.map(i -> i.split(":"))
.filter(i -> i.length >= 2)
.map(line -> new Row(line[0], Integer.parseInt(line[1]), (line.length == 2 || line[2].isEmpty()) ? null : Long.parseLong(line[2])))
.collect(Collectors.toList());
}
private String[] addBootstrapServer(String... args) {
ArrayList<String> newArgs = new ArrayList<>(Arrays.asList(args));
newArgs.add("--bootstrap-server");
newArgs.add(cluster.bootstrapServers());
return newArgs.toArray(new String[0]);
}
}
Loading…
Cancel
Save