Browse Source

KAFKA-14591: Move DeleteRecordsCommand to tools (#13278)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>
pull/14069/head
Nikolay 1 year ago committed by GitHub
parent
commit
4bba2c8a32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      bin/kafka-delete-records.sh
  2. 2
      bin/windows/kafka-delete-records.bat
  3. 2
      core/src/main/scala/kafka/admin/AdminUtils.scala
  4. 137
      core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
  5. 2
      core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
  6. 2
      core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
  7. 2
      core/src/main/scala/kafka/admin/TopicCommand.scala
  8. 3
      core/src/main/scala/kafka/controller/KafkaController.scala
  9. 3
      core/src/main/scala/kafka/server/ZkAdminManager.scala
  10. 3
      core/src/main/scala/kafka/zk/AdminZkClient.scala
  11. 1
      core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
  12. 2
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
  13. 3
      core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
  14. 2
      core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
  15. 2
      core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
  16. 2
      core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
  17. 1
      core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
  18. 21
      server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java
  19. 19
      server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java
  20. 183
      tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java
  21. 182
      tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java

2
bin/kafka-delete-records.sh

@ -14,4 +14,4 @@ @@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.DeleteRecordsCommand "$@"

2
bin/windows/kafka-delete-records.bat

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.DeleteRecordsCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.DeleteRecordsCommand %*

2
core/src/main/scala/kafka/admin/AdminUtils.scala

@ -18,9 +18,9 @@ @@ -18,9 +18,9 @@
package kafka.admin
import java.util.Random
import kafka.utils.Logging
import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicationFactorException}
import org.apache.kafka.server.common.AdminOperationException
import collection.{Map, mutable, _}

137
core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala

@ -1,137 +0,0 @@ @@ -1,137 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.common.AdminCommandFailedException
import kafka.utils.json.JsonValue
import kafka.utils.{CoreUtils, Json}
import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
/**
* A command for delete records of the given partitions down to the specified offset.
*/
object DeleteRecordsCommand {
private[admin] val EarliestVersion = 1
def main(args: Array[String]): Unit = {
execute(args, System.out)
}
def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
Json.parseFull(jsonData) match {
case Some(js) =>
val version = js.asJsonObject.get("version") match {
case Some(jsonValue) => jsonValue.to[Int]
case None => EarliestVersion
}
parseJsonData(version, js)
case None => throw new AdminOperationException("The input string is not a valid JSON")
}
}
def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] = {
version match {
case 1 =>
js.asJsonObject.get("partitions") match {
case Some(partitions) =>
partitions.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
val topic = partitionJs("topic").to[String]
val partition = partitionJs("partition").to[Int]
val offset = partitionJs("offset").to[Long]
new TopicPartition(topic, partition) -> offset
}.toBuffer
case _ => throw new AdminOperationException("Missing partitions field");
}
case _ => throw new AdminOperationException(s"Not supported version field value $version")
}
}
def execute(args: Array[String], out: PrintStream): Unit = {
val opts = new DeleteRecordsCommandOptions(args)
val adminClient = createAdminClient(opts)
val offsetJsonFile = opts.options.valueOf(opts.offsetJsonFileOpt)
val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, _) => tp })
if (duplicatePartitions.nonEmpty)
throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
val recordsToDelete = offsetSeq.map { case (topicPartition, offset) =>
(topicPartition, RecordsToDelete.beforeOffset(offset))
}.toMap.asJava
out.println("Executing records delete operation")
val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
out.println("Records delete operation completed:")
deleteRecordsResult.lowWatermarks.forEach { (tp, partitionResult) =>
try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
catch {
case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
}
}
adminClient.close()
}
private def createAdminClient(opts: DeleteRecordsCommandOptions): Admin = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
Admin.create(props)
}
class DeleteRecordsCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val BootstrapServerDoc = "REQUIRED: The server to connect to."
val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" +
"{\"partitions\":\n [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
.withRequiredArg
.describedAs("server(s) to use for bootstrapping")
.ofType(classOf[String])
val offsetJsonFileOpt = parser.accepts("offset-json-file", offsetJsonFileDoc)
.withRequiredArg
.describedAs("Offset json file path")
.ofType(classOf[String])
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file path")
.ofType(classOf[String])
options = parser.parse(args : _*)
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
}
}

2
core/src/main/scala/kafka/admin/LeaderElectionCommand.scala

