
KAFKA-15356: Generate and persist directory IDs (#14291)

Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
Igor Soarez authored 12 months ago, committed via GitHub
commit 7e1c453af9
  1. clients/src/main/java/org/apache/kafka/common/Uuid.java (53 lines changed)
  2. core/src/main/scala/kafka/log/LogManager.scala (37 lines changed)
  3. core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala (30 lines changed)
  4. core/src/main/scala/kafka/server/KafkaServer.scala (5 lines changed)
  5. core/src/main/scala/kafka/tools/StorageTool.scala (13 lines changed)
  6. core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala (2 lines changed)
  7. core/src/test/scala/unit/kafka/log/LogManagerTest.scala (30 lines changed)
  8. core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala (2 lines changed)
  9. core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala (5 lines changed)
  10. core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala (5 lines changed)
  11. core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (32 lines changed)

clients/src/main/java/org/apache/kafka/common/Uuid.java (53 lines changed)

@@ -17,7 +17,11 @@
package org.apache.kafka.common;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* This class defines an immutable universally unique identifier (UUID). It represents a 128-bit value.
@@ -27,16 +31,61 @@ import java.util.Base64;
*/
public class Uuid implements Comparable<Uuid> {
/**
* A reserved UUID. Will never be returned by the randomUuid method.
*/
public static final Uuid ONE_UUID = new Uuid(0L, 1L);
/**
* A UUID for the metadata topic in KRaft mode. Will never be returned by the randomUuid method.
*/
public static final Uuid METADATA_TOPIC_ID = new Uuid(0L, 1L);
public static final Uuid METADATA_TOPIC_ID = ONE_UUID;
/**
* A UUID that represents a null or empty UUID. Will never be returned by the randomUuid method.
*/
public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
/**
* A UUID that is used to identify new or unknown dir assignments.
*/
public static final Uuid UNKNOWN_DIR = ZERO_UUID;
/**
* A UUID that is used to represent unspecified offline dirs.
*/
public static final Uuid OFFLINE_DIR = ONE_UUID;
/**
* A UUID that is used to represent an unspecified log directory,
* that is expected to have been previously selected to host an
* associated replica. This contrasts with {@code UNKNOWN_DIR},
* which is associated with (typically new) replicas that may not
* yet have been placed in any log directory.
*/
public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);
/**
* The set of reserved UUIDs that will never be returned by the randomUuid method.
*/
public static final Set<Uuid> RESERVED;
static {
HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
METADATA_TOPIC_ID,
ZERO_UUID,
ONE_UUID,
UNKNOWN_DIR,
OFFLINE_DIR,
SELECTED_DIR
));
// The first 100 UUIDs are reserved for future use.
for (long i = 0L; i < 100L; i++) {
reserved.add(new Uuid(0L, i));
}
RESERVED = Collections.unmodifiableSet(reserved);
}
private final long mostSignificantBits;
private final long leastSignificantBits;
@@ -61,7 +110,7 @@ public class Uuid implements Comparable<Uuid> {
*/
public static Uuid randomUuid() {
Uuid uuid = unsafeRandomUuid();
while (uuid.equals(METADATA_TOPIC_ID) || uuid.equals(ZERO_UUID) || uuid.toString().startsWith("-")) {
while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
uuid = unsafeRandomUuid();
}
return uuid;

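To make the reservation scheme concrete, the following standalone Java sketch reproduces the same rejection-sampling idea. The class and method names are illustrative, not Kafka's Uuid API; the dash check mirrors the existing condition in randomUuid (the diff does not state its rationale, but an id starting with '-' could be mistaken for a command-line flag).

import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;

public class ReservedUuidSketch {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final Set<String> RESERVED = new HashSet<>();
    static {
        // Mirrors the static block in the diff: reserve (0L, 0L) .. (0L, 99L).
        for (long i = 0L; i < 100L; i++) {
            RESERVED.add(encode(0L, i));
        }
    }

    // Same string form Kafka uses: 16 big-endian bytes, URL-safe base64, no padding.
    static String encode(long msb, long lsb) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(msb);
        buf.putLong(lsb);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    static String randomDirectoryId() {
        while (true) {
            String candidate = encode(RANDOM.nextLong(), RANDOM.nextLong());
            // Reject reserved ids and ids that could be read as CLI flags.
            if (!candidate.startsWith("-") && !RESERVED.contains(candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(randomDirectoryId()); // e.g. ZwkGXjB0TvSF6mjVh6gO7Q
    }
}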
core/src/main/scala/kafka/log/LogManager.scala (37 lines changed)

@@ -117,6 +117,9 @@ class LogManager(logDirs: Seq[File],
}
private val dirLocks = lockLogDirs(liveLogDirs)
private val dirIds = directoryIds(liveLogDirs)
// visible for testing
private[log] val directoryIds: Set[Uuid] = dirIds.values.toSet
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
@volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
@@ -261,6 +264,40 @@
}
}
/**
* Retrieves the Uuid for the directory, given its absolute path.
*/
def directoryId(dir: String): Option[Uuid] = dirIds.get(dir)
/**
* Determine directory ID for each directory with a meta.properties.
* If meta.properties does not include a directory ID, one is generated and persisted back to meta.properties.
* Directories without a meta.properties don't get a directory ID assigned.
*/
private def directoryIds(dirs: Seq[File]): Map[String, Uuid] = {
dirs.flatMap { dir =>
try {
val metadataCheckpoint = new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile))
metadataCheckpoint.read().map { props =>
val rawMetaProperties = new RawMetaProperties(props)
val uuid = rawMetaProperties.directoryId match {
case Some(uuidStr) => Uuid.fromString(uuidStr)
case None =>
val uuid = Uuid.randomUuid()
rawMetaProperties.directoryId = uuid.toString
metadataCheckpoint.write(rawMetaProperties.props)
uuid
}
dir.getAbsolutePath -> uuid
}.toMap
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while loading ID $dir", e)
None
}
}.toMap
}
private def addLogToBeDeleted(log: UnifiedLog): Unit = {
this.logsToBeDeleted.add((log, time.milliseconds()))
}

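The load-or-generate flow in directoryIds above, reduced to a self-contained Java sketch. For brevity this uses plain java.util.Properties and java.util.UUID where Kafka uses BrokerMetadataCheckpoint and its base64-encoded Uuid, and it propagates IOException instead of reporting to logDirFailureChannel:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

public class DirectoryIdSketch {
    static Optional<UUID> loadOrCreateDirectoryId(File logDir) throws IOException {
        File metaFile = new File(logDir, "meta.properties");
        if (!metaFile.exists()) {
            // As in the patch: directories without meta.properties get no id.
            return Optional.empty();
        }
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(metaFile)) {
            props.load(in);
        }
        String existing = props.getProperty("directory.id");
        if (existing != null) {
            return Optional.of(UUID.fromString(existing));
        }
        // No id yet: mint one and persist it back so it stays stable across
        // restarts. (Kafka's checkpoint write goes through a temp file + rename.)
        UUID fresh = UUID.randomUUID();
        props.setProperty("directory.id", fresh.toString());
        try (FileOutputStream out = new FileOutputStream(metaFile)) {
            props.store(out, null);
        }
        return Optional.of(fresh);
    }

    public static void main(String[] args) throws IOException {
        // Repeated runs print the same id: the first call persists it.
        System.out.println(loadOrCreateDirectoryId(new File(args[0])));
    }
}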
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala (30 lines changed)

@@ -34,6 +34,7 @@ object RawMetaProperties {
val ClusterIdKey = "cluster.id"
val BrokerIdKey = "broker.id"
val NodeIdKey = "node.id"
val DirectoryIdKey = "directory.id"
val VersionKey = "version"
}
@@ -63,6 +64,14 @@ class RawMetaProperties(val props: Properties = new Properties()) {
props.setProperty(NodeIdKey, id.toString)
}
def directoryId: Option[String] = {
Option(props.getProperty(DirectoryIdKey))
}
def directoryId_=(id: String): Unit = {
props.setProperty(DirectoryIdKey, id)
}
def version: Int = {
intValue(VersionKey).getOrElse(0)
}
@@ -71,13 +80,6 @@ class RawMetaProperties(val props: Properties = new Properties()) {
props.setProperty(VersionKey, ver.toString)
}
def requireVersion(expectedVersion: Int): Unit = {
if (version != expectedVersion) {
throw new RuntimeException(s"Expected version $expectedVersion, but got "+
s"version $version")
}
}
private def intValue(key: String): Option[Int] = {
try {
Option(props.getProperty(key)).map(Integer.parseInt)
@@ -141,11 +143,21 @@ case class MetaProperties(
clusterId: String,
nodeId: Int,
) {
def toProperties: Properties = {
private def toRawMetaProperties: RawMetaProperties = {
val properties = new RawMetaProperties()
properties.version = 1
properties.clusterId = clusterId
properties.nodeId = nodeId
properties
}
def toProperties: Properties = {
toRawMetaProperties.props
}
def toPropertiesWithDirectoryId(directoryId: String): Properties = {
val properties = toRawMetaProperties
properties.directoryId = directoryId
properties.props
}
@@ -166,7 +178,7 @@ object BrokerMetadataCheckpoint extends Logging {
val offlineDirs = mutable.ArrayBuffer.empty[String]
for (logDir <- logDirs) {
val brokerCheckpointFile = new File(logDir, "meta.properties")
val brokerCheckpointFile = new File(logDir, KafkaServer.brokerMetaPropsFile)
val brokerCheckpoint = new BrokerMetadataCheckpoint(brokerCheckpointFile)
try {

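For illustration, a version-1 meta.properties produced through toPropertiesWithDirectoryId now carries directory.id alongside the existing keys. With cluster id, node id, and directory id values borrowed from the tests in this commit, the stored file looks like:

version=1
cluster.id=XcZZOzUqS4yHOjhMQB6JLQ
node.id=2
directory.id=ZwkGXjB0TvSF6mjVh6gO7Q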
core/src/main/scala/kafka/server/KafkaServer.scala (5 lines changed)

@@ -69,6 +69,8 @@ import scala.jdk.CollectionConverters._
object KafkaServer {
val brokerMetaPropsFile = "meta.properties"
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
val clientConfig = new ZKClientConfig
if (config.zkSslClientEnable || forceZkSslClientEnable) {
@@ -165,9 +167,8 @@ class KafkaServer(
private var configRepository: ZkConfigRepository = _
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map { logDir =>
(logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))
(logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile)))
}.toMap
private var _clusterId: String = _

core/src/main/scala/kafka/tools/StorageTool.scala (13 lines changed)

@@ -19,10 +19,10 @@ package kafka.tools
import java.io.PrintStream
import java.nio.file.{Files, Paths}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer, MetaProperties, RawMetaProperties}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue, append}
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
@ -33,7 +33,6 @@ import org.apache.kafka.common.metadata.UserScramCredentialRecord @@ -33,7 +33,6 @@ import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.scram.internals.ScramFormatter
import java.util
import java.util.Base64
import java.util.Optional
@@ -281,7 +280,7 @@ object StorageTool extends Logging {
}
} else {
foundDirectories += directoryPath.toString
val metaPath = directoryPath.resolve("meta.properties")
val metaPath = directoryPath.resolve(KafkaServer.brokerMetaPropsFile)
if (!Files.exists(metaPath)) {
problems += s"$directoryPath is not formatted."
} else {
@@ -410,7 +409,7 @@ object StorageTool extends Logging {
}
val unformattedDirectories = directories.filter(directory => {
if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, KafkaServer.brokerMetaPropsFile))) {
true
} else if (!ignoreFormatted) {
throw new TerseFailure(s"Log directory $directory is already formatted. " +
@@ -429,9 +428,9 @@ object StorageTool extends Logging {
case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
s"directory $directory: ${e.getMessage}")
}
val metaPropertiesPath = Paths.get(directory, "meta.properties")
val metaPropertiesPath = Paths.get(directory, KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
checkpoint.write(metaProperties.toProperties)
checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
bootstrapDirectory.writeBinaryFile(bootstrapMetadata)

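The write at the end of formatCommand is reached through the storage tool's standard format subcommand; a typical invocation (cluster id reused from the tests below, output illustrative):

$ bin/kafka-storage.sh random-uuid
XcZZOzUqS4yHOjhMQB6JLQ
$ bin/kafka-storage.sh format -t XcZZOzUqS4yHOjhMQB6JLQ -c config/kraft/server.properties

With this patch, formatting also stamps each log directory with its own freshly generated directory.id, so the id is assigned at format time rather than backfilled by LogManager on first startup.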
core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala (2 lines changed)

@@ -168,7 +168,7 @@ class BrokerMetadataCheckpointTest extends Logging {
for (mp <- metaProperties) {
val logDir = TestUtils.tempDirectory()
logDirs += logDir
val propFile = new File(logDir.getAbsolutePath, "meta.properties")
val propFile = new File(logDir.getAbsolutePath, KafkaServer.brokerMetaPropsFile)
val fs = new FileOutputStream(propFile)
try {
mp.props.store(fs, "")

core/src/test/scala/unit/kafka/log/LogManagerTest.scala (30 lines changed)

@@ -20,13 +20,13 @@ package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.server.BrokerTopicStats
import kafka.server.{BrokerMetadataCheckpoint, BrokerTopicStats, KafkaServer, RawMetaProperties}
import kafka.utils._
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.OffsetOutOfRangeException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -1010,4 +1010,30 @@ class LogManagerTest {
assertEquals(8, invokedCount)
assertEquals(4, failureCount)
}
@Test
def testLoadDirectoryIds(): Unit = {
def writeMetaProperties(dir: File, id: Option[String] = None): Unit = {
val rawProps = new RawMetaProperties()
rawProps.nodeId = 1
rawProps.clusterId = "IVT1Seu3QjacxS7oBTKhDQ"
id.foreach(v => rawProps.directoryId = v)
new BrokerMetadataCheckpoint(new File(dir, KafkaServer.brokerMetaPropsFile)).write(rawProps.props)
}
val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
writeMetaProperties(dirs(0))
writeMetaProperties(dirs(1), Some("ZwkGXjB0TvSF6mjVh6gO7Q"))
// no meta.properties on dirs(2)
writeMetaProperties(dirs(3), Some("kQfNPJ2FTHq_6Qlyyv6Jqg"))
writeMetaProperties(dirs(4))
logManager = createLogManager(dirs)
assertTrue(logManager.directoryId(dirs(0).getAbsolutePath).isDefined)
assertEquals(Some(Uuid.fromString("ZwkGXjB0TvSF6mjVh6gO7Q")), logManager.directoryId(dirs(1).getAbsolutePath))
assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), logManager.directoryId(dirs(3).getAbsolutePath))
assertTrue(logManager.directoryId(dirs(4).getAbsolutePath).isDefined)
assertEquals(4, logManager.directoryIds.size)
}
}

core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala (2 lines changed)

@@ -91,7 +91,7 @@ class KafkaRaftServerTest {
logDir: File,
metaProperties: MetaProperties
): Unit = {
val metaPropertiesFile = new File(logDir.getAbsolutePath, "meta.properties")
val metaPropertiesFile = new File(logDir.getAbsolutePath, KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesFile)
checkpoint.write(metaProperties.toProperties)
}

core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala (5 lines changed)

@@ -33,7 +33,6 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
var config1: KafkaConfig = _
var props2: Properties = _
var config2: KafkaConfig = _
val brokerMetaPropsFile = "meta.properties"
var servers: Seq[KafkaServer] = Seq()
@BeforeEach
@@ -158,7 +157,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
// verify no broker metadata was written
serverB.config.logDirs.foreach { logDir =>
val brokerMetaFile = new File(logDir + File.separator + brokerMetaPropsFile)
val brokerMetaFile = new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile)
assertFalse(brokerMetaFile.exists())
}
@@ -180,7 +179,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
for (logDir <- logDirs) {
val brokerMetadataOpt = new BrokerMetadataCheckpoint(
new File(logDir + File.separator + brokerMetaPropsFile)).read()
new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile)).read()
brokerMetadataOpt match {
case Some(properties) =>
val brokerMetadata = new RawMetaProperties(properties)

core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala (5 lines changed)

@@ -38,7 +38,6 @@ class ServerGenerateClusterIdTest extends QuorumTestHarness {
var config2: KafkaConfig = _
var config3: KafkaConfig = _
var servers: Seq[KafkaServer] = Seq()
val brokerMetaPropsFile = "meta.properties"
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
@@ -213,14 +212,14 @@ class ServerGenerateClusterIdTest extends QuorumTestHarness {
def forgeBrokerMetadata(logDir: String, brokerId: Int, clusterId: String): Unit = {
val checkpoint = new BrokerMetadataCheckpoint(
new File(logDir + File.separator + brokerMetaPropsFile))
new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile))
checkpoint.write(ZkMetaProperties(clusterId, brokerId).toProperties)
}
def verifyBrokerMetadata(logDirs: Seq[String], clusterId: String): Boolean = {
for (logDir <- logDirs) {
val brokerMetadataOpt = new BrokerMetadataCheckpoint(
new File(logDir + File.separator + brokerMetaPropsFile)).read()
new File(logDir + File.separator + KafkaServer.brokerMetaPropsFile)).read()
brokerMetadataOpt match {
case Some(properties) =>
val brokerMetadata = new RawMetaProperties(properties)

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (32 lines changed)

@@ -19,17 +19,18 @@ package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.{Files, Paths}
import java.util
import java.util.Properties
import org.apache.kafka.common.KafkaException
import kafka.server.{KafkaConfig, MetaProperties}
import org.apache.kafka.common.{KafkaException, Uuid}
import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, KafkaServer, MetaProperties}
import kafka.utils.Exit
import kafka.utils.TestUtils
import org.apache.commons.io.output.NullOutputStream
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
import scala.collection.mutable
@@ -114,7 +115,7 @@ Found problem:
val stream = new ByteArrayOutputStream()
val tempDir = TestUtils.tempDir()
try {
Files.write(tempDir.toPath.resolve("meta.properties"),
Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
String.join("\n", util.Arrays.asList(
"version=1",
"cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
@@ -138,7 +139,7 @@ Found problem:
val stream = new ByteArrayOutputStream()
val tempDir = TestUtils.tempDir()
try {
Files.write(tempDir.toPath.resolve("meta.properties"),
Files.write(tempDir.toPath.resolve(KafkaServer.brokerMetaPropsFile),
String.join("\n", util.Arrays.asList(
"version=0",
"broker.id=1",
@@ -361,4 +362,23 @@ Found problem:
Exit.resetExitProcedure()
}
}
@Test
def testDirUuidGeneration(): Unit = {
val tempDir = TestUtils.tempDir()
try {
val metaProperties = MetaProperties(
clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latest(), None, "test format command")
assertEquals(0, StorageTool.
formatCommand(new PrintStream(NullOutputStream.NULL_OUTPUT_STREAM), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latest(), ignoreFormatted = false))
val metaPropertiesFile = Paths.get(tempDir.toURI).resolve(KafkaServer.brokerMetaPropsFile).toFile
assertTrue(metaPropertiesFile.exists())
val properties = new BrokerMetadataCheckpoint(metaPropertiesFile).read().get
assertTrue(properties.containsKey("directory.id"))
val directoryId = Uuid.fromString(properties.getProperty("directory.id"))
assertFalse(Uuid.RESERVED.contains(directoryId))
} finally Utils.delete(tempDir)
}
}
