Browse Source

KAFKA-1070 Auto assign broker id; reviewed by Neha Narkhede

pull/38/merge
Sriharsha Chintalapani 10 years ago committed by Neha Narkhede
parent
commit
b1b80860a0
  1. 27
      core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
  2. 27
      core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
  3. 203
      core/src/main/scala/kafka/log/LogCleanerManager.scala.orig
  4. 83
      core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
  5. 20
      core/src/main/scala/kafka/server/KafkaConfig.scala
  6. 87
      core/src/main/scala/kafka/server/KafkaServer.scala
  7. 38
      core/src/main/scala/kafka/utils/ZkUtils.scala
  8. 127
      core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
  9. 9
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

27
core/src/main/scala/kafka/common/GenerateBrokerIdException.scala

@ -0,0 +1,27 @@ @@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Thrown when there is a failure to generate a zookeeper sequenceId to use as brokerId
*/
class GenerateBrokerIdException(message: String, cause: Throwable)
  extends RuntimeException(message, cause) {
  // Auxiliary constructors mirror the standard RuntimeException overloads
  // so callers can supply a message, a cause, both, or neither.
  def this() = this(null, null)
  def this(message: String) = this(message, null)
  def this(cause: Throwable) = this(null, cause)
}

27
core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala

@ -0,0 +1,27 @@ @@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Indicates the brokerId stored in logDirs is not consistent across logDirs.
*/
class InconsistentBrokerIdException(message: String, cause: Throwable)
  extends RuntimeException(message, cause) {
  // Auxiliary constructors mirror the standard RuntimeException overloads
  // so callers can supply a message, a cause, both, or neither.
  def this() = this(null, null)
  def this(message: String) = this(message, null)
  def this(cause: Throwable) = this(null, cause)
}

203
core/src/main/scala/kafka/log/LogCleanerManager.scala.orig

