
KAFKA-8753; Expose controller topic deletion metrics (KIP-503) (#7156)

This is the implementation for [KIP-503](https://cwiki.apache.org/confluence/display/KAFKA/KIP-503%3A+Add+metric+for+number+of+topics+marked+for+deletion)

When deleting a large number of topics, the Controller can get quite bogged down. One problem is the lack of visibility into the Controller's progress. We can inspect the ZooKeeper path for topics marked for deletion, but in a production environment this is inconvenient. This PR adds a JMX metric `kafka.controller:type=KafkaController,name=TopicsToDeleteCount` to make it easier to see how many topics are pending deletion.
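As a rough sketch of how an operator might read the new gauge without going through ZooKeeper, the snippet below connects to a broker's JMX endpoint and fetches the gauge's `Value` attribute. The host, port, and the standalone `TopicsToDeleteProbe` object are illustrative assumptions, not part of this change; any JMX client (jconsole, `kafka.tools.JmxTool`, or a metrics reporter) works just as well.

```scala
import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object TopicsToDeleteProbe extends App {
  // Assumes the controller broker was started with remote JMX enabled on port 9999
  // (for example via JMX_PORT=9999); adjust host/port for your environment.
  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
  val connector = JMXConnectorFactory.connect(url)
  try {
    val connection = connector.getMBeanServerConnection
    val mbean = new ObjectName("kafka.controller:type=KafkaController,name=TopicsToDeleteCount")
    // Yammer gauges expose their current reading through the "Value" attribute.
    val pending = connection.getAttribute(mbean, "Value")
    println(s"TopicsToDeleteCount = $pending")
  } finally {
    connector.close()
  }
}
```

The companion gauges added here (`ReplicasToDeleteCount`, `TopicsIneligibleToDeleteCount`, `ReplicasIneligibleToDeleteCount`) can be read the same way. Note that only the active controller reports meaningful values; the counts are reset to 0 when a broker resigns the controller role.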

Reviewers: Stanislav Kozlovski <stanislav@confluent.io>, Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
Authored by David Arthur 5 years ago, committed by Jason Gustafson
commit c1f2b0ffb8
1. core/src/main/scala/kafka/controller/KafkaController.scala (54 added lines)
2. core/src/test/scala/unit/kafka/metrics/MetricsTest.scala (4 added lines)
3. docs/ops.html (20 added lines)

core/src/main/scala/kafka/controller/KafkaController.scala (54 added lines)

@@ -109,6 +109,10 @@ class KafkaController(val config: KafkaConfig,
  @volatile private var preferredReplicaImbalanceCount = 0
  @volatile private var globalTopicCount = 0
  @volatile private var globalPartitionCount = 0
  @volatile private var topicsToDeleteCount = 0
  @volatile private var replicasToDeleteCount = 0
  @volatile private var ineligibleTopicsToDeleteCount = 0
  @volatile private var ineligibleReplicasToDeleteCount = 0

  /* single-thread scheduler to clean expired tokens */
  private val tokenCleanScheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "delegation-token-cleaner")
@@ -155,6 +159,34 @@ class KafkaController(val config: KafkaConfig,
    }
  )

  newGauge(
    "TopicsToDeleteCount",
    new Gauge[Int] {
      def value: Int = topicsToDeleteCount
    }
  )

  newGauge(
    "ReplicasToDeleteCount",
    new Gauge[Int] {
      def value: Int = replicasToDeleteCount
    }
  )

  newGauge(
    "TopicsIneligibleToDeleteCount",
    new Gauge[Int] {
      def value: Int = ineligibleTopicsToDeleteCount
    }
  )

  newGauge(
    "ReplicasIneligibleToDeleteCount",
    new Gauge[Int] {
      def value: Int = ineligibleReplicasToDeleteCount
    }
  )

  /**
   * Returns true if this broker is the current controller.
   */
@@ -315,6 +347,10 @@ class KafkaController(val config: KafkaConfig,
    preferredReplicaImbalanceCount = 0
    globalTopicCount = 0
    globalPartitionCount = 0
    topicsToDeleteCount = 0
    replicasToDeleteCount = 0
    ineligibleTopicsToDeleteCount = 0
    ineligibleReplicasToDeleteCount = 0

    // stop token expiry check scheduler
    if (tokenCleanScheduler.isStarted)
@@ -1191,6 +1227,24 @@ class KafkaController(val config: KafkaConfig,
    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size

    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size

    topicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.size

    replicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
      // For each enqueued topic, count the number of replicas that are not yet deleted
      controllerContext.replicasForTopic(topic).count { replica =>
        controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
      }
    }.sum

    ineligibleTopicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsIneligibleForDeletion.size

    ineligibleReplicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
      // For each enqueued topic, count the number of replicas that are ineligible
      controllerContext.replicasForTopic(topic).count { replica =>
        controllerContext.replicaState(replica) == ReplicaDeletionIneligible
      }
    }.sum
  }

  // visible for testing

core/src/test/scala/unit/kafka/metrics/MetricsTest.scala (4 added lines)

@@ -141,6 +141,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=TopicsToDeleteCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ReplicasToDeleteCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount"), 1)
    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount"), 1)
  }

  /**

docs/ops.html (20 added lines)

@@ -900,6 +900,26 @@
        <td>kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec</td>
        <td>0</td>
      </tr>
      <tr>
        <td>Pending topic deletes</td>
        <td>kafka.controller:type=KafkaController,name=TopicsToDeleteCount</td>
        <td></td>
      </tr>
      <tr>
        <td>Pending replica deletes</td>
        <td>kafka.controller:type=KafkaController,name=ReplicasToDeleteCount</td>
        <td></td>
      </tr>
      <tr>
        <td>Ineligible pending topic deletes</td>
        <td>kafka.controller:type=KafkaController,name=TopicsIneligibleToDeleteCount</td>
        <td></td>
      </tr>
      <tr>
        <td>Ineligible pending replica deletes</td>
        <td>kafka.controller:type=KafkaController,name=ReplicasIneligibleToDeleteCount</td>
        <td></td>
      </tr>
      <tr>
        <td>Partition counts</td>
        <td>kafka.server:type=ReplicaManager,name=PartitionCount</td>
