Browse Source

KAFKA-10189: reset event queue time histogram when queue is empty (#8935)

add a timeout for event queue time histogram;
reset eventQueueTimeHist when the controller event queue is empty;

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Igor Soarez <i@soarez.me>, Jun Rao <junrao@gmail.com>
pull/5940/merge
Jeff Kim 4 years ago committed by GitHub
parent
commit
f77e250b99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      core/src/main/scala/kafka/controller/ControllerEventManager.scala
  2. 34
      core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala

22
core/src/main/scala/kafka/controller/ControllerEventManager.scala

@ -18,7 +18,7 @@ @@ -18,7 +18,7 @@
package kafka.controller
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
@ -69,7 +69,8 @@ class QueuedEvent(val event: ControllerEvent, @@ -69,7 +69,8 @@ class QueuedEvent(val event: ControllerEvent,
class ControllerEventManager(controllerId: Int,
processor: ControllerEventProcessor,
time: Time,
rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {
rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
eventQueueTimeTimeoutMs: Long = 300000) extends KafkaMetricsGroup {
import ControllerEventManager._
@volatile private var _state: ControllerState = ControllerState.Idle
@ -115,7 +116,7 @@ class ControllerEventManager(controllerId: Int, @@ -115,7 +116,7 @@ class ControllerEventManager(controllerId: Int,
logIdent = s"[ControllerEventThread controllerId=$controllerId] "
override def doWork(): Unit = {
val dequeued = queue.take()
val dequeued = pollFromEventQueue()
dequeued.event match {
case ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.
case controllerEvent =>
@ -139,4 +140,19 @@ class ControllerEventManager(controllerId: Int, @@ -139,4 +140,19 @@ class ControllerEventManager(controllerId: Int,
}
}
private def pollFromEventQueue(): QueuedEvent = {
val count = eventQueueTimeHist.count()
if (count != 0) {
val event = queue.poll(eventQueueTimeTimeoutMs, TimeUnit.MILLISECONDS)
if (event == null) {
eventQueueTimeHist.clear()
queue.take()
} else {
event
}
} else {
queue.take()
}
}
}

34
core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala

@ -135,6 +135,40 @@ class ControllerEventManagerTest { @@ -135,6 +135,40 @@ class ControllerEventManagerTest {
assertEquals(500, queueTimeHistogram.max, 0.01)
}
@Test
def testEventQueueTimeResetOnTimeout(): Unit = {
val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
val controllerStats = new ControllerStats
val time = new MockTime()
val processedEvents = new AtomicInteger()
val eventProcessor = new ControllerEventProcessor {
override def process(event: ControllerEvent): Unit = {
processedEvents.incrementAndGet()
}
override def preempt(event: ControllerEvent): Unit = {}
}
controllerEventManager = new ControllerEventManager(0, eventProcessor,
time, controllerStats.rateAndTimeMetrics, 1)
controllerEventManager.start()
controllerEventManager.put(TopicChange)
controllerEventManager.put(TopicChange)
TestUtils.waitUntilTrue(() => processedEvents.get() == 2,
"Timed out waiting for processing of all events")
val queueTimeHistogram = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.filter { case (k, _) =>
k.getMBeanName == metricName
}.values.headOption.getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram]
TestUtils.waitUntilTrue(() => queueTimeHistogram.count == 0,
"Timed out on resetting queueTimeHistogram")
assertEquals(0, queueTimeHistogram.min, 0.1)
assertEquals(0, queueTimeHistogram.max, 0.1)
}
@Test
def testSuccessfulEvent(): Unit = {
check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs",

Loading…
Cancel
Save