@ -0,0 +1,203 @@ @@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import kafka.utils.{Logging, Pool}
import kafka.server.OffsetCheckpoint
import collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.Utils._
import java.util.concurrent.TimeUnit
import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
// Per-partition cleaning state machine: InProgress -> Aborted -> Paused.
// Sealed so pattern matches over states are exhaustiveness-checked.
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case object LogCleaningPaused extends LogCleaningState
/**
 * Manage the state of each partition being cleaned.
 * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
 * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
 * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
 * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
 * requested to be resumed.
 *
 * NOTE(review): this class appears in a file named "LogCleanerManager.scala.orig" in the commit listing,
 * which looks like a stray merge-backup artifact — confirm it was meant to be committed.
 */
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {

  // Log under the LogCleaner logger name so all cleaner messages show up in one place.
  override val loggerName = classOf[LogCleaner].getName

  /* the offset checkpoints holding the last cleaned point for each log directory */
  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap

  /* the set of logs currently being cleaned, keyed by partition */
  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()

  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
  private val lock = new ReentrantLock

  /* for coordinating the pausing and the cleaning of a partition */
  private val pausedCleaningCond = lock.newCondition()

  /* a gauge for tracking the cleanable ratio of the dirtiest log */
  @volatile private var dirtiestLogCleanableRatio = 0.0
  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })

  /**
   * @return the last cleaned offset for every partition that has a checkpoint entry, across all log dirs.
   */
  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
    checkpoints.values.flatMap(_.read()).toMap

  /**
   * Choose the log to clean next and add it to the in-progress set. We recompute this
   * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
   * the log manager maintains.
   */
  def grabFilthiestLog(): Option[LogToClean] = {
    inLock(lock) {
      val lastClean = allCleanerCheckpoints()
      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
                               // no checkpoint yet -> start from the first segment
                               // (assumes every log has at least one segment — TODO confirm)
                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
                          .filter(l => l.totalBytes > 0)             // skip any empty logs
      this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
      // must meet the minimum threshold for dirty byte ratio
      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio)
      if (cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }

  /**
   * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
   * the partition is aborted.
   * This is implemented by first abortAndPausing and then resuming the cleaning of the partition.
   */
  def abortCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      abortAndPauseCleaning(topicAndPartition)
      resumeCleaning(topicAndPartition)
      info("The cleaning for partition %s is aborted".format(topicAndPartition))
    }
  }

  /**
   * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
   * This call blocks until the cleaning of the partition is aborted and paused.
   * 1. If the partition is not in progress, mark it as paused.
   * 2. Otherwise, first mark the state of the partition as aborted.
   * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
   *    throws a LogCleaningAbortedException to stop the cleaning task.
   * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
   * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
   */
  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          inProgress.put(topicAndPartition, LogCleaningPaused)
        case Some(state) =>
          state match {
            case LogCleaningInProgress =>
              inProgress.put(topicAndPartition, LogCleaningAborted)
            case s =>
              throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
                .format(topicAndPartition, s))
          }
      }
      // block until doneCleaning() has moved the partition into the paused state;
      // awaiting releases the lock so the cleaner thread can make progress
      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
    }
  }

  /**
   * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
   */
  def resumeCleaning(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
            .format(topicAndPartition))
        case Some(state) =>
          state match {
            case LogCleaningPaused =>
              inProgress.remove(topicAndPartition)
            case s =>
              throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
                .format(topicAndPartition, s))
          }
      }
    }
    info("Compaction for partition %s is resumed".format(topicAndPartition))
  }

  /**
   * Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
   */
  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
    // simplified from an explicit match with early returns: true iff the partition
    // is tracked and its state equals the expected one
    inProgress.get(topicAndPartition) == Some(expectedState)

  /**
   * Check if the cleaning for a partition is aborted. If so, throw an exception.
   */
  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
    inLock(lock) {
      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
        throw new LogCleaningAbortedException()
    }
  }

  /**
   * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
   */
  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
    inLock(lock) {
      // NOTE: inProgress(...) throws NoSuchElementException if the partition is untracked,
      // which preserves the original behavior of surfacing an illegal call loudly.
      inProgress(topicAndPartition) match {
        case LogCleaningInProgress =>
          // record the new cleaned offset and mark the partition as no longer in progress
          val checkpoint = checkpoints(dataDir)
          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
          checkpoint.write(offsets)
          inProgress.remove(topicAndPartition)
        case LogCleaningAborted =>
          // an abort was requested mid-clean: park the partition in the paused state
          // and wake any thread blocked in abortAndPauseCleaning()
          inProgress.put(topicAndPartition, LogCleaningPaused)
          pausedCleaningCond.signalAll()
        case s =>
          throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
      }
    }
  }
}

83
core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.io._
import java.util.Properties
import kafka.utils._
/** Broker metadata persisted across restarts; currently just the broker id. */
case class BrokerMetadata(brokerId: Int)

/**
 * Saves a broker's metadata to a "meta.properties" file using a
 * write-to-temp-then-rename protocol so readers never observe a partial file.
 * All access is serialized on an internal lock.
 */
class BrokerMetadataCheckpoint(val file: File) extends Logging {
  private val lock = new Object()

  // Try to delete any temp file left over from a previous crashed write.
  // (Uses the same absolute path as write() below so the cleanup actually
  // targets the file that write() creates.)
  new File(file.getAbsolutePath + ".tmp").delete()