@ -19,7 +19,6 @@ package kafka.admin @@ -19,7 +19,6 @@ package kafka.admin
import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.common.AdminCommandFailedException
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException @@ -31,6 +30,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._

2
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala

@ -19,7 +19,6 @@ package kafka.admin @@ -19,7 +19,6 @@ package kafka.admin
import java.util
import java.util.Optional
import java.util.concurrent.ExecutionException
import kafka.common.AdminCommandFailedException
import kafka.server.DynamicConfig
import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource @@ -30,6 +29,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig

2
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -20,7 +20,6 @@ package kafka.admin @@ -20,7 +20,6 @@ package kafka.admin
import java.util
import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.CreatePartitionsOptions
@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig} @@ -33,6 +32,7 @@ import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExistsException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.server.util.TopicFilter.IncludeList

3
core/src/main/scala/kafka/controller/KafkaController.scala

@ -19,7 +19,6 @@ package kafka.controller @@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.Timer
import java.util.concurrent.TimeUnit
import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.cluster.Broker
@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors @@ -45,7 +44,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.zookeeper.KeeperException

3
core/src/main/scala/kafka/server/ZkAdminManager.scala

@ -18,7 +18,7 @@ package kafka.server @@ -18,7 +18,7 @@ package kafka.server
import java.util
import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.admin.AdminUtils
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._ @@ -47,6 +47,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}

3
core/src/main/scala/kafka/zk/AdminZkClient.scala

@ -17,7 +17,7 @@ @@ -17,7 +17,7 @@
package kafka.zk
import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.admin.{AdminUtils, BrokerMetadata, RackAwareMode}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.controller.ReplicaAssignment
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
@ -26,6 +26,7 @@ import kafka.utils.Implicits._ @@ -26,6 +26,7 @@ import kafka.utils.Implicits._
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException

1
core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala

@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic} @@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest

2
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala

@ -16,8 +16,8 @@ @@ -16,8 +16,8 @@
*/
package kafka.admin
import kafka.common.AdminCommandFailedException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

3
core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala

@ -18,8 +18,6 @@ package kafka.admin @@ -18,8 +18,6 @@ package kafka.admin
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import kafka.common.AdminCommandFailedException
import kafka.server.IntegrationTestUtils.createTopic
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
@ -29,6 +27,7 @@ import kafka.utils.TestUtils @@ -29,6 +27,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.server.common.AdminCommandFailedException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}

2
core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala

@ -20,12 +20,12 @@ package kafka.admin @@ -20,12 +20,12 @@ package kafka.admin
import java.util.concurrent.ExecutionException
import java.util.{Arrays, Collections}
import kafka.admin.ReassignPartitionsCommand._
import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Config, MockAdminClient, PartitionReassignment}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidReplicationFactorException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}

2
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala

