From b1b80860a01cc378cfada3549a3480f0773c3ff8 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Mon, 12 Jan 2015 15:45:13 -0800
Subject: [PATCH] KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede

---
 .../common/GenerateBrokerIdException.scala    |  27 +++
 .../InconsistentBrokerIdException.scala       |  27 +++
 .../server/BrokerMetadataCheckpoint.scala     |  83 +++++++
 .../main/scala/kafka/server/KafkaConfig.scala |  20 +-
 .../main/scala/kafka/server/KafkaServer.scala |  87 ++++++--
 core/src/main/scala/kafka/utils/ZkUtils.scala |  38 +++-
 .../server/ServerGenerateBrokerIdTest.scala   | 128 +++++++++++
 .../scala/unit/kafka/utils/TestUtils.scala    |   9 +-
 8 files changed, 394 insertions(+), 25 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
 create mode 100644 core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
 create mode 100644 core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala

diff --git a/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
new file mode 100644
index 00000000000..13784fe5055
--- /dev/null
+++ b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when there is a failure to generate a zookeeper sequenceId to use as brokerId
+ */
+class GenerateBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}
diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
new file mode 100644
index 00000000000..0c0d1cd731a
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the brokerId stored in logDirs is not consistent across logDirs.
+ */
+class InconsistentBrokerIdException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this(cause: Throwable) = this(null, cause)
+  def this() = this(null, null)
+}
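Both new exceptions are unchecked and can escape KafkaServer.startup(). A minimal sketch of how an embedding application might tell them apart (the helper name is hypothetical, not part of this patch):

    import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
    import kafka.server.KafkaServer

    // Hypothetical helper: surface the two new startup failure modes distinctly.
    def startBrokerOrExplain(server: KafkaServer): Unit =
      try server.startup()
      catch {
        case e: InconsistentBrokerIdException =>
          // broker.id in the config or in one log dir disagrees with meta.properties
          throw e
        case e: GenerateBrokerIdException =>
          // obtaining a sequence id from ZooKeeper failed
          throw e
      }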
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
new file mode 100644
index 00000000000..0e542ff0cc3
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io._
+import java.util.Properties
+import kafka.utils._
+
+
+case class BrokerMetadata(brokerId: Int)
+
+/**
+ * This class saves the broker's metadata (currently only the broker id) to a file
+ */
+class BrokerMetadataCheckpoint(val file: File) extends Logging {
+  private val lock = new Object()
+  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
+
+  def write(brokerMetadata: BrokerMetadata) = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new Properties()
+        brokerMetaProps.setProperty("version", 0.toString)
+        brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
+        val temp = new File(file.getAbsolutePath + ".tmp")
+        val fileOutputStream = new FileOutputStream(temp)
+        brokerMetaProps.store(fileOutputStream, "")
+        fileOutputStream.flush()
+        fileOutputStream.getFD().sync()
+        fileOutputStream.close()
+        // swap new BrokerMetadata file with previous one
+        if(!temp.renameTo(file)) {
+          // renameTo() fails on windows if destination file exists.
+          file.delete()
+          if(!temp.renameTo(file))
+            throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath()))
+        }
+      } catch {
+        case ie: IOException =>
+          error("Failed to write meta.properties", ie)
+          throw ie
+      }
+    }
+  }
+
+  def read(): Option[BrokerMetadata] = {
+    lock synchronized {
+      try {
+        val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
+        val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
+        version match {
+          case 0 =>
+            val brokerId = brokerMetaProps.getIntInRange("broker.id", (0, Int.MaxValue))
+            return Some(BrokerMetadata(brokerId))
+          case _ =>
+            throw new IOException("Unrecognized version of the server meta.properties file: " + version)
+        }
+      } catch {
+        case e: FileNotFoundException =>
+          warn("No meta.properties file found under dir %s".format(file.getAbsolutePath()))
+          None
+        case e1: Exception =>
+          error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
+          throw e1
+      }
+    }
+  }
+}
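For reference, a round trip through the checkpoint; the log dir path is hypothetical, and the file written is a plain java.util.Properties file holding version=0 and broker.id:

    import java.io.File
    import kafka.server.{BrokerMetadata, BrokerMetadataCheckpoint}

    // a minimal sketch, assuming /tmp/kafka-logs is one of the configured log.dirs
    val checkpoint = new BrokerMetadataCheckpoint(new File("/tmp/kafka-logs/meta.properties"))
    checkpoint.write(BrokerMetadata(1001))                  // fsyncs a .tmp file, then renames it into place
    assert(checkpoint.read() == Some(BrokerMetadata(1001))) // survives restarts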
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e26c5436fe..bbd3fd75e83 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-    
+
     if(props.containsKey("log.retention.ms")){
       props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
     }
     else if(props.containsKey("log.retention.minutes")){
       millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } 
+    }
     else {
       millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
@@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   private def getLogRollTimeMillis(): Long = {
     val millisInHour = 60L * 60L * 1000L
-    
+
     if(props.containsKey("log.roll.ms")){
       props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
     }
@@ -71,8 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /*********** General Configuration ***********/
 
-  /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
+  /* the maximum value that can be used for a user-configured broker.id */
+  val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue))
+
+  /* The broker id for this server. Set to -1 when absent, in which case an id is generated
+   * during startup. To avoid conflicts between ZooKeeper-generated and user-configured ids,
+   * generated ids start from MaxReservedBrokerId + 1.
+   */
+  var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1
 
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
@@ -117,10 +123,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
-  
+
   /* the maximum number of connections we allow from each ip address */
   val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
-  
+
   /* per-ip or hostname overrides to the default maximum number of connections */
   val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
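With this change broker.id becomes optional. A sketch of the resulting KafkaConfig behaviour (the ZooKeeper address is an assumption for illustration):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // assumed local ZooKeeper
    // note: no broker.id set
    val config = new KafkaConfig(props)
    assert(config.brokerId == -1)              // sentinel until KafkaServer.startup() assigns an id
    assert(config.MaxReservedBrokerId == 1000) // default of reserved.broker.max.id

A user-configured broker.id must now fall in [0, reserved.broker.max.id]; generated ids start at reserved.broker.max.id + 1 (1001 with the defaults, as the new test below verifies).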
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1691ad7fc80..a069eb9272c 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,11 +25,12 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+import collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -39,10 +40,11 @@ import com.yammer.metrics.core.Gauge
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   private var startupComplete = new AtomicBoolean(false)
+  private var brokerId: Int = -1
+
   val brokerState: BrokerState = new BrokerState
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
@@ -56,6 +58,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
+  val brokerMetaPropsFile = "meta.properties"
+  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
 
   newGauge(
     "BrokerState",
@@ -77,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
         /* start scheduler */
         kafkaScheduler.startup()
-    
+
         /* setup zookeeper */
         zkClient = initZk()
 
@@ -85,6 +89,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         logManager = createLogManager(zkClient, brokerState)
         logManager.startup()
 
+        /* generate brokerId */
+        config.brokerId = getBrokerId
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
         socketServer = new SocketServer(config.brokerId,
                                         config.hostName,
                                         config.port,
@@ -104,26 +112,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
         offsetManager = createOffsetManager()
 
         kafkaController = new KafkaController(config, zkClient, brokerState)
-    
+
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
-   
+
         Mx4jLoader.maybeLoad()
 
         replicaManager.startup()
 
         kafkaController.startup()
-    
+
         topicConfigManager = new TopicConfigManager(zkClient, logManager)
         topicConfigManager.startup()
-    
+
         /* tell everyone we are alive */
         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort,
                                                 config.zkSessionTimeoutMs, zkClient)
         kafkaHealthcheck.startup()
 
-
         registerStats()
         startupComplete.set(true)
         info("started")
@@ -181,10 +188,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       info("Starting controlled shutdown")
       var channel : BlockingChannel = null
       var prevController : Broker = null
-      var shutdownSuceeded : Boolean = false
+      var shutdownSucceeded : Boolean = false
      try {
        brokerState.newState(PendingControlledShutdown)
-        while (!shutdownSuceeded && remainingRetries > 0) {
+        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries = remainingRetries - 1

          // 1. Find the controller and establish a connection to it.
@@ -223,7 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) { - shutdownSuceeded = true + shutdownSucceeded = true info ("Controlled shutdown succeeded") } else { @@ -239,7 +246,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // ignore and try again } } - if (!shutdownSuceeded) { + if (!shutdownSucceeded) { Thread.sleep(config.controlledShutdownRetryBackoffMs) warn("Retrying controlled shutdown after the previous attempt failed...") } @@ -251,7 +258,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg channel = null } } - if (!shutdownSuceeded) { + if (!shutdownSucceeded) { warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed") } } @@ -307,7 +314,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def awaitShutdown(): Unit = shutdownLatch.await() def getLogManager(): LogManager = logManager - + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = config.logRollTimeMillis, @@ -359,5 +366,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler) } -} + /** + * Generates new brokerId or reads from meta.properties based on following conditions + *
+   * 1. If the config provides no broker.id, a new id is generated from ZooKeeper's sequence number.
+   * 2. If the broker.id stored in meta.properties is not consistent across all log.dirs, an InconsistentBrokerIdException is thrown.
+   * 3. If the config provides a broker.id and meta.properties stores a different one, an InconsistentBrokerIdException is thrown.
+   * 4. If the config provides a broker.id and a log dir has no meta.properties file, a new meta.properties is created with that broker.id.
+   *
+   * @return the broker id
+   */
+  private def getBrokerId: Int = {
+    var brokerId = config.brokerId
+    var logDirsWithoutMetaProps: List[String] = List()
+    val brokerIdSet = mutable.HashSet[Int]()
+
+    for (logDir <- config.logDirs) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
+      brokerMetadataOpt match {
+        case Some(brokerMetadata: BrokerMetadata) =>
+          brokerIdSet.add(brokerMetadata.brokerId)
+        case None =>
+          logDirsWithoutMetaProps ++= List(logDir)
+      }
+    }
+
+    if(brokerIdSet.size > 1)
+      throw new InconsistentBrokerIdException("Failed to match broker.id across log.dirs")
+    else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
+      throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
+    else if(brokerIdSet.size == 0 && brokerId < 0) // generate a new brokerId from Zookeeper
+      brokerId = generateBrokerId
+    else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
+      brokerId = brokerIdSet.last
+
+    for(logDir <- logDirsWithoutMetaProps) {
+      val checkpoint = brokerMetadataCheckpoints(logDir)
+      checkpoint.write(new BrokerMetadata(brokerId))
+    }
+
+    return brokerId
+  }
+  private def generateBrokerId: Int = {
+    try {
+      ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
+    } catch {
+      case e: Exception =>
+        error("Failed to generate broker.id", e)
+        throw new GenerateBrokerIdException("Failed to generate broker.id", e)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88e0cc..c14bd455b66 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,7 @@ object ZkUtils extends Logging {
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -87,7 +88,8 @@ object ZkUtils extends Logging {
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
-    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
+    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
+                    DeleteTopicsPath, BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -122,6 +124,14 @@ object ZkUtils extends Logging {
     }
   }
 
+  /** Returns a sequence id generated by updating BrokerSequenceIdPath in ZooKeeper.
+   * Users may still provide a broker.id in the config; to avoid conflicts between ZooKeeper-generated
+   * sequence ids and configured broker.ids, the sequence id is incremented by KafkaConfig.MaxReservedBrokerId.
+   */
+  def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = {
+    getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId
+  }
+
   /**
    * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
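Because the id is derived from the znode version, successive calls return consecutive values. A minimal sketch, assuming a ZooKeeper ensemble reachable at localhost:2181:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZKStringSerializer, ZkUtils}

    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    val first  = ZkUtils.getBrokerSequenceId(zkClient, 1000) // 1001 on a freshly created /brokers/seqid
    val second = ZkUtils.getBrokerSequenceId(zkClient, 1000) // first + 1
    zkClient.close()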
@@ -696,6 +706,32 @@ object ZkUtils extends Logging {
     }
   }
 
+  /**
+   * This API produces a sequence number by creating / updating the given path in ZooKeeper.
+   * It uses the stat returned by ZooKeeper and returns the version as the sequence number;
+   * every time a client updates the path, stat.version is incremented.
+   */
+  def getSequenceId(client: ZkClient, path: String): Int = {
+    try {
+      val stat = client.writeDataReturnStat(path, "", -1)
+      return stat.getVersion
+    } catch {
+      case e: ZkNoNodeException => {
+        createParentPath(client, path)
+        try {
+          client.createPersistent(path, "")
+          return 0
+        } catch {
+          case e: ZkNodeExistsException =>
+            val stat = client.writeDataReturnStat(path, "", -1)
+            return stat.getVersion
+          case e2: Throwable => throw e2
+        }
+      }
+      case e2: Throwable => throw e2
+    }
+  }
+
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
     if(topics == null)
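After a successful startup the relevant znodes look roughly like this (the id shown is illustrative):

    /brokers/seqid      # persistent; its version supplies the sequence numbers
    /brokers/ids/1001   # ephemeral registration of the auto-assigned broker

The new test below exercises this flow end to end.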
+ */ +package kafka.server + +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.{TestUtils, Utils} +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ +import java.io.File + +class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { + var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + var config1 = new KafkaConfig(props1) + var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort) + var config2 = new KafkaConfig(props2) + val brokerMetaPropsFile = "meta.properties" + + + @Test + def testAutoGenerateBrokerId() { + var server1 = new KafkaServer(config1) + server1.startup() + server1.shutdown() + assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) + // restart the server check to see if it uses the brokerId generated previously + server1 = new KafkaServer(config1) + server1.startup() + assertEquals(server1.config.brokerId, 1001) + server1.shutdown() + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testUserConfigAndGeneratedBrokerId() { + // start the server with broker.id as part of config + val server1 = new KafkaServer(config1) + val server2 = new KafkaServer(config2) + val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + val config3 = new KafkaConfig(props3) + val server3 = new KafkaServer(config3) + server1.startup() + assertEquals(server1.config.brokerId,1001) + server2.startup() + assertEquals(server2.config.brokerId,0) + server3.startup() + assertEquals(server3.config.brokerId,1002) + server1.shutdown() + server2.shutdown() + server3.shutdown() + assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001)) + assertTrue(verifyBrokerMetadata(server2.config.logDirs,0)) + assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002)) + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + Utils.rm(server3.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testMultipleLogDirsMetaProps() { + // add multiple logDirs and check if the generate brokerId is stored in all of them + val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + + "," + TestUtils.tempDir().getAbsolutePath + props1.setProperty("log.dir",logDirs) + config1 = new KafkaConfig(props1) + var server1 = new KafkaServer(config1) + server1.startup() + server1.shutdown() + assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) + // addition to log.dirs after generation of a broker.id from zk should be copied over + val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath + props1.setProperty("log.dir",newLogDirs) + config1 = new KafkaConfig(props1) + server1 = new KafkaServer(config1) + server1.startup() + server1.shutdown() + assertTrue(verifyBrokerMetadata(config1.logDirs, 1001)) + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testConsistentBrokerIdFromUserConfigAndMetaProps() { + // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException + var server1 = new KafkaServer(config1) //auto generate broker Id + server1.startup() + server1.shutdown() + server1 = new KafkaServer(config2) // user specified broker id + try { + server1.startup() + } catch { + case e: kafka.common.InconsistentBrokerIdException => //success + } + server1.shutdown() + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + def verifyBrokerMetadata(logDirs: Seq[String], 
+  def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
+    for(logDir <- logDirs) {
+      val brokerMetadataOpt = (new BrokerMetadataCheckpoint(
+        new File(logDir + File.separator + brokerMetaPropsFile))).read()
+      brokerMetadataOpt match {
+        case Some(brokerMetadata: BrokerMetadata) =>
+          if (brokerMetadata.brokerId != brokerId) return false
+        case _ => return false
+      }
+    }
+    true
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c9e8ba257b7..ac15d344257 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
         Utils.rm(f)
       }
     })
-    
+
     f
   }
 
@@ -154,7 +154,7 @@ object TestUtils extends Logging {
   def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
     enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
-    props.put("broker.id", nodeId.toString)
+    if (nodeId >= 0) props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -700,6 +700,11 @@ object TestUtils extends Logging {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
 
+  def verifyNonDaemonThreadsStatus() {
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && Option(t.getClass.getCanonicalName).exists(_.toLowerCase.startsWith("kafka"))))
+  }
 
 /**
  * Create new LogManager instance with default configuration for testing