  /**
   * Atomically persist the given metadata: write to a temp file, fsync, then
   * rename over the target. IOExceptions are logged and rethrown.
   */
  def write(brokerMetadata: BrokerMetadata) = {
    lock synchronized {
      try {
        val brokerMetaProps = new Properties()
        brokerMetaProps.setProperty("version", 0.toString)
        brokerMetaProps.setProperty("broker.id", brokerMetadata.brokerId.toString)
        val temp = new File(file.getAbsolutePath + ".tmp")
        val fileOutputStream = new FileOutputStream(temp)
        try {
          brokerMetaProps.store(fileOutputStream, "")
          fileOutputStream.flush()
          // force to disk so the rename below publishes durable content
          fileOutputStream.getFD().sync()
        } finally {
          // close even if store/flush/sync throws, so the descriptor never leaks
          fileOutputStream.close()
        }
        // swap new BrokerMetadata file with previous one
        if(!temp.renameTo(file)) {
          // renameTo() fails on windows if destination file exists.
          file.delete()
          if(!temp.renameTo(file))
            throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath()))
        }
      } catch {
        case ie: IOException =>
          error("Failed to write meta.properties due to ", ie)
          throw ie
      }
    }
  }

  /**
   * Read the metadata back from disk.
   * @return Some(metadata) if the file exists and has a recognized version,
   *         None if the file is missing; any other failure is logged and rethrown.
   */
  def read(): Option[BrokerMetadata] = {
    lock synchronized {
      try {
        val brokerMetaProps = new VerifiableProperties(Utils.loadProps(file.getAbsolutePath()))
        val version = brokerMetaProps.getIntInRange("version", (0, Int.MaxValue))
        version match {
          case 0 =>
            val brokerId = brokerMetaProps.getIntInRange("broker.id", (0, Int.MaxValue))
            Some(BrokerMetadata(brokerId))
          case _ =>
            throw new IOException("Unrecognized version of the server meta.properties file: " + version)
        }
      } catch {
        case e: FileNotFoundException =>
          // a missing file is a normal first-boot condition, not an error
          warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
          None
        case e1: Exception =>
          error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
          throw e1
      }
    }
  }
}

20
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro @@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
private def getLogRetentionTimeMillis(): Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
if(props.containsKey("log.retention.ms")){
props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
}
else if(props.containsKey("log.retention.minutes")){
millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
}
}
else {
millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
}
@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro @@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
private def getLogRollTimeMillis(): Long = {
val millisInHour = 60L * 60L * 1000L
if(props.containsKey("log.roll.ms")){
props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
}
@ -71,8 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro @@ -71,8 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/*********** General Configuration ***********/
/* the broker id for this server */
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* Max number that can be used for a broker.id */
val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue))
/* The broker id for this server.
* To avoid conflicts between zookeeper generated brokerId and user's config.brokerId
* added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1.
*/
var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1
/* the maximum size of message that the server can receive */
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
@ -117,10 +123,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro @@ -117,10 +123,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the maximum number of bytes in a socket request */
val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
/* the maximum number of connections we allow from each ip address */
val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
/* per-ip or hostname overrides to the default maximum number of connections */
val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))

87
core/src/main/scala/kafka/server/KafkaServer.scala

@ -25,11 +25,12 @@ import kafka.utils._ @@ -25,11 +25,12 @@ import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
import collection.mutable
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker
import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
import kafka.common.ErrorMapping
import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
import kafka.network.{Receive, BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
@ -39,10 +40,11 @@ import com.yammer.metrics.core.Gauge @@ -39,10 +40,11 @@ import com.yammer.metrics.core.Gauge
* to start up and shutdown a single Kafka node.
*/
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Kafka Server " + config.brokerId + "], "
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private var startupComplete = new AtomicBoolean(false)
private var brokerId: Int = -1
val brokerState: BrokerState = new BrokerState
val correlationId: AtomicInteger = new AtomicInteger(0)
var socketServer: SocketServer = null
@ -56,6 +58,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -56,6 +58,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
newGauge(
"BrokerState",
@ -77,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -77,7 +81,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
/* start scheduler */
kafkaScheduler.startup()
/* setup zookeeper */
zkClient = initZk()
@ -85,6 +89,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -85,6 +89,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
logManager = createLogManager(zkClient, brokerState)
logManager.startup()
/* generate brokerId */
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
@ -104,26 +112,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -104,26 +112,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
offsetManager = createOffsetManager()
kafkaController = new KafkaController(config, zkClient, brokerState)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
replicaManager.startup()
kafkaController.startup()
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
registerStats()
startupComplete.set(true)
info("started")
@ -181,10 +188,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -181,10 +188,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
info("Starting controlled shutdown")
var channel : BlockingChannel = null
var prevController : Broker = null
var shutdownSuceeded : Boolean = false
var shutdownSucceeded : Boolean = false
try {
brokerState.newState(PendingControlledShutdown)
while (!shutdownSuceeded && remainingRetries > 0) {
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
@ -223,7 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -223,7 +230,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.size == 0) {
shutdownSuceeded = true
shutdownSucceeded = true
info ("Controlled shutdown succeeded")
}
else {
@ -239,7 +246,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -239,7 +246,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// ignore and try again
}
}
if (!shutdownSuceeded) {
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
@ -251,7 +258,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -251,7 +258,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
channel = null
}
}
if (!shutdownSuceeded) {
if (!shutdownSucceeded) {
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}
@ -307,7 +314,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -307,7 +314,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,
@ -359,5 +366,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg @@ -359,5 +366,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
}
}
/**
 * Generates a new brokerId or reads one from meta.properties, based on the following conditions:
 * <ol>
 * <li> config has no broker.id provided: generates a broker.id based on Zookeeper's sequence
 * <li> the broker.id stored in meta.properties doesn't match across all log.dirs: throws InconsistentBrokerIdException
 * <li> config has a broker.id and meta.properties contains a different broker.id: throws InconsistentBrokerIdException
 * <li> config has a broker.id and there is no meta.properties file: creates meta.properties storing that broker.id
 * </ol>
 * @return the broker id to use for this server
 */