@ -17,13 +17,13 @@ @@ -17,13 +17,13 @@
package kafka.admin
import kafka.admin.TopicCommand.{PartitionDescription, TopicCommandOptions, TopicService}
import kafka.common.AdminCommandFailedException
import kafka.utils.Exit
import org.apache.kafka.clients.admin.{Admin, AdminClientTestUtils, CreatePartitionsOptions, CreateTopicsOptions, DeleteTopicsOptions, NewPartitions, NewTopic, PartitionReassignment, TopicDescription}
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartitionInfo
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.server.common.{AdminCommandFailedException, AdminOperationException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatcher

2
core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala

@ -16,11 +16,11 @@ @@ -16,11 +16,11 @@
*/
package kafka.server
import kafka.admin.AdminOperationException
import kafka.utils.CoreUtils._
import kafka.server.QuorumTestHarness
import org.apache.kafka.common.config._
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test

1
core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala

@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig @@ -31,6 +31,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._

21
core/src/main/scala/kafka/admin/AdminOperationException.scala → server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java

@ -1,10 +1,10 @@ @@ -1,10 +1,10 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@ -15,9 +15,14 @@ @@ -15,9 +15,14 @@
* limitations under the License.
*/
package kafka.admin
package org.apache.kafka.server.common;
class AdminOperationException(val error: String, cause: Throwable) extends RuntimeException(error, cause) {
def this(error: Throwable) = this(error.getMessage, error)
def this(msg: String) = this(msg, null)
}
public class AdminCommandFailedException extends RuntimeException {
public AdminCommandFailedException(String message) {
super(message);
}
public AdminCommandFailedException(String message, Throwable cause) {
super(message, cause);
}
}

19
core/src/main/scala/kafka/common/AdminCommandFailedException.scala → server-common/src/main/java/org/apache/kafka/server/common/AdminOperationException.java

@ -1,10 +1,10 @@ @@ -1,10 +1,10 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@ -15,9 +15,14 @@ @@ -15,9 +15,14 @@
* limitations under the License.
*/
package kafka.common
package org.apache.kafka.server.common;
class AdminCommandFailedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(message: String) = this(message, null)
def this() = this(null, null)
public class AdminOperationException extends RuntimeException {
public AdminOperationException(String message) {
super(message);
}
public AdminOperationException(Throwable cause) {
super(cause.getMessage(), cause);
}
}

183
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java

@ -0,0 +1,183 @@ @@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.Json;
import org.apache.kafka.server.util.json.DecodeJson;
import org.apache.kafka.server.util.json.JsonObject;
import org.apache.kafka.server.util.json.JsonValue;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* A command for delete records of the given partitions down to the specified offset.
*/
public class DeleteRecordsCommand {
private static final int EARLIEST_VERSION = 1;
private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
private static final DecodeJson.DecodeLong LONG = new DecodeJson.DecodeLong();
private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
public static void main(String[] args) throws Exception {
execute(args, System.out);
}
static Map<TopicPartition, List<Long>> parseOffsetJsonStringWithoutDedup(String jsonData) throws JsonProcessingException {
JsonValue js = Json.parseFull(jsonData)
.orElseThrow(() -> new AdminOperationException("The input string is not a valid JSON"));
Optional<JsonValue> version = js.asJsonObject().get("version");
return parseJsonData(version.isPresent() ? version.get().to(INT) : EARLIEST_VERSION, js);
}
private static Map<TopicPartition, List<Long>> parseJsonData(int version, JsonValue js) throws JsonMappingException {
if (version == 1) {
JsonValue partitions = js.asJsonObject().get("partitions")
.orElseThrow(() -> new AdminOperationException("Missing partitions field"));
Map<TopicPartition, List<Long>> res = new HashMap<>();
Iterator<JsonValue> iterator = partitions.asJsonArray().iterator();
while (iterator.hasNext()) {
JsonObject partitionJs = iterator.next().asJsonObject();
String topic = partitionJs.apply("topic").to(STRING);
int partition = partitionJs.apply("partition").to(INT);
long offset = partitionJs.apply("offset").to(LONG);
res.computeIfAbsent(new TopicPartition(topic, partition), k -> new ArrayList<>()).add(offset);
}
return res;
}
throw new AdminOperationException("Not supported version field value " + version);
}
public static void execute(String[] args, PrintStream out) throws IOException {
DeleteRecordsCommandOptions opts = new DeleteRecordsCommandOptions(args);
try (Admin adminClient = createAdminClient(opts)) {
execute(adminClient, Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
}
}
static void execute(Admin adminClient, String offsetJsonString, PrintStream out) throws JsonProcessingException {
Map<TopicPartition, List<Long>> offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString);
Set<TopicPartition> duplicatePartitions = offsetSeq.entrySet().stream()
.filter(e -> e.getValue().size() > 1)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
if (!duplicatePartitions.isEmpty()) {
StringJoiner duplicates = new StringJoiner(",");
duplicatePartitions.forEach(tp -> duplicates.add(tp.toString()));
throw new AdminCommandFailedException(
String.format("Offset json file contains duplicate topic partitions: %s", duplicates)
);
}
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, List<Long>> e : offsetSeq.entrySet())
recordsToDelete.put(e.getKey(), RecordsToDelete.beforeOffset(e.getValue().get(0)));
out.println("Executing records delete operation");
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
out.println("Records delete operation completed:");
deleteRecordsResult.lowWatermarks().forEach((tp, partitionResult) -> {
try {
out.printf("partition: %s\tlow_watermark: %s%n", tp, partitionResult.get().lowWatermark());
} catch (InterruptedException | ExecutionException e) {
out.printf("partition: %s\terror: %s%n", tp, e.getMessage());
}
});
}
private static Admin createAdminClient(DeleteRecordsCommandOptions opts) throws IOException {
Properties props = opts.options.has(opts.commandConfigOpt)
? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
: new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
return Admin.create(props);
}
private static class DeleteRecordsCommandOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> offsetJsonFileOpt;
private final OptionSpec<String> commandConfigOpt;
public DeleteRecordsCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server to connect to.")
.withRequiredArg()
.describedAs("server(s) to use for bootstrapping")
.ofType(String.class);
offsetJsonFileOpt = parser.accepts("offset-json-file", "REQUIRED: The JSON file with offset per partition. " +
"The format to use is:\n" +
"{\"partitions\":\n [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}")
.withRequiredArg()
.describedAs("Offset json file path")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "A property file containing configs to be passed to Admin Client.")
.withRequiredArg()
.describedAs("command config property file path")
.ofType(String.class);
options = parser.parse(args);
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.");
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt);
}
}
}

