
KAFKA-5244; Refactor BrokerTopicStats and ControllerStats so that they are classes

This removes the need to force object initialisation via hacks to register
the relevant Yammer metrics during start-up.

It also works around issues caused by tests that delete JVM-wide singleton
metrics (like `MetricsDuringTopicCreationDeletionTest`). Without this change,
those metrics would never be registered again; after this change, they are
registered again during KafkaServer start-up.

It would be even better not to rely on JVM-wide singleton metrics at all (as is
already the case for Kafka Metrics), but that's a bigger change that should be
considered separately.
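
For context, a minimal sketch of the pattern this change applies, assuming the same `KafkaMetricsGroup.newMeter` helper used in the diffs below (the `ExampleStatsSingleton`/`ExampleStats` names and the `ExamplePerSec` meter are hypothetical):

```scala
import java.util.concurrent.TimeUnit

import kafka.metrics.KafkaMetricsGroup

// Before: a JVM-wide singleton. The meter is registered as a side effect of the
// object being loaded, so start-up code has to reference the object just to force
// registration, and if a test deletes the meter from the Yammer registry nothing
// ever registers it again.
object ExampleStatsSingleton extends KafkaMetricsGroup {
  val exampleRate = newMeter("ExamplePerSec", "events", TimeUnit.SECONDS)
}

// After: a plain class. Registration happens in the constructor, so a fresh
// instance created during KafkaServer.startup() registers the meter on every
// broker start-up.
class ExampleStats extends KafkaMetricsGroup {
  val exampleRate = newMeter("ExamplePerSec", "events", TimeUnit.SECONDS)
}
```

In the actual change, `BrokerTopicStats` and `ControllerStats` follow this pattern: `KafkaServer` creates a `new BrokerTopicStats` during start-up and `ControllerContext` creates a `new ControllerStats`, as shown in the diffs below.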

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3059 from ijuma/kafka-5244-broker-static-stats-and-controller-stats-as-classes
Commit: 46aa88b9cf
1. core/src/main/scala/kafka/controller/KafkaController.scala (24)
2. core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (2)
3. core/src/main/scala/kafka/log/Log.scala (10)
4. core/src/main/scala/kafka/log/LogManager.scala (13)
5. core/src/main/scala/kafka/server/KafkaApis.scala (3)
6. core/src/main/scala/kafka/server/KafkaRequestHandler.scala (32)
7. core/src/main/scala/kafka/server/KafkaServer.scala (28)
8. core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (4)
9. core/src/main/scala/kafka/server/ReplicaManager.scala (31)
10. core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala (2)
11. core/src/test/scala/other/kafka/StressTestLog.scala (4)
12. core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (3)
13. core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (4)
14. core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (7)
15. core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (7)
16. core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (4)
17. core/src/test/scala/unit/kafka/log/LogTest.scala (128)
18. core/src/test/scala/unit/kafka/metrics/MetricsTest.scala (2)
19. core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (4)
20. core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (3)
21. core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala (3)
22. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (18)
23. core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (12)
24. core/src/test/scala/unit/kafka/utils/TestUtils.scala (3)

core/src/main/scala/kafka/controller/KafkaController.scala (24)

@@ -42,7 +42,10 @@ import scala.collection._
import scala.util.Try
class ControllerContext(val zkUtils: ZkUtils) {
val controllerStats = new ControllerStats
var controllerChannelManager: ControllerChannelManager = null
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
@@ -150,10 +153,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
val controllerContext = new ControllerContext(zkUtils)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val kafkaScheduler = new KafkaScheduler(1)
val topicDeletionManager: TopicDeletionManager = new TopicDeletionManager(this)
val topicDeletionManager = new TopicDeletionManager(this)
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -162,6 +167,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent]
private val controllerEventThread = new ControllerEventThread("controller-event-thread")
private val brokerChangeListener = new BrokerChangeListener(this)
private val topicChangeListener = new TopicChangeListener(this)
private val topicDeletionListener = new TopicDeletionListener(this)
@@ -169,6 +175,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
private val partitionReassignmentListener = new PartitionReassignmentListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
private val activeControllerId = new AtomicInteger(-1)
private val offlinePartitionCount = new AtomicInteger(0)
private val preferredReplicaImbalanceCount = new AtomicInteger(0)
@@ -1155,7 +1162,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
override def process(): Unit = {
if (!isActive) return
ControllerStats.leaderElectionTimer.time {
controllerContext.controllerStats.leaderElectionTimer.time {
try {
val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
val curBrokerIds = curBrokers.map(_.id)
@@ -1713,16 +1720,9 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
}
}
object ControllerStats extends KafkaMetricsGroup {
private val _uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
private val _leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
// KafkaServer needs to initialize controller metrics during startup. We perform initialization
// through method calls to avoid Scala compiler warnings.
def uncleanLeaderElectionRate: Meter = _uncleanLeaderElectionRate
def leaderElectionTimer: KafkaTimer = _leaderElectionTimer
private[controller] class ControllerStats extends KafkaMetricsGroup {
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
sealed trait ControllerEvent {

core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (2)

@@ -74,7 +74,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " +
s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].")
} else {
ControllerStats.uncleanLeaderElectionRate.mark()
controllerContext.controllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head
warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " +
s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")

core/src/main/scala/kafka/log/Log.scala (10)

@@ -113,6 +113,7 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i
* Other activities such as log cleaning are not affected by logStartOffset.
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param brokerTopicStats Container for Broker Topic Yammer Metrics
* @param time The time instance used for checking the clock
* @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
* @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
@@ -123,6 +124,7 @@ class Log(@volatile var dir: File,
@volatile var logStartOffset: Long = 0L,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
val maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
val producerIdExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
@@ -560,8 +562,8 @@ class Log(@volatile var dir: File,
if (batch.sizeInBytes > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
.format(batch.sizeInBytes, config.maxMessageSize))
}
@@ -746,8 +748,8 @@ class Log(@volatile var dir: File,
// Check if the message sizes are valid.
val batchSize = batch.sizeInBytes
if (batchSize > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s"The record batch size is $batchSize bytes which exceeds the maximum configured " +
s"value of ${config.maxMessageSize}.")
}

core/src/main/scala/kafka/log/LogManager.scala (13)

@@ -55,6 +55,7 @@ class LogManager(val logDirs: Array[File],
val maxPidExpirationMs: Int,
scheduler: Scheduler,
val brokerState: BrokerState,
brokerTopicStats: BrokerTopicStats,
time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
@@ -175,7 +176,8 @@ class LogManager(val logDirs: Array[File],
recoveryPoint = logRecoveryPoint,
maxProducerIdExpirationMs = maxPidExpirationMs,
scheduler = scheduler,
time = time)
time = time,
brokerTopicStats = brokerTopicStats)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
} else {
@@ -416,7 +418,8 @@ class LogManager(val logDirs: Array[File],
recoveryPoint = 0L,
maxProducerIdExpirationMs = maxPidExpirationMs,
scheduler = scheduler,
time = time)
time = time,
brokerTopicStats = brokerTopicStats)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
@@ -572,7 +575,8 @@ object LogManager {
zkUtils: ZkUtils,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time): LogManager = {
time: Time,
brokerTopicStats: BrokerTopicStats): LogManager = {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
@@ -602,6 +606,7 @@ object LogManager {
maxPidExpirationMs = config.transactionIdExpirationMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
time = time)
time = time,
brokerTopicStats = brokerTopicStats)
}
}

core/src/main/scala/kafka/server/KafkaApis.scala (3)

@@ -70,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time) extends Logging {
@@ -516,7 +517,7 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchedPartitionData.put(topicPartition, data)
// record the bytes out metrics only when the response is being sent
BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
}
val response = new FetchResponse(fetchedPartitionData, 0)

core/src/main/scala/kafka/server/KafkaRequestHandler.scala (32)

@@ -107,7 +107,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val tags: scala.collection.Map[String, String] = name match {
case None => scala.collection.Map.empty
case None => Map.empty
case Some(topic) => Map("topic" -> topic)
}
@@ -142,7 +142,7 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
}
}
object BrokerTopicStats extends Logging {
object BrokerTopicStats {
val MessagesInPerSec = "MessagesInPerSec"
val BytesInPerSec = "BytesInPerSec"
val BytesOutPerSec = "BytesOutPerSec"
@@ -153,25 +153,26 @@ object BrokerTopicStats extends Logging {
val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
val TotalFetchRequestsPerSec = "TotalFetchRequestsPerSec"
private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
private val allTopicsStats = new BrokerTopicMetrics(None)
}
def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
class BrokerTopicStats {
import BrokerTopicStats._
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
val allTopicsStats = new BrokerTopicMetrics(None)
def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
def topicStats(topic: String): BrokerTopicMetrics =
stats.getAndMaybePut(topic)
}
def updateReplicationBytesIn(value: Long) {
getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
allTopicsStats.replicationBytesInRate.foreach { metric =>
metric.mark(value)
}
}
private def updateReplicationBytesOut(value: Long) {
getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
allTopicsStats.replicationBytesOutRate.foreach { metric =>
metric.mark(value)
}
}
@@ -186,8 +187,15 @@ object BrokerTopicStats extends Logging {
if (isFollower) {
updateReplicationBytesOut(value)
} else {
getBrokerTopicStats(topic).bytesOutRate.mark(value)
getBrokerAllTopicsStats.bytesOutRate.mark(value)
topicStats(topic).bytesOutRate.mark(value)
allTopicsStats.bytesOutRate.mark(value)
}
}
def close(): Unit = {
allTopicsStats.close()
stats.values.foreach(_.close())
}
}

core/src/main/scala/kafka/server/KafkaServer.scala (28)

@@ -139,9 +139,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
private var _clusterId: String = null
private var _brokerTopicStats: BrokerTopicStats = null
def clusterId: String = _clusterId
private[kafka] def brokerTopicStats = _brokerTopicStats
newGauge(
"BrokerState",
new Gauge[Int] {
@@ -204,11 +207,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
/* start log manager */
logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time)
logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
@@ -246,7 +252,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId, time)
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
brokerTopicStats, clusterId, time)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
@@ -277,9 +284,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
checkpointBrokerId(config.brokerId)
/* register broker metrics */
registerStats()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
@@ -304,7 +308,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
brokerTopicStats, metadataCache)
private def initZk(): ZkUtils = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -344,15 +349,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
/**
* Forces some dynamic jmx beans to be registered on server startup.
*/
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
/**
* Performs controlled shutdown
*/
@@ -620,6 +616,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
if (metrics != null)
CoreUtils.swallow(metrics.close())
if (brokerTopicStats != null)
CoreUtils.swallow(brokerTopicStats.close())
brokerState.newState(NotRunning)

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (4)

@@ -112,7 +112,7 @@ class ReplicaFetcherThread(name: String,
trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
quota.record(records.sizeInBytes)
BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
} catch {
case e: KafkaStorageException =>
fatal(s"Disk error while replicating data for $topicPartition", e)
@@ -368,4 +368,4 @@ object ReplicaFetcherThread {
case e => Some(e.exception)
}
}
}
}

core/src/main/scala/kafka/server/ReplicaManager.scala (31)

@@ -131,6 +131,7 @@ class ReplicaManager(val config: KafkaConfig,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
quotaManager: ReplicationQuotaManager,
val brokerTopicStats: BrokerTopicStats,
val metadataCache: MetadataCache,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@@ -264,7 +265,7 @@ class ReplicaManager(val config: KafkaConfig,
removedPartition.delete() // this will delete the local log
val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
if (!topicHasPartitions)
BrokerTopicStats.removeMetrics(topicPartition.topic)
brokerTopicStats.removeMetrics(topicPartition.topic)
}
}
case None =>
@@ -503,8 +504,8 @@ class ReplicaManager(val config: KafkaConfig,
requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
trace("Append [%s] to local log ".format(entriesPerPartition))
entriesPerPartition.map { case (topicPartition, records) =>
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
// reject appending to internal topics if it is not allowed
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
@@ -529,10 +530,10 @@ class ReplicaManager(val config: KafkaConfig,
info.lastOffset - info.firstOffset + 1
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes)
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)
trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
.format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
@@ -552,8 +553,8 @@ class ReplicaManager(val config: KafkaConfig,
_: InvalidTimestampException) =>
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
error("Error processing append operation on partition %s".format(topicPartition), t)
(topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
}
@@ -649,8 +650,8 @@ class ReplicaManager(val config: KafkaConfig,
val partitionFetchSize = fetchInfo.maxBytes
val followerLogStartOffset = fetchInfo.logStartOffset
BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
brokerTopicStats.topicStats(tp.topic).totalFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
try {
trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
@@ -726,8 +727,8 @@ class ReplicaManager(val config: KafkaConfig,
readSize = partitionFetchSize,
exception = Some(e))
case e: Throwable =>
BrokerTopicStats.getBrokerTopicStats(tp.topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
error(s"Error processing fetch operation on partition $tp, offset $offset", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
@@ -1119,8 +1120,8 @@ object OffsetsForLeaderEpoch extends Logging {
val offset = try {
new EpochEndOffset(NONE, replicaManager.getLeaderReplicaIfLocal(tp).epochs.get.endOffsetFor(epoch))
} catch {
case e: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
case e: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
case _: NotLeaderForPartitionException => new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
case _: UnknownTopicOrPartitionException => new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET)
}
(tp, offset)
}.toMap.asJava

core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala (2)

@@ -111,7 +111,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
quotaManagers.follower, metadataCache) {
quotaManagers.follower, new BrokerTopicStats, metadataCache) {
override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager) =

core/src/test/scala/other/kafka/StressTestLog.scala (4)

@@ -21,6 +21,7 @@ import java.util.Properties
import java.util.concurrent.atomic._
import kafka.log._
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
import org.apache.kafka.common.record.FileRecords
@@ -46,7 +47,8 @@ object StressTestLog {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
time = time,
brokerTopicStats = new BrokerTopicStats)
val writer = new WriterThread(log)
writer.start()
val reader = new ReaderThread(log)

core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (3)

@@ -25,6 +25,7 @@ import java.util.{Properties, Random}
import joptsimple._
import kafka.log._
import kafka.message._
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
@@ -206,7 +207,7 @@ object TestLinearWriteSpeed {
class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: MemoryRecords) extends Writable {
Utils.delete(dir)
val log = new Log(dir, config, 0L, 0L, scheduler, Time.SYSTEM)
val log = new Log(dir, config, 0L, 0L, scheduler, new BrokerTopicStats, Time.SYSTEM)
def write(): Int = {
log.appendAsLeader(messages, leaderEpoch = 0)
messages.sizeInBytes

core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala (4)

@@ -20,6 +20,7 @@ import java.io.File
import java.nio.file.Files
import java.util.Properties
import kafka.server.BrokerTopicStats
import kafka.utils.{MockTime, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
@@ -95,7 +96,8 @@ abstract class AbstractLogCleanerIntegrationTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
time = time,
brokerTopicStats = new BrokerTopicStats)
logMap.put(partition, log)
this.logs += log
}

core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala (7)

@@ -25,10 +25,12 @@ import org.junit.Assert._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import org.apache.kafka.common.record.{SimpleRecord, CompressionType, MemoryRecords}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
import java.util.{Collection, Properties}
import kafka.server.BrokerTopicStats
import scala.collection.JavaConverters._
@RunWith(value = classOf[Parameterized])
@@ -53,7 +55,8 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
val logProps = new Properties()
logProps.put(LogConfig.CompressionTypeProp, brokerCompression)
/*configure broker-side compression */
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
/* append two messages */
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.forId(messageCompressionCode.codec), 0,

core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala (7)

@@ -20,6 +20,7 @@ package kafka.log
import java.io.File
import java.util.Properties
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
@@ -234,12 +235,14 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
time = time,
brokerTopicStats = new BrokerTopicStats)
log
}
private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
private def records(key: Int, value: Int, timestamp: Long) =
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (4)

@@ -23,6 +23,7 @@ import java.nio.file.Paths
import java.util.Properties
import kafka.common._
import kafka.server.BrokerTopicStats
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record._
@@ -974,7 +975,8 @@ class LogCleanerTest extends JUnitSuite {
messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }

core/src/test/scala/unit/kafka/log/LogTest.scala (128)

@@ -27,7 +27,7 @@ import kafka.common.KafkaException
import org.junit.Assert._
import org.junit.{After, Before, Test}
import kafka.utils._
import kafka.server.KafkaConfig
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record._
@@ -45,6 +45,7 @@ class LogTest {
val time = new MockTime()
var config: KafkaConfig = null
val logConfig = LogConfig()
val brokerTopicStats = new BrokerTopicStats
@Before
def setUp() {
@@ -54,6 +55,7 @@ class LogTest {
@After
def tearDown() {
brokerTopicStats.close()
Utils.delete(tmpDir)
}
@@ -100,6 +102,7 @@ class LogTest {
recoveryPoint = 0L,
maxProducerIdExpirationMs = 24 * 60,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
// Test the segment rolling behavior when messages do not have a timestamp.
@@ -151,7 +154,8 @@ class LogTest {
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
time = time)
time = time,
brokerTopicStats = new BrokerTopicStats)
val pid = 1L
val epoch: Short = 0
@@ -410,6 +414,7 @@ class LogTest {
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val pid = 1L
@@ -485,6 +490,7 @@ class LogTest {
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val epoch: Short = 0
@@ -601,6 +607,7 @@ class LogTest {
LogConfig(logProps),
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val pid = 1L
@@ -632,6 +639,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
log.appendAsLeader(set, leaderEpoch = 0)
@@ -661,7 +669,8 @@ class LogTest {
// We use need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
// create a log
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -676,7 +685,8 @@ class LogTest {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, logConfig, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
log.appendAsLeader(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds), leaderEpoch = 0)
}
@@ -689,7 +699,8 @@ class LogTest {
logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
// We use need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val values = (0 until 100 by 2).map(id => id.toString.getBytes).toArray
for(value <- values)
@@ -713,7 +724,8 @@ class LogTest {
def testAppendAndReadWithNonSequentialOffsets() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -738,7 +750,8 @@ class LogTest {
def testReadAtLogGap() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@@ -755,7 +768,8 @@ class LogTest {
def testReadWithMinMessage() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -783,7 +797,8 @@ class LogTest {
def testReadWithTooSmallMaxLength() {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
@@ -819,7 +834,8 @@ class LogTest {
// set up replica log starting with offset 1024 and with one message (at offset 1024)
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
log.appendAsLeader(TestUtils.singletonRecords(value = "42".getBytes), leaderEpoch = 0)
assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025, 1000).records.sizeInBytes)
@@ -850,7 +866,8 @@ class LogTest {
/* create a multipart log with 100 messages */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
timestamp = time.milliseconds))
@@ -888,7 +905,8 @@ class LogTest {
/* this log should roll after every messageset */
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 110: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.GZIP, new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes)), leaderEpoch = 0)
@@ -914,7 +932,8 @@ class LogTest {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until messagesToAppend)
log.appendAsLeader(TestUtils.singletonRecords(value = i.toString.getBytes, timestamp = time.milliseconds - 10), leaderEpoch = 0)
@@ -950,7 +969,8 @@ class LogTest {
logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
// We use need to use magic value 1 here because the test is message size sensitive.
logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
try {
log.appendAsLeader(messageSet, leaderEpoch = 0)
@@ -977,7 +997,8 @@ class LogTest {
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
try {
log.appendAsLeader(messageSetWithUnkeyedMessage, leaderEpoch = 0)
@@ -1019,7 +1040,8 @@ class LogTest {
val maxMessageSize = second.sizeInBytes - 1
val logProps = new Properties()
logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
// should be able to append the small message
log.appendAsLeader(first, leaderEpoch = 0)
@@ -1045,7 +1067,8 @@ class LogTest {
logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(messageSize),
timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
@@ -1071,12 +1094,14 @@ class LogTest {
assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
}
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler, time = time)
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = lastOffset, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
verifyRecoveredLog(log)
log.close()
// test recovery case
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
verifyRecoveredLog(log)
log.close()
}
@@ -1092,7 +1117,8 @@ class LogTest {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val messages = (0 until numMessages).map { i =>
MemoryRecords.withRecords(100 + i, CompressionType.NONE, 0, new SimpleRecord(time.milliseconds + i, i.toString.getBytes()))
@@ -1116,7 +1142,8 @@ class LogTest {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
val indexFiles = log.logSegments.map(_.index.file)
@@ -1128,7 +1155,8 @@ class LogTest {
timeIndexFiles.foreach(_.delete())
// reopen the log
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
@@ -1155,7 +1183,8 @@ class LogTest {
logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
val config = LogConfig(logProps)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10, magicValue = RecordBatch.MAGIC_VALUE_V1), leaderEpoch = 0)
val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
@@ -1165,7 +1194,8 @@ class LogTest {
timeIndexFiles.foreach(_.delete())
// The rebuilt time index should be empty
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler, time = time)
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = numMessages + 1, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val segArray = log.logSegments.toArray
for (i <- 0 until segArray.size - 1) {
assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
@@ -1186,7 +1216,8 @@ class LogTest {
logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
val config = LogConfig(logProps)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
var log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
for(i <- 0 until numMessages)
log.appendAsLeader(TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10), leaderEpoch = 0)
val indexFiles = log.logSegments.map(_.index.file)
@@ -1208,7 +1239,8 @@ class LogTest {
}
// reopen the log
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler, time = time)
log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 200L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages) {
assertEquals(i, log.read(i, 100, None).records.batches.iterator.next().lastOffset)
@@ -1234,7 +1266,8 @@ class LogTest {
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
// create a log
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(logProps), logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (_ <- 1 to msgPerSeg)
@@ -1289,7 +1322,8 @@ class LogTest {
logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
logProps.put(LogConfig.IndexIntervalBytesProp, setSize - 1: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@@ -1336,6 +1370,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
@@ -1368,6 +1403,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
// add enough messages to roll over several segments then close and re-open and attempt to truncate
@@ -1379,6 +1415,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
log.truncateTo(3)
assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
@@ -1405,6 +1442,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
// append some messages to create some segments
@@ -1446,6 +1484,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
// append some messages to create some segments
@@ -1461,6 +1500,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
@@ -1472,6 +1512,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)
val head = log.read(0, 4096, None).records.records.iterator.next()
@@ -1486,6 +1527,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val records = (0 until 2).map(id => new SimpleRecord(id.toString.getBytes)).toArray
records.foreach(record => log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), leaderEpoch = 0))
@@ -1500,6 +1542,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(RecordBatch.NO_TIMESTAMP, "key".getBytes, "value".getBytes)), leaderEpoch = 0)
@@ -1523,6 +1566,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val numMessages = 50 + TestUtils.random.nextInt(50)
for (_ <- 0 until numMessages)
@@ -1535,7 +1579,7 @@ class LogTest {
TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
// attempt recovery
log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, time)
log = new Log(logDir, config, 0L, recoveryPoint, time.scheduler, brokerTopicStats, time)
assertEquals(numMessages, log.logEndOffset)
val recovered = log.logSegments.flatMap(_.log.records.asScala.toList).toList
@@ -1566,6 +1610,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
val set1 = MemoryRecords.withRecords(0, CompressionType.NONE, 0, new SimpleRecord("v1".getBytes(), "k1".getBytes()))
val set2 = MemoryRecords.withRecords(Integer.MAX_VALUE.toLong + 2, CompressionType.NONE, 0, new SimpleRecord("v3".getBytes(), "k3".getBytes()))
@@ -1617,6 +1662,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
for (_ <- 0 until 100)
log.appendAsLeader(createRecords, leaderEpoch = 0)
@@ -1625,7 +1671,7 @@ class LogTest {
// check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
// clean shutdown file exists.
recoveryPoint = log.logEndOffset
log = new Log(logDir, config, 0L, 0L, time.scheduler, time)
log = new Log(logDir, config, 0L, 0L, time.scheduler, brokerTopicStats, time)
assertEquals(recoveryPoint, log.logEndOffset)
cleanShutdownFile.delete()
}
@@ -1795,6 +1841,7 @@ class LogTest {
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time)
// append some messages to create some segments
@@ -1924,7 +1971,7 @@ class LogTest {
@Test
def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() {
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes,timestamp = 10L)
def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
val log = createLog(createRecords.sizeInBytes,
retentionMs = 10000,
cleanupPolicy = "compact,delete")
@@ -1943,7 +1990,8 @@ class LogTest {
//Given this partition is on leader epoch 72
val epoch = 72
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
log.leaderEpochCache.assign(epoch, records.size)
//When appending messages as a leader (i.e. assignOffsets = true)
@@ -1975,7 +2023,8 @@ class LogTest {
recs
}
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
//When appending as follower (assignOffsets = false)
for (i <- records.indices)
@@ -2034,7 +2083,8 @@ class LogTest {
def shouldTruncateLeaderEpochFileWhenTruncatingLog() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
val logProps = CoreUtils.propsWith(LogConfig.SegmentBytesProp, (10 * createRecords.sizeInBytes).toString)
val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig( logProps), recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val cache = epochCache(log)
//Given 2 segments, 10 messages per segment
@@ -2079,7 +2129,8 @@ class LogTest {
*/
@Test
def testLogRecoversForLeaderEpoch() {
val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val log = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val leaderEpochCache = epochCache(log)
val firstBatch = singletonRecordsWithLeaderEpoch(value = "random".getBytes, leaderEpoch = 1, offset = 0)
log.appendAsFollower(records = firstBatch)
@@ -2101,7 +2152,8 @@ class LogTest {
log.close()
// reopen the log and recover from the beginning
val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
val recoveredLog = new Log(logDir, LogConfig(new Properties()), recoveryPoint = 0L, scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats, time = time)
val recoveredLeaderEpochCache = epochCache(recoveredLog)
// epoch entries should be recovered
@@ -2430,15 +2482,15 @@ class LogTest {
logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
val config = LogConfig(logProps)
val log = new Log(logDir,
new Log(logDir,
config,
logStartOffset = 0L,
recoveryPoint = 0L,
scheduler = time.scheduler,
brokerTopicStats = brokerTopicStats,
time = time,
maxProducerIdExpirationMs = maxPidExpirationMs,
producerIdExpirationCheckIntervalMs = pidExpirationCheckIntervalMs)
log
}
private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
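Note (not part of the commit): the createLog helper above shows the new Log constructor shape, with brokerTopicStats threaded in between scheduler and time. A minimal Scala sketch of the same call outside the helper, assuming the usual LogTest fixtures (logDir, time):

// Sketch only: a Log built with an explicit per-instance BrokerTopicStats,
// mirroring the createLog helper above; logDir and time are assumed fixtures.
val stats = new BrokerTopicStats
val log = new Log(logDir, LogConfig(), recoveryPoint = 0L, scheduler = time.scheduler,
  brokerTopicStats = stats, time = time)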

2
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@@ -86,7 +86,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
// Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
TestUtils.produceMessages(servers, topic, nMessages)
assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
AdminUtils.deleteTopic(zkUtils, topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))

4
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala

@@ -61,7 +61,7 @@ class HighwatermarkPersistenceTest {
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
new MetadataCache(configs.head.brokerId))
new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()
@@ -106,7 +106,7 @@ class HighwatermarkPersistenceTest {
// create replica manager
val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
new MetadataCache(configs.head.brokerId))
new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
replicaManager.startup()
try {
replicaManager.checkpointHighWatermarks()

3
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala

@@ -55,7 +55,8 @@ class IsrExpirationTest {
@Before
def setUp() {
replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
new MetadataCache(configs.head.brokerId))
}
@After

3
core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala

@@ -175,7 +175,8 @@ class ReplicaManagerQuotasTest {
replay(logManager)
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower,
new BrokerTopicStats, new MetadataCache(configs.head.brokerId))
//create the two replicas
for ((p, _) <- fetchInfo) {

18
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@@ -64,7 +64,8 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
new MetadataCache(config.brokerId))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
partition.getOrCreateReplica(1)
@@ -82,7 +83,8 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId))
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
new MetadataCache(config.brokerId))
try {
val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
partition.getOrCreateReplica(1)
@@ -99,7 +101,8 @@ class ReplicaManagerTest {
val config = KafkaConfig.fromProps(props)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new MetadataCache(config.brokerId), Option(this.getClass.getName))
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
new MetadataCache(config.brokerId), Option(this.getClass.getName))
try {
def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
assert(responseStatus.values.head.error == Errors.INVALID_REQUIRED_ACKS)
@@ -132,7 +135,8 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache)
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
metadataCache)
try {
var produceCallbackFired = false
@@ -211,7 +215,8 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(1))).andReturn(true).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
metadataCache, Option(this.getClass.getName))
try {
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
@@ -338,7 +343,8 @@ class ReplicaManagerTest {
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(2))).andReturn(true).anyTimes()
EasyMock.replay(metadataCache)
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, metadataCache, Option(this.getClass.getName))
new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, new BrokerTopicStats,
metadataCache, Option(this.getClass.getName))
try {
val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
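The ReplicaManager test changes all follow the same wiring: a fresh BrokerTopicStats is passed between the follower quota and the MetadataCache. A condensed sketch of that constructor call, assuming the config, metrics, time, zkUtils and mockLogMgr fixtures used in the tests above:

// Sketch of the updated ReplicaManager wiring used throughout these tests.
val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
  new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower,
  new BrokerTopicStats, new MetadataCache(config.brokerId))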

12
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala

@@ -99,7 +99,8 @@ class SimpleFetchTest {
// create the replica manager
replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new MetadataCache(configs.head.brokerId))
new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower, new BrokerTopicStats,
new MetadataCache(configs.head.brokerId))
// add the partition with two replicas, both in ISR
val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
@@ -150,8 +151,9 @@ class SimpleFetchTest {
*/
@Test
def testReadFromLog() {
val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
val brokerTopicStats = new BrokerTopicStats
val initialTopicCount = brokerTopicStats.topicStats(topic).totalFetchRequestRate.count()
val initialAllTopicsCount = brokerTopicStats.allTopicsStats.totalFetchRequestRate.count()
val readCommittedRecords = replicaManager.readFromLocalLog(
replicaId = Request.OrdinaryConsumerId,
@@ -178,7 +180,7 @@ class SimpleFetchTest {
assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
new SimpleRecord(firstRecord))
assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialTopicCount+2, brokerTopicStats.topicStats(topic).totalFetchRequestRate.count())
assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, brokerTopicStats.allTopicsStats.totalFetchRequestRate.count())
}
}
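The assertions above read the fetch-request meters from a BrokerTopicStats instance rather than the removed static getBrokerTopicStats/getBrokerAllTopicsStats accessors. A small sketch of that lookup in isolation, assuming topic is any topic name:

// Sketch: per-topic and aggregate fetch-request meters on a BrokerTopicStats instance.
val stats = new BrokerTopicStats
val topicFetchCount = stats.topicStats(topic).totalFetchRequestRate.count()
val allTopicsFetchCount = stats.allTopicsStats.totalFetchRequestRate.count()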

3
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@@ -975,7 +975,8 @@ object TestUtils extends Logging {
maxPidExpirationMs = 60 * 60 * 1000,
scheduler = time.scheduler,
time = time,
brokerState = BrokerState())
brokerState = BrokerState(),
brokerTopicStats = new BrokerTopicStats)
}
@deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0")