private def getBrokerId: Int = {
  // Collect the broker ids already checkpointed in each log dir, and remember
  // which dirs have no checkpoint yet so the chosen id can be written there.
  val brokerIdSet = mutable.HashSet[Int]()
  var logDirsWithoutMetaProps: List[String] = List()
  for (logDir <- config.logDirs) {
    brokerMetadataCheckpoints(logDir).read() match {
      case Some(brokerMetadata: BrokerMetadata) =>
        brokerIdSet.add(brokerMetadata.brokerId)
      case None =>
        logDirsWithoutMetaProps ++= List(logDir)
    }
  }

  val configuredBrokerId = config.brokerId // -1 means "not configured, auto-generate"
  val brokerId =
    if (brokerIdSet.size > 1)
      throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
    else if (configuredBrokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != configuredBrokerId)
      throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(configuredBrokerId, brokerIdSet.last))
    else if (brokerIdSet.size == 1)
      brokerIdSet.last // reuse the id already stored in meta.properties
    else if (configuredBrokerId < 0)
      generateBrokerId // no config and no checkpoint: obtain a fresh id from Zookeeper
    else
      configuredBrokerId

  // Propagate the chosen id to any log dirs that were missing a checkpoint.
  for (logDir <- logDirsWithoutMetaProps) {
    val checkpoint = brokerMetadataCheckpoints(logDir)
    checkpoint.write(new BrokerMetadata(brokerId))
  }
  brokerId
}
/**
 * Obtains a fresh broker id from Zookeeper's sequence node.
 * Any failure is wrapped in GenerateBrokerIdException so callers see a single error type.
 * @return a new broker id (offset by config.MaxReservedBrokerId — see ZkUtils.getBrokerSequenceId)
 */
private def generateBrokerId: Int = {
try {
ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
} catch {
case e: Exception =>
// log before wrapping so the original stack trace is recorded at error level
error("Failed to generate broker.id due to ", e)
throw new GenerateBrokerIdException("Failed to generate broker.id", e)
}
}
}

38
core/src/main/scala/kafka/utils/ZkUtils.scala

