Browse Source
Add StorageTool as specified in KIP-631. It can format and describe storage directories. Fix a bug in `ZkMetaProperties#toString`. Reviewers: David Arthur <mumrah@gmail.com>pull/10089/head
Colin Patrick McCabe
4 years ago
committed by
GitHub
4 changed files with 445 additions and 3 deletions
@ -0,0 +1,17 @@ |
|||||||
|
#!/bin/bash |
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
# contributor license agreements. See the NOTICE file distributed with |
||||||
|
# this work for additional information regarding copyright ownership. |
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
# (the "License"); you may not use this file except in compliance with |
||||||
|
# the License. You may obtain a copy of the License at |
||||||
|
# |
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
# |
||||||
|
# Unless required by applicable law or agreed to in writing, software |
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
# See the License for the specific language governing permissions and |
||||||
|
# limitations under the License. |
||||||
|
|
||||||
|
exec $(dirname $0)/kafka-run-class.sh kafka.tools.StorageTool "$@" |
@ -0,0 +1,238 @@ |
|||||||
|
/** |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package kafka.tools |
||||||
|
|
||||||
|
import java.io.PrintStream |
||||||
|
import java.nio.file.{Files, Paths} |
||||||
|
|
||||||
|
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties} |
||||||
|
import kafka.utils.{Exit, Logging} |
||||||
|
import net.sourceforge.argparse4j.ArgumentParsers |
||||||
|
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue} |
||||||
|
import org.apache.kafka.common.Uuid |
||||||
|
import org.apache.kafka.common.utils.Utils |
||||||
|
|
||||||
|
import scala.collection.mutable |
||||||
|
|
||||||
|
object StorageTool extends Logging { |
||||||
|
def main(args: Array[String]): Unit = { |
||||||
|
try { |
||||||
|
val parser = ArgumentParsers. |
||||||
|
newArgumentParser("kafka-storage"). |
||||||
|
defaultHelp(true). |
||||||
|
description("The Kafka storage tool.") |
||||||
|
val subparsers = parser.addSubparsers().dest("command") |
||||||
|
|
||||||
|
val infoParser = subparsers.addParser("info"). |
||||||
|
help("Get information about the Kafka log directories on this node.") |
||||||
|
val formatParser = subparsers.addParser("format"). |
||||||
|
help("Format the Kafka log directories on this node.") |
||||||
|
subparsers.addParser("random-uuid").help("Print a random UUID.") |
||||||
|
List(infoParser, formatParser).foreach(parser => { |
||||||
|
parser.addArgument("--config", "-c"). |
||||||
|
action(store()). |
||||||
|
required(true). |
||||||
|
help("The Kafka configuration file to use.") |
||||||
|
}) |
||||||
|
formatParser.addArgument("--cluster-id", "-t"). |
||||||
|
action(store()). |
||||||
|
required(true). |
||||||
|
help("The cluster ID to use.") |
||||||
|
formatParser.addArgument("--ignore-formatted", "-g"). |
||||||
|
action(storeTrue()) |
||||||
|
|
||||||
|
val namespace = parser.parseArgsOrFail(args) |
||||||
|
val command = namespace.getString("command") |
||||||
|
val config = Option(namespace.getString("config")).flatMap( |
||||||
|
p => Some(new KafkaConfig(Utils.loadProps(p)))) |
||||||
|
|
||||||
|
command match { |
||||||
|
case "info" => |
||||||
|
val directories = configToLogDirectories(config.get) |
||||||
|
val kip500Mode = configToKip500Mode(config.get) |
||||||
|
Exit.exit(infoCommand(System.out, kip500Mode, directories)) |
||||||
|
|
||||||
|
case "format" => |
||||||
|
val directories = configToLogDirectories(config.get) |
||||||
|
val clusterId = namespace.getString("cluster_id") |
||||||
|
val metaProperties = buildMetadataProperties(clusterId, config.get) |
||||||
|
val ignoreFormatted = namespace.getBoolean("ignore_formatted") |
||||||
|
if (!configToKip500Mode(config.get)) { |
||||||
|
throw new TerseFailure("The kafka configuration file appears to be for " + |
||||||
|
"a legacy cluster. Formatting is only supported for kip-500 clusters.") |
||||||
|
} |
||||||
|
Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted )) |
||||||
|
|
||||||
|
case "random-uuid" => |
||||||
|
System.out.println(Uuid.randomUuid) |
||||||
|
Exit.exit(0) |
||||||
|
|
||||||
|
case _ => |
||||||
|
throw new RuntimeException(s"Unknown command $command") |
||||||
|
} |
||||||
|
} catch { |
||||||
|
case e: TerseFailure => |
||||||
|
System.err.println(e.getMessage) |
||||||
|
System.exit(1) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
def configToLogDirectories(config: KafkaConfig): Seq[String] = { |
||||||
|
val directories = new mutable.TreeSet[String] |
||||||
|
directories ++= config.logDirs |
||||||
|
Option(config.metadataLogDir).foreach(directories.add) |
||||||
|
directories.toSeq |
||||||
|
} |
||||||
|
|
||||||
|
def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty |
||||||
|
|
||||||
|
def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = { |
||||||
|
val problems = new mutable.ArrayBuffer[String] |
||||||
|
val foundDirectories = new mutable.ArrayBuffer[String] |
||||||
|
var prevMetadata: Option[RawMetaProperties] = None |
||||||
|
directories.sorted.foreach(directory => { |
||||||
|
val directoryPath = Paths.get(directory) |
||||||
|
if (!Files.isDirectory(directoryPath)) { |
||||||
|
if (!Files.exists(directoryPath)) { |
||||||
|
problems += s"$directoryPath does not exist" |
||||||
|
} else { |
||||||
|
problems += s"$directoryPath is not a directory" |
||||||
|
} |
||||||
|
} else { |
||||||
|
foundDirectories += directoryPath.toString |
||||||
|
val metaPath = directoryPath.resolve("meta.properties") |
||||||
|
if (!Files.exists(metaPath)) { |
||||||
|
problems += s"$directoryPath is not formatted." |
||||||
|
} else { |
||||||
|
val properties = Utils.loadProps(metaPath.toString) |
||||||
|
val rawMetaProperties = new RawMetaProperties(properties) |
||||||
|
|
||||||
|
val curMetadata = rawMetaProperties.version match { |
||||||
|
case 0 | 1 => Some(rawMetaProperties) |
||||||
|
case v => |
||||||
|
problems += s"Unsupported version for $metaPath: $v" |
||||||
|
None |
||||||
|
} |
||||||
|
|
||||||
|
if (prevMetadata.isEmpty) { |
||||||
|
prevMetadata = curMetadata |
||||||
|
} else { |
||||||
|
if (!prevMetadata.get.equals(curMetadata.get)) { |
||||||
|
problems += s"Metadata for $metaPath was ${curMetadata.get}, " + |
||||||
|
s"but other directories featured ${prevMetadata.get}" |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
}) |
||||||
|
|
||||||
|
prevMetadata.foreach { prev => |
||||||
|
if (kip500Mode) { |
||||||
|
if (prev.version == 0) { |
||||||
|
problems += "The kafka configuration file appears to be for a kip-500 cluster, but " + |
||||||
|
"the directories are formatted for legacy mode." |
||||||
|
} |
||||||
|
} else if (prev.version == 1) { |
||||||
|
problems += "The kafka configuration file appears to be for a legacy cluster, but " + |
||||||
|
"the directories are formatted for kip-500." |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
if (directories.isEmpty) { |
||||||
|
stream.println("No directories specified.") |
||||||
|
0 |
||||||
|
} else { |
||||||
|
if (foundDirectories.nonEmpty) { |
||||||
|
if (foundDirectories.size == 1) { |
||||||
|
stream.println("Found log directory:") |
||||||
|
} else { |
||||||
|
stream.println("Found log directories:") |
||||||
|
} |
||||||
|
foundDirectories.foreach(d => stream.println(" %s".format(d))) |
||||||
|
stream.println("") |
||||||
|
} |
||||||
|
|
||||||
|
prevMetadata.foreach { prev => |
||||||
|
stream.println(s"Found metadata: ${prev}") |
||||||
|
stream.println("") |
||||||
|
} |
||||||
|
|
||||||
|
if (problems.nonEmpty) { |
||||||
|
if (problems.size == 1) { |
||||||
|
stream.println("Found problem:") |
||||||
|
} else { |
||||||
|
stream.println("Found problems:") |
||||||
|
} |
||||||
|
problems.foreach(d => stream.println(" %s".format(d))) |
||||||
|
stream.println("") |
||||||
|
1 |
||||||
|
} else { |
||||||
|
0 |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
def buildMetadataProperties( |
||||||
|
clusterIdStr: String, |
||||||
|
config: KafkaConfig |
||||||
|
): MetaProperties = { |
||||||
|
val effectiveClusterId = try { |
||||||
|
Uuid.fromString(clusterIdStr) |
||||||
|
} catch { |
||||||
|
case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " + |
||||||
|
s"does not appear to be a valid UUID: ${e.getMessage}") |
||||||
|
} |
||||||
|
require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.") |
||||||
|
new MetaProperties(effectiveClusterId, config.nodeId) |
||||||
|
} |
||||||
|
|
||||||
|
def formatCommand(stream: PrintStream, |
||||||
|
directories: Seq[String], |
||||||
|
metaProperties: MetaProperties, |
||||||
|
ignoreFormatted: Boolean): Int = { |
||||||
|
if (directories.isEmpty) { |
||||||
|
throw new TerseFailure("No log directories found in the configuration.") |
||||||
|
} |
||||||
|
val unformattedDirectories = directories.filter(directory => { |
||||||
|
if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) { |
||||||
|
true |
||||||
|
} else if (!ignoreFormatted) { |
||||||
|
throw new TerseFailure(s"Log directory ${directory} is already formatted. " + |
||||||
|
"Use --ignore-formatted to ignore this directory and format the others.") |
||||||
|
} else { |
||||||
|
false |
||||||
|
} |
||||||
|
}) |
||||||
|
if (unformattedDirectories.isEmpty) { |
||||||
|
stream.println("All of the log directories are already formatted.") |
||||||
|
} |
||||||
|
unformattedDirectories.foreach(directory => { |
||||||
|
try { |
||||||
|
Files.createDirectories(Paths.get(directory)) |
||||||
|
} catch { |
||||||
|
case e: Throwable => throw new TerseFailure(s"Unable to create storage " + |
||||||
|
s"directory ${directory}: ${e.getMessage}") |
||||||
|
} |
||||||
|
val metaPropertiesPath = Paths.get(directory, "meta.properties") |
||||||
|
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile) |
||||||
|
checkpoint.write(metaProperties.toProperties) |
||||||
|
stream.println(s"Formatting ${directory}") |
||||||
|
}) |
||||||
|
0 |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,187 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0 |
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package kafka.tools |
||||||
|
|
||||||
|
import java.io.{ByteArrayOutputStream, PrintStream} |
||||||
|
import java.nio.charset.StandardCharsets |
||||||
|
import java.nio.file.Files |
||||||
|
import java.util |
||||||
|
import java.util.Properties |
||||||
|
|
||||||
|
import kafka.server.{KafkaConfig, MetaProperties} |
||||||
|
import kafka.utils.TestUtils |
||||||
|
import org.apache.kafka.common.Uuid |
||||||
|
import org.apache.kafka.common.utils.Utils |
||||||
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} |
||||||
|
import org.junit.jupiter.api.{Test, Timeout} |
||||||
|
|
||||||
|
|
||||||
|
@Timeout(value = 40) |
||||||
|
class StorageToolTest { |
||||||
|
private def newKip500Properties() = { |
||||||
|
val properties = new Properties() |
||||||
|
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar") |
||||||
|
properties.setProperty(KafkaConfig.ProcessRolesProp, "controller") |
||||||
|
properties.setProperty(KafkaConfig.NodeIdProp, "2") |
||||||
|
properties |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testConfigToLogDirectories(): Unit = { |
||||||
|
val config = new KafkaConfig(newKip500Properties()) |
||||||
|
assertEquals(Seq("/tmp/bar", "/tmp/foo"), StorageTool.configToLogDirectories(config)) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testConfigToLogDirectoriesWithMetaLogDir(): Unit = { |
||||||
|
val properties = newKip500Properties() |
||||||
|
properties.setProperty(KafkaConfig.MetadataLogDirProp, "/tmp/baz") |
||||||
|
val config = new KafkaConfig(properties) |
||||||
|
assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"), |
||||||
|
StorageTool.configToLogDirectories(config)) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testInfoCommandOnEmptyDirectory(): Unit = { |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
val tempDir = TestUtils.tempDir() |
||||||
|
try { |
||||||
|
assertEquals(1, StorageTool. |
||||||
|
infoCommand(new PrintStream(stream), true, Seq(tempDir.toString))) |
||||||
|
assertEquals(s"""Found log directory: |
||||||
|
${tempDir.toString} |
||||||
|
|
||||||
|
Found problem: |
||||||
|
${tempDir.toString} is not formatted. |
||||||
|
|
||||||
|
""", stream.toString()) |
||||||
|
} finally Utils.delete(tempDir) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testInfoCommandOnMissingDirectory(): Unit = { |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
val tempDir = TestUtils.tempDir() |
||||||
|
tempDir.delete() |
||||||
|
try { |
||||||
|
assertEquals(1, StorageTool. |
||||||
|
infoCommand(new PrintStream(stream), true, Seq(tempDir.toString))) |
||||||
|
assertEquals(s"""Found problem: |
||||||
|
${tempDir.toString} does not exist |
||||||
|
|
||||||
|
""", stream.toString()) |
||||||
|
} finally Utils.delete(tempDir) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testInfoCommandOnDirectoryAsFile(): Unit = { |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
val tempFile = TestUtils.tempFile() |
||||||
|
try { |
||||||
|
assertEquals(1, StorageTool. |
||||||
|
infoCommand(new PrintStream(stream), true, Seq(tempFile.toString))) |
||||||
|
assertEquals(s"""Found problem: |
||||||
|
${tempFile.toString} is not a directory |
||||||
|
|
||||||
|
""", stream.toString()) |
||||||
|
} finally tempFile.delete() |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testInfoWithMismatchedLegacyKafkaConfig(): Unit = { |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
val tempDir = TestUtils.tempDir() |
||||||
|
try { |
||||||
|
Files.write(tempDir.toPath.resolve("meta.properties"), |
||||||
|
String.join("\n", util.Arrays.asList( |
||||||
|
"version=1", |
||||||
|
"cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")). |
||||||
|
getBytes(StandardCharsets.UTF_8)) |
||||||
|
assertEquals(1, StorageTool. |
||||||
|
infoCommand(new PrintStream(stream), false, Seq(tempDir.toString))) |
||||||
|
assertEquals(s"""Found log directory: |
||||||
|
${tempDir.toString} |
||||||
|
|
||||||
|
Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1} |
||||||
|
|
||||||
|
Found problem: |
||||||
|
The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for kip-500. |
||||||
|
|
||||||
|
""", stream.toString()) |
||||||
|
} finally Utils.delete(tempDir) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testInfoWithMismatchedKip500KafkaConfig(): Unit = { |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
val tempDir = TestUtils.tempDir() |
||||||
|
try { |
||||||
|
Files.write(tempDir.toPath.resolve("meta.properties"), |
||||||
|
String.join("\n", util.Arrays.asList( |
||||||
|
"version=0", |
||||||
|
"broker.id=1", |
||||||
|
"cluster.id=26c36907-4158-4a35-919d-6534229f5241")). |
||||||
|
getBytes(StandardCharsets.UTF_8)) |
||||||
|
assertEquals(1, StorageTool. |
||||||
|
infoCommand(new PrintStream(stream), true, Seq(tempDir.toString))) |
||||||
|
assertEquals(s"""Found log directory: |
||||||
|
${tempDir.toString} |
||||||
|
|
||||||
|
Found metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0} |
||||||
|
|
||||||
|
Found problem: |
||||||
|
The kafka configuration file appears to be for a kip-500 cluster, but the directories are formatted for legacy mode. |
||||||
|
|
||||||
|
""", stream.toString()) |
||||||
|
} finally Utils.delete(tempDir) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testFormatEmptyDirectory(): Unit = { |
||||||
|
val tempDir = TestUtils.tempDir() |
||||||
|
try { |
||||||
|
val metaProperties = MetaProperties( |
||||||
|
clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2) |
||||||
|
val stream = new ByteArrayOutputStream() |
||||||
|
assertEquals(0, StorageTool. |
||||||
|
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false)) |
||||||
|
assertEquals("Formatting %s%n".format(tempDir), stream.toString()) |
||||||
|
|
||||||
|
try assertEquals(1, StorageTool. |
||||||
|
formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch { |
||||||
|
case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " + |
||||||
|
"formatted. Use --ignore-formatted to ignore this directory and format the " + |
||||||
|
"others.", e.getMessage) |
||||||
|
} |
||||||
|
|
||||||
|
val stream2 = new ByteArrayOutputStream() |
||||||
|
assertEquals(0, StorageTool. |
||||||
|
formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true)) |
||||||
|
assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString()) |
||||||
|
} finally Utils.delete(tempDir) |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
def testFormatWithInvalidClusterId(): Unit = { |
||||||
|
val config = new KafkaConfig(newKip500Properties()) |
||||||
|
assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " + |
||||||
|
"Input string `invalid` decoded as 5 bytes, which is not equal to the expected " + |
||||||
|
"16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure], |
||||||
|
() => StorageTool.buildMetadataProperties("invalid", config)).getMessage) |
||||||
|
} |
||||||
|
} |
Loading…
Reference in new issue