182
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java

@ -0,0 +1,182 @@ @@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.core.JsonProcessingException;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(value = ClusterTestExtensions.class)
@ClusterTestDefaults(clusterType = Type.ALL)
@Tag("integration")
public class DeleteRecordsCommandTest {
private final ClusterInstance cluster;
public DeleteRecordsCommandTest(ClusterInstance cluster) {
this.cluster = cluster;
}
@ClusterTest
public void testCommand() throws Exception {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
try (Admin admin = cluster.createAdminClient(adminProps)) {
assertThrows(
AdminCommandFailedException.class,
() -> DeleteRecordsCommand.execute(admin, "{\"partitions\":[" +
"{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
"{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
"}", System.out),
"Offset json file contains duplicate topic partitions: t-0"
);
admin.createTopics(Collections.singleton(new NewTopic("t", 1, (short) 1))).all().get();
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<?, String> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("t", "1")).get();
producer.send(new ProducerRecord<>("t", "2")).get();
producer.send(new ProducerRecord<>("t", "3")).get();
}
executeAndAssertOutput(
"{\"partitions\":[{\"topic\":\"t\", \"partition\":0, \"offset\":1}]}",
"partition: t-0\tlow_watermark: 1",
admin
);
executeAndAssertOutput(
"{\"partitions\":[{\"topic\":\"t\", \"partition\":42, \"offset\":42}]}",
"partition: t-42\terror",
admin
);
}
}
private static void executeAndAssertOutput(String json, String expOut, Admin admin) {
String output = ToolsTestUtils.captureStandardOut(() -> {
try {
DeleteRecordsCommand.execute(admin, json, System.out);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
});
assertTrue(output.contains(expOut));
}
}
/**
* Unit test of {@link DeleteRecordsCommand} tool.
*/
class DeleteRecordsCommandUnitTest {
@Test
public void testOffsetFileNotExists() {
assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new String[]{
"--bootstrap-server", "localhost:9092",
"--offset-json-file", "/not/existing/file"
}));
}
@Test
public void testCommandConfigNotExists() {
assertThrows(NoSuchFileException.class, () -> DeleteRecordsCommand.main(new String[] {
"--bootstrap-server", "localhost:9092",
"--offset-json-file", "/not/existing/file",
"--command-config", "/another/not/existing/file"
}));
}
@Test
public void testWrongVersion() {
assertCommandThrows(JsonProcessingException.class, "{\"version\":\"string\"}");
assertCommandThrows(AdminOperationException.class, "{\"version\":2}");
}
@Test
public void testWrongPartitions() {
assertCommandThrows(AdminOperationException.class, "{\"version\":1}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":2}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":{}}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{}]}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\"}]}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"partition\": \"\"}]}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"partition\": 0}]}");
assertCommandThrows(JsonProcessingException.class, "{\"partitions\":[{\"topic\":\"t\", \"offset\":0}]}");
}
@Test
public void testParse() throws Exception {
Map<TopicPartition, List<Long>> res = DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(
"{\"partitions\":[" +
"{\"topic\":\"t\", \"partition\":0, \"offset\":0}," +
"{\"topic\":\"t\", \"partition\":1, \"offset\":1, \"ignored\":\"field\"}," +
"{\"topic\":\"t\", \"partition\":0, \"offset\":2}," +
"{\"topic\":\"t\", \"partition\":0, \"offset\":0}" +
"]}"
);
assertEquals(2, res.size());
assertEquals(Arrays.asList(0L, 2L, 0L), res.get(new TopicPartition("t", 0)));
assertEquals(Collections.singletonList(1L), res.get(new TopicPartition("t", 1)));
}
/**
* Asserts that {@link DeleteRecordsCommand#parseOffsetJsonStringWithoutDedup(String)} throws {@link AdminOperationException}.
* @param jsonData Data to check.
*/
private static void assertCommandThrows(Class<? extends Exception> expectedException, String jsonData) {
assertThrows(
expectedException,
() -> DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData)
);
}
}
Loading…
Cancel
Save