@ -46,6 +46,7 @@ object ZkUtils extends Logging { @@ -46,6 +46,7 @@ object ZkUtils extends Logging {
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
@ -87,7 +88,8 @@ object ZkUtils extends Logging { @@ -87,7 +88,8 @@ object ZkUtils extends Logging {
}
def setupCommonPaths(zkClient: ZkClient) {
for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
DeleteTopicsPath, BrokerSequenceIdPath))
makeSurePersistentPathExists(zkClient, path)
}
@ -122,6 +124,14 @@ object ZkUtils extends Logging { @@ -122,6 +124,14 @@ object ZkUtils extends Logging {
}
}
/**
 * Returns a sequence id generated by updating BrokerSequenceIdPath in Zookeeper.
 * Users can provide a brokerId in the config; in order to avoid conflicts between the
 * ZK-generated seqId and config.brokerId, the ZK seqId is offset by MaxReservedBrokerId,
 * so auto-assigned broker ids always start above the user-reserved range.
 */
def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = {
getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId
}
/**
* Gets the in-sync replicas (ISR) for a specific topic and partition
*/
@ -696,6 +706,32 @@ object ZkUtils extends Logging { @@ -696,6 +706,32 @@ object ZkUtils extends Logging {
}
}
/**
 * This API produces a sequence number by creating / updating the given path in zookeeper.
 * It uses the stat returned by zookeeper and returns its version: every write bumps
 * stat.version, so each call yields a fresh id. The node is created on first use.
 *
 * Fix: the recovery branches previously hard-coded BrokerSequenceIdPath instead of using
 * the `path` parameter, so calling this with any other path would create/update the wrong node.
 */
def getSequenceId(client: ZkClient, path: String): Int = {
  try {
    val stat = client.writeDataReturnStat(path, "", -1)
    stat.getVersion
  } catch {
    case e: ZkNoNodeException => {
      // Node does not exist yet: create it (and its parents). The first successful
      // creation corresponds to sequence id 0.
      createParentPath(client, path)
      try {
        client.createPersistent(path, "")
        0
      } catch {
        case e1: ZkNodeExistsException =>
          // Lost the creation race with another broker: bump the now-existing node instead.
          val stat = client.writeDataReturnStat(path, "", -1)
          stat.getVersion
        // non-ZkNodeExistsException throwables propagate unchanged, as before
      }
    }
    // non-ZkNoNodeException throwables propagate unchanged, as before
  }
}
def getAllTopics(zkClient: ZkClient): Seq[String] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
if(topics == null)

127
core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala

@ -0,0 +1,127 @@ @@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.zk.ZooKeeperTestHarness
import kafka.utils.{TestUtils, Utils}
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import java.io.File
class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
  // config1 has no broker.id (nodeId = -1 => auto-generate); config2 has explicit broker.id = 0.
  // Each createBrokerConfig call allocates its own temp log dir.
  var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
  var config1 = new KafkaConfig(props1)
  var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
  var config2 = new KafkaConfig(props2)
  val brokerMetaPropsFile = "meta.properties"

  @Test
  def testAutoGenerateBrokerId() {
    // with no broker.id configured, the server should obtain one from zookeeper
    // (first generated id is MaxReservedBrokerId default 1000 + 1)
    var server1 = new KafkaServer(config1)
    server1.startup()
    server1.shutdown()
    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
    // restart the server and check that it reuses the brokerId generated previously
    server1 = new KafkaServer(config1)
    server1.startup()
    assertEquals(server1.config.brokerId, 1001)
    server1.shutdown()
    Utils.rm(server1.config.logDirs)
    TestUtils.verifyNonDaemonThreadsStatus
  }

  @Test
  def testUserConfigAndGeneratedBrokerId() {
    // mix of auto-generated ids (server1, server3) and a user-configured id (server2)
    val server1 = new KafkaServer(config1)
    val server2 = new KafkaServer(config2)
    val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
    val config3 = new KafkaConfig(props3)
    val server3 = new KafkaServer(config3)
    server1.startup()
    assertEquals(server1.config.brokerId, 1001)
    server2.startup()
    assertEquals(server2.config.brokerId, 0)
    server3.startup()
    assertEquals(server3.config.brokerId, 1002)
    server1.shutdown()
    server2.shutdown()
    server3.shutdown()
    // each server's chosen id must be persisted in its meta.properties
    assertTrue(verifyBrokerMetadata(server1.config.logDirs, 1001))
    assertTrue(verifyBrokerMetadata(server2.config.logDirs, 0))
    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 1002))
    Utils.rm(server1.config.logDirs)
    Utils.rm(server2.config.logDirs)
    Utils.rm(server3.config.logDirs)
    TestUtils.verifyNonDaemonThreadsStatus
  }

  @Test
  def testMultipleLogDirsMetaProps() {
    // add multiple logDirs and check that the generated brokerId is stored in all of them
    val logDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath +
      "," + TestUtils.tempDir().getAbsolutePath
    props1.setProperty("log.dir", logDirs)
    config1 = new KafkaConfig(props1)
    var server1 = new KafkaServer(config1)
    server1.startup()
    server1.shutdown()
    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
    // a dir added to log.dirs after id generation should get the id copied over on restart
    val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
    props1.setProperty("log.dir", newLogDirs)
    config1 = new KafkaConfig(props1)
    server1 = new KafkaServer(config1)
    server1.startup()
    server1.shutdown()
    assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
    Utils.rm(server1.config.logDirs)
    TestUtils.verifyNonDaemonThreadsStatus
  }

  @Test
  def testConsistentBrokerIdFromUserConfigAndMetaProps() {
    // First, auto-generate a broker id (1001) and persist it in config1's meta.properties.
    var server1 = new KafkaServer(config1) // auto generate broker Id
    server1.startup()
    server1.shutdown()
    // Then start a server with a conflicting user-specified broker.id (0) over the SAME
    // log dirs. (Fix: the original used config2, whose log dirs are different, so the
    // inconsistency could never trigger and the empty catch let the test pass vacuously.)
    val conflictingProps = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
    conflictingProps.setProperty("log.dir", props1.getProperty("log.dir"))
    server1 = new KafkaServer(new KafkaConfig(conflictingProps))
    try {
      server1.startup()
      fail("Server startup should have failed with InconsistentBrokerIdException")
    } catch {
      case e: kafka.common.InconsistentBrokerIdException => // expected
    }
    server1.shutdown()
    Utils.rm(server1.config.logDirs)
    TestUtils.verifyNonDaemonThreadsStatus
  }

  /** Returns true iff every given log dir holds a meta.properties with exactly this brokerId. */
  def verifyBrokerMetadata(logDirs: Seq[String], brokerId: Int): Boolean = {
    for (logDir <- logDirs) {
      val brokerMetadataOpt = (new BrokerMetadataCheckpoint(
        new File(logDir + File.separator + brokerMetaPropsFile))).read()
      brokerMetadataOpt match {
        case Some(brokerMetadata: BrokerMetadata) =>
          if (brokerMetadata.brokerId != brokerId) return false
        case _ => return false
      }
    }
    true
  }
}

9
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -94,7 +94,7 @@ object TestUtils extends Logging { @@ -94,7 +94,7 @@ object TestUtils extends Logging {
Utils.rm(f)
}
})
f
}
@ -154,7 +154,7 @@ object TestUtils extends Logging { @@ -154,7 +154,7 @@ object TestUtils extends Logging {
def createBrokerConfig(nodeId: Int, port: Int = choosePort(),
enableControlledShutdown: Boolean = true): Properties = {
val props = new Properties
props.put("broker.id", nodeId.toString)
if (nodeId >= 0) props.put("broker.id", nodeId.toString)
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@ -700,6 +700,11 @@ object TestUtils extends Logging { @@ -700,6 +700,11 @@ object TestUtils extends Logging {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}
/**
 * Asserts that no live non-daemon "kafka" threads remain (used to verify clean broker shutdown).
 * Fix: Class.getCanonicalName returns null for anonymous/local classes, so the original
 * could NPE on .toLowerCase; such threads are now skipped by the null guard.
 */
def verifyNonDaemonThreadsStatus() {
  assertEquals(0, Thread.getAllStackTraces.keySet().toArray
    .map(_.asInstanceOf[Thread])
    .count(t => !t.isDaemon && t.isAlive &&
      t.getClass.getCanonicalName != null &&
      t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
}
/**
* Create new LogManager instance with default configuration for testing

Loading…
Cancel
Save