diff --git a/build.gradle b/build.gradle
index c88ac7b189d..8fc01156a86 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1260,6 +1260,7 @@ project(':group-coordinator') {
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
+ implementation libs.metrics
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 22195e78ee6..ea41f587c92 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -239,6 +239,10 @@
+
+
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a554fcc56b6..a66e63428dc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -325,6 +325,8 @@
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
+
doLoad(tp, coordinator, future))
+ () => doLoad(tp, coordinator, future, startTimeMs))
if (result.isCancelled) {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
@@ -72,7 +75,8 @@ class CoordinatorLoaderImpl[T](
private def doLoad(
tp: TopicPartition,
coordinator: CoordinatorPlayback[T],
- future: CompletableFuture[Void]
+ future: CompletableFuture[LoadSummary],
+ startTimeMs: Long
): Unit = {
try {
replicaManager.getLog(tp) match {
@@ -92,6 +96,8 @@ class CoordinatorLoaderImpl[T](
// the log end offset but the log is empty. This could happen with compacted topics.
var readAtLeastOneRecord = true
+ var numRecords = 0L // Long, like LoadSummary.numRecords, so the running count cannot wrap
+ var numBytes = 0L // Long: the total bytes read from a large partition can exceed Int.MaxValue (~2 GiB)
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
val fetchDataInfo = log.read(
startOffset = currentOffset,
@@ -131,6 +137,7 @@ class CoordinatorLoaderImpl[T](
throw new IllegalStateException("Control batches are not supported yet.")
} else {
batch.asScala.foreach { record =>
+ numRecords = numRecords + 1
try {
coordinator.replay(deserializer.deserialize(record.key, record.value))
} catch {
@@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T](
currentOffset = batch.nextOffset
}
+ numBytes = numBytes + memoryRecords.sizeInBytes()
}
+ val endTimeMs = time.milliseconds()
if (isRunning.get) {
- future.complete(null)
+ future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes))
} else {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index daec8f73089..fe8e5acb9fa 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition}
import org.apache.kafka.coordinator.group
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
import org.apache.kafka.coordinator.group.util.SystemTimerReaper
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
@@ -531,6 +532,7 @@ class BrokerServer(
new SystemTimer("group-coordinator")
)
val loader = new CoordinatorLoaderImpl[group.Record](
+ time,
replicaManager,
serde,
config.offsetsLoadBufferSize
@@ -546,6 +548,7 @@ class BrokerServer(
.withTimer(timer)
.withLoader(loader)
.withWriter(writer)
+ .withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
.build()
} else {
GroupCoordinatorAdapter(
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index c71c28d47ea..ef19d732c34 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -22,14 +22,16 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata}
import org.apache.kafka.test.TestUtils.assertFutureThrows
-import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
import org.junit.jupiter.api.{Test, Timeout}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
import java.nio.charset.Charset
@@ -54,6 +56,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -73,6 +76,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -93,6 +97,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -126,7 +131,7 @@ class CoordinatorLoaderImplTest {
minOneMessage = true
)).thenReturn(readResult2)
- assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(("k1", "v1"))
verify(coordinator).replay(("k2", "v2"))
@@ -145,6 +150,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -187,6 +193,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -226,6 +233,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -266,6 +274,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
minOneMessage = true
)).thenReturn(readResult)
- assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+ assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
+ }
+ }
+
+ @Test
+ def testLoadSummary(): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val serde = new StringKeyValueDeserializer
+ val log = mock(classOf[UnifiedLog])
+ val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+ val time = new MockTime() // mock clock; it is only advanced explicitly inside the first read below
+
+ TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+ time,
+ replicaManager = replicaManager,
+ deserializer = serde,
+ loadBufferSize = 1000
+ )) { loader =>
+ val startTimeMs = time.milliseconds() // captured before load() is called; expected summary start time
+ when(replicaManager.getLog(tp)).thenReturn(Some(log))
+ when(log.logStartOffset).thenReturn(0L)
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)) // five records in total: two in the first read, three in the second
+
+ val readResult1 = logReadResult(startOffset = 0, records = Seq(
+ new SimpleRecord("k1".getBytes, "v1".getBytes),
+ new SimpleRecord("k2".getBytes, "v2".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 0L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenAnswer((_: InvocationOnMock) => {
+ time.sleep(1000) // advance the mock clock by 1000 ms so the load has a measurable duration
+ readResult1
+ })
+
+ val readResult2 = logReadResult(startOffset = 2, records = Seq(
+ new SimpleRecord("k3".getBytes, "v3".getBytes),
+ new SimpleRecord("k4".getBytes, "v4".getBytes),
+ new SimpleRecord("k5".getBytes, "v5".getBytes)
+ ))
+
+ when(log.read(
+ startOffset = 2L,
+ maxLength = 1000,
+ isolation = FetchIsolation.LOG_END,
+ minOneMessage = true
+ )).thenReturn(readResult2)
+
+ val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
+ assertEquals(startTimeMs, summary.startTimeMs())
+ assertEquals(startTimeMs + 1000, summary.endTimeMs()) // only the first read advanced the clock
+ assertEquals(5, summary.numRecords())
+ assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes()) // bytes are summed per read result
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e30bb930269..176dc770cfe 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
@@ -102,6 +103,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
private CoordinatorLoader loader;
private Time time;
private Timer timer;
+ private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
public Builder(
int nodeId,
@@ -131,6 +133,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
return this;
}
+ public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
+ this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
+ return this;
+ }
+
public GroupCoordinatorService build() {
if (config == null)
throw new IllegalArgumentException("Config must be set.");
@@ -142,6 +149,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
+ if (coordinatorRuntimeMetrics == null)
+ throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@@ -152,7 +161,9 @@ public class GroupCoordinatorService implements GroupCoordinator {
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,
"group-coordinator-event-processor-",
- config.numThreads
+ config.numThreads,
+ time,
+ coordinatorRuntimeMetrics
);
CoordinatorRuntime runtime =
@@ -166,6 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
+ .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.build();
return new GroupCoordinatorService(
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java
new file mode 100644
index 00000000000..8978a040007
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Metrics suite used by the group and transaction coordinator runtimes. It holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+ /**
+ * Called when the partition state changes.
+ * @param oldState The state the partition is leaving.
+ * @param newState The new state to transition to.
+ */
+ void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
+
+ /**
+ * Record a partition load, described by its start and end times.
+ * @param startTimeMs The partition load start time in milliseconds.
+ * @param endTimeMs The partition load end time in milliseconds.
+ */
+ void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+ /**
+ * Record the event queue time.
+ *
+ * @param durationMs The queue time in milliseconds.
+ */
+ void recordEventQueueTime(long durationMs);
+
+ /**
+ * Record the event queue processing time.
+ *
+ * @param durationMs The event processing time in milliseconds.
+ */
+ void recordEventQueueProcessingTime(long durationMs);
+
+ /**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+ void recordThreadIdleRatio(double ratio);
+
+ /**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The supplier queried for the current event queue size.
+ */
+ void registerEventQueueSizeGauge(Supplier sizeSupplier);
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java
new file mode 100644
index 00000000000..e4c6552c4bc
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
+ /**
+ * The metrics group.
+ */
+ public static final String METRICS_GROUP = "group-coordinator-metrics";
+
+ /**
+ * The partition count metric name.
+ */
+ public static final String NUM_PARTITIONS_METRIC_NAME = "num-partitions";
+
+ /**
+ * Metric to count the number of partitions in Loading state.
+ */
+ private final MetricName numPartitionsLoading;
+ private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
+
+ /**
+ * Metric to count the number of partitions in Active state.
+ */
+ private final MetricName numPartitionsActive;
+ private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
+
+ /**
+ * Metric to count the number of partitions in Failed state.
+ */
+ private final MetricName numPartitionsFailed;
+ private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
+
+ /**
+ * Metric to count the size of the processor queue.
+ */
+ private final MetricName eventQueueSize;
+
+ /**
+ * The Kafka metrics registry.
+ */
+ private final Metrics metrics;
+
+ /**
+ * The partition load sensor.
+ */
+ private Sensor partitionLoadSensor;
+
+ /**
+ * The thread idle sensor.
+ */
+ private Sensor threadIdleRatioSensor;
+
+ public GroupCoordinatorRuntimeMetrics(Metrics metrics) {
+ this.metrics = Objects.requireNonNull(metrics);
+
+ this.numPartitionsLoading = kafkaMetricName(
+ NUM_PARTITIONS_METRIC_NAME,
+ "The number of partitions in Loading state.",
+ "state", "loading"
+ );
+
+ this.numPartitionsActive = kafkaMetricName(
+ NUM_PARTITIONS_METRIC_NAME,
+ "The number of partitions in Active state.",
+ "state", "active"
+ );
+
+ this.numPartitionsFailed = kafkaMetricName(
+ NUM_PARTITIONS_METRIC_NAME,
+ "The number of partitions in Failed state.",
+ "state", "failed"
+ );
+
+ this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size.");
+
+ metrics.addMetric(numPartitionsLoading, (Gauge) (config, now) -> numPartitionsLoadingCounter.get());
+ metrics.addMetric(numPartitionsActive, (Gauge) (config, now) -> numPartitionsActiveCounter.get());
+ metrics.addMetric(numPartitionsFailed, (Gauge) (config, now) -> numPartitionsFailedCounter.get());
+
+ this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime");
+ this.partitionLoadSensor.add(
+ metrics.metricName(
+ "partition-load-time-max",
+ METRICS_GROUP,
+ "The max time it took to load the partitions in the last 30 sec."
+ ), new Max());
+ this.partitionLoadSensor.add(
+ metrics.metricName(
+ "partition-load-time-avg",
+ METRICS_GROUP,
+ "The average time it took to load the partitions in the last 30 sec."
+ ), new Avg());
+
+ this.threadIdleRatioSensor = metrics.sensor("ThreadIdleRatio");
+ this.threadIdleRatioSensor.add(
+ metrics.metricName(
+ "thread-idle-ratio-min",
+ METRICS_GROUP,
+ "The minimum thread idle ratio over the last 30 seconds."
+ ), new Min());
+ this.threadIdleRatioSensor.add(
+ metrics.metricName(
+ "thread-idle-ratio-avg",
+ METRICS_GROUP,
+ "The average thread idle ratio over the last 30 seconds."
+ ), new Avg());
+ }
+
+ /**
+ * Build a kafka metric name that belongs to the METRICS_GROUP group.
+ * @param name The name of the metric.
+ * @param description The description of the metric.
+ * @param keyValue Extra values forwarded to Metrics#metricName; callers here pass a tag pair such as "state", "loading".
+ * @return The kafka metric name.
+ */
+ private MetricName kafkaMetricName(String name, String description, String... keyValue) {
+ return metrics.metricName(name, METRICS_GROUP, description, keyValue);
+ }
+
+ @Override
+ public void close() {
+ Arrays.asList(
+ numPartitionsLoading,
+ numPartitionsActive,
+ numPartitionsFailed,
+ eventQueueSize
+ ).forEach(metrics::removeMetric);
+
+ metrics.removeSensor(partitionLoadSensor.name());
+ metrics.removeSensor(threadIdleRatioSensor.name());
+ }
+
+ /**
+ * Called when the partition state changes. Decrement the old state and increment the new state.
+ *
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+ @Override
+ public void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState) {
+ switch (oldState) {
+ case INITIAL:
+ case CLOSED:
+ break;
+ case LOADING:
+ numPartitionsLoadingCounter.decrementAndGet();
+ break;
+ case ACTIVE:
+ numPartitionsActiveCounter.decrementAndGet();
+ break;
+ case FAILED:
+ numPartitionsFailedCounter.decrementAndGet();
+ }
+
+ switch (newState) {
+ case INITIAL:
+ case CLOSED:
+ break;
+ case LOADING:
+ numPartitionsLoadingCounter.incrementAndGet();
+ break;
+ case ACTIVE:
+ numPartitionsActiveCounter.incrementAndGet();
+ break;
+ case FAILED:
+ numPartitionsFailedCounter.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void recordPartitionLoadSensor(long startTimeMs, long endTimeMs) {
+ this.partitionLoadSensor.record(endTimeMs - startTimeMs, endTimeMs, false);
+ }
+
+ @Override
+ public void recordEventQueueTime(long durationMs) { }
+
+ @Override
+ public void recordEventQueueProcessingTime(long durationMs) { }
+
+ @Override
+ public void recordThreadIdleRatio(double ratio) {
+ threadIdleRatioSensor.record(ratio);
+ }
+
+ @Override
+ public void registerEventQueueSizeGauge(Supplier sizeSupplier) {
+ metrics.addMetric(eventQueueSize, (Gauge) (config, now) -> (long) sizeSupplier.get());
+ }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
index fb9bdbed651..19fc0d88cf8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
@@ -35,4 +35,9 @@ public interface CoordinatorEvent extends EventAccumulator.Event
* @param exception An exception if the processing of the event failed or null otherwise.
*/
void complete(Throwable exception);
+
+ /**
+ * @return The created time in milliseconds.
+ */
+ long createdTimeMs();
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
index 0fe23b5dc69..dd6a67ec15c 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
@@ -46,6 +46,49 @@ public interface CoordinatorLoader extends AutoCloseable {
}
}
+ /**
+ * Object that is returned as part of the future from load(). Holds the load start and end times
+ * in milliseconds, together with the number of records and the number of bytes that were read.
+ */
+ class LoadSummary {
+ private final long startTimeMs;
+ private final long endTimeMs;
+ private final long numRecords;
+ private final long numBytes;
+
+ public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
+ this.startTimeMs = startTimeMs;
+ this.endTimeMs = endTimeMs;
+ this.numRecords = numRecords;
+ this.numBytes = numBytes;
+ }
+
+ public long startTimeMs() {
+ return startTimeMs;
+ }
+
+ public long endTimeMs() {
+ return endTimeMs;
+ }
+
+ public long numRecords() {
+ return numRecords;
+ }
+
+ public long numBytes() {
+ return numBytes;
+ }
+
+ @Override
+ public String toString() {
+ return "LoadSummary(" +
+ "startTimeMs=" + startTimeMs +
+ ", endTimeMs=" + endTimeMs +
+ ", numRecords=" + numRecords +
+ ", numBytes=" + numBytes + ")";
+ }
+ }
+
/**
* Deserializer to translates bytes to T.
*
@@ -69,7 +112,7 @@ public interface CoordinatorLoader extends AutoCloseable {
* @param tp The TopicPartition to read from.
* @param coordinator The object to apply records to.
*/
- CompletableFuture load(
+ CompletableFuture load(
TopicPartition tp,
CoordinatorPlayback coordinator
);
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 8e0f53f5d0b..deeed11bc85 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
@@ -89,6 +90,7 @@ public class CoordinatorRuntime, U> implements Aut
private CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier;
private Time time = Time.SYSTEM;
private Timer timer;
+ private CoordinatorRuntimeMetrics runtimeMetrics;
public Builder withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@@ -130,6 +132,11 @@ public class CoordinatorRuntime, U> implements Aut
return this;
}
+ public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics runtimeMetrics) {
+ this.runtimeMetrics = runtimeMetrics;
+ return this;
+ }
+
public CoordinatorRuntime build() {
if (logPrefix == null)
logPrefix = "";
@@ -147,6 +154,8 @@ public class CoordinatorRuntime, U> implements Aut
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
+ if (runtimeMetrics == null)
+ throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
return new CoordinatorRuntime<>(
logPrefix,
@@ -156,7 +165,8 @@ public class CoordinatorRuntime, U> implements Aut
loader,
coordinatorShardBuilderSupplier,
time,
- timer
+ timer,
+ runtimeMetrics
);
}
}
@@ -164,7 +174,7 @@ public class CoordinatorRuntime, U> implements Aut
/**
* The various state that a coordinator for a partition can be in.
*/
- enum CoordinatorState {
+ public enum CoordinatorState {
/**
* Initial state when a coordinator is created.
*/
@@ -501,6 +511,7 @@ public class CoordinatorRuntime, U> implements Aut
throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
}
+ CoordinatorState oldState = state;
log.debug("Transition from {} to {}.", state, newState);
switch (newState) {
case LOADING:
@@ -537,6 +548,8 @@ public class CoordinatorRuntime, U> implements Aut
default:
throw new IllegalArgumentException("Transitioning to " + newState + " is not supported.");
}
+
+ runtimeMetrics.recordPartitionStateChange(oldState, state);
}
/**
@@ -608,6 +621,11 @@ public class CoordinatorRuntime, U> implements Aut
*/
CoordinatorResult result;
+ /**
+ * The time this event was created.
+ */
+ private final long createdTimeMs;
+
/**
* Constructor.
*
@@ -624,6 +642,7 @@ public class CoordinatorRuntime, U> implements Aut
this.name = name;
this.op = op;
this.future = new CompletableFuture<>();
+ this.createdTimeMs = time.milliseconds();
}
/**
@@ -709,6 +728,11 @@ public class CoordinatorRuntime, U> implements Aut
}
}
+ @Override
+ public long createdTimeMs() {
+ return this.createdTimeMs;
+ }
+
@Override
public String toString() {
return "CoordinatorWriteEvent(name=" + name + ")";
@@ -768,6 +792,11 @@ public class CoordinatorRuntime, U> implements Aut
*/
T response;
+ /**
+ * The time this event was created.
+ */
+ private final long createdTimeMs;
+
/**
* Constructor.
*
@@ -784,6 +813,7 @@ public class CoordinatorRuntime, U> implements Aut
this.name = name;
this.op = op;
this.future = new CompletableFuture<>();
+ this.createdTimeMs = time.milliseconds();
}
/**
@@ -832,6 +862,11 @@ public class CoordinatorRuntime, U> implements Aut
}
}
+ @Override
+ public long createdTimeMs() {
+ return this.createdTimeMs;
+ }
+
@Override
public String toString() {
return "CoordinatorReadEvent(name=" + name + ")";
@@ -857,6 +892,11 @@ public class CoordinatorRuntime, U> implements Aut
*/
final Runnable op;
+ /**
+ * The time this event was created.
+ */
+ private final long createdTimeMs;
+
/**
* Constructor.
*
@@ -872,6 +912,7 @@ public class CoordinatorRuntime, U> implements Aut
this.tp = tp;
this.name = name;
this.op = op;
+ this.createdTimeMs = time.milliseconds();
}
/**
@@ -907,6 +948,11 @@ public class CoordinatorRuntime, U> implements Aut
}
}
+ @Override
+ public long createdTimeMs() {
+ return this.createdTimeMs;
+ }
+
@Override
public String toString() {
return "InternalEvent(name=" + name + ")";
@@ -995,6 +1041,11 @@ public class CoordinatorRuntime, U> implements Aut
*/
private final CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier;
+ /**
+ * The coordinator runtime metrics.
+ */
+ private final CoordinatorRuntimeMetrics runtimeMetrics;
+
/**
* Atomic boolean indicating whether the runtime is running.
*/
@@ -1025,7 +1076,8 @@ public class CoordinatorRuntime, U> implements Aut
CoordinatorLoader loader,
CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier,
Time time,
- Timer timer
+ Timer timer,
+ CoordinatorRuntimeMetrics runtimeMetrics
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
@@ -1038,6 +1090,7 @@ public class CoordinatorRuntime, U> implements Aut
this.highWatermarklistener = new HighWatermarkListener();
this.loader = loader;
this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
+ this.runtimeMetrics = runtimeMetrics;
}
/**
@@ -1242,7 +1295,7 @@ public class CoordinatorRuntime, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
- loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
+ loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
withContextOrThrow(tp, ctx -> {
if (ctx.state != CoordinatorState.LOADING) {
@@ -1254,8 +1307,11 @@ public class CoordinatorRuntime, U> implements Aut
try {
if (exception != null) throw exception;
ctx.transitionTo(CoordinatorState.ACTIVE);
- log.info("Finished loading of metadata from {} with epoch {}.",
- tp, partitionEpoch
+ if (summary != null) {
+ runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
+ }
+ log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
+ tp, partitionEpoch, summary
);
} catch (Throwable ex) {
log.error("Failed to load metadata from {} with epoch {} due to {}.",
@@ -1373,6 +1429,7 @@ public class CoordinatorRuntime, U> implements Aut
context.transitionTo(CoordinatorState.CLOSED);
});
coordinators.clear();
+ Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index 1f35d65c3f4..e4adc18e957 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.slf4j.Logger;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -52,21 +55,50 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
*/
private volatile boolean shuttingDown;
+ /**
+ * The coordinator runtime metrics.
+ */
+ private final CoordinatorRuntimeMetrics metrics;
+
+ /**
+ * The time.
+ */
+ private final Time time;
+
+ /**
+  * Convenience constructor. Delegates to the full constructor, supplying a
+  * newly created {@link EventAccumulator} as the event queue.
+  *
+  * @param logContext   The log context.
+  * @param threadPrefix The thread prefix.
+  * @param numThreads   The number of threads.
+  * @param time         The time.
+  * @param metrics      The coordinator runtime metrics.
+  */
+ public MultiThreadedEventProcessor(
+     LogContext logContext,
+     String threadPrefix,
+     int numThreads,
+     Time time,
+     CoordinatorRuntimeMetrics metrics
+ ) {
+     this(
+         logContext,
+         threadPrefix,
+         numThreads,
+         time,
+         metrics,
+         new EventAccumulator<>()
+     );
+ }
+
/**
* Constructor.
*
- * @param logContext The log context.
- * @param threadPrefix The thread prefix.
- * @param numThreads The number of threads.
+ * @param logContext The log context.
+ * @param threadPrefix The thread prefix.
+ * @param numThreads The number of threads.
+ * @param time The time.
+ * @param metrics The coordinator runtime metrics.
+ * @param eventAccumulator The event accumulator.
*/
public MultiThreadedEventProcessor(
LogContext logContext,
String threadPrefix,
- int numThreads
+ int numThreads,
+ Time time,
+ CoordinatorRuntimeMetrics metrics,
+ EventAccumulator eventAccumulator
) {
this.log = logContext.logger(MultiThreadedEventProcessor.class);
this.shuttingDown = false;
- this.accumulator = new EventAccumulator<>();
+ this.accumulator = eventAccumulator;
+ this.time = Objects.requireNonNull(time);
+ this.metrics = Objects.requireNonNull(metrics);
+ this.metrics.registerEventQueueSizeGauge(accumulator::size);
this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
new EventProcessorThread(
threadPrefix + threadId
@@ -81,6 +113,9 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
*/
private class EventProcessorThread extends Thread {
private final Logger log;
+ private long pollStartMs;
+ private long timeSinceLastPollMs;
+ private long lastPollMs;
EventProcessorThread(
String name
@@ -92,11 +127,16 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
private void handleEvents() {
while (!shuttingDown) {
+ recordPollStartTime(time.milliseconds());
CoordinatorEvent event = accumulator.poll();
+ recordPollEndTime(time.milliseconds());
if (event != null) {
try {
log.debug("Executing event: {}.", event);
+ long dequeuedTimeMs = time.milliseconds();
+ metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
event.run();
+ metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
} catch (Throwable t) {
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
@@ -112,6 +152,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
while (event != null) {
try {
log.debug("Draining event: {}.", event);
+ metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs());
event.complete(new RejectedExecutionException("EventProcessor is closed."));
} catch (Throwable t) {
log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t);
@@ -145,6 +186,18 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
log.info("Shutdown completed");
}
}
+
+ /**
+  * Marks the start of a poll on the accumulator.
+  *
+  * Remembers the poll start time and how long ago the previous poll started.
+  * The elapsed time is 0 while {@code lastPollMs} is still 0, i.e. before any
+  * earlier poll start has been stored.
+  *
+  * @param pollStartMs The time at which the poll starts, in milliseconds.
+  */
+ private void recordPollStartTime(long pollStartMs) {
+     // Must be computed before lastPollMs is overwritten below.
+     long elapsedSincePreviousPollMs = 0;
+     if (lastPollMs != 0L) {
+         elapsedSincePreviousPollMs = pollStartMs - lastPollMs;
+     }
+     this.pollStartMs = pollStartMs;
+     this.timeSinceLastPollMs = elapsedSincePreviousPollMs;
+     this.lastPollMs = pollStartMs;
+ }
+
+ /**
+  * Marks the end of a poll on the accumulator and records the thread idle ratio.
+  *
+  * The ratio is the time spent inside the poll divided by that time plus the
+  * time elapsed between the previous poll start and this poll start.
+  *
+  * @param pollEndMs The time at which the poll returned, in milliseconds.
+  */
+ private void recordPollEndTime(long pollEndMs) {
+     long pollTimeMs = pollEndMs - pollStartMs;
+     long totalTimeMs = pollTimeMs + timeSinceLastPollMs;
+     // If no time has elapsed (e.g. the very first poll returns within the same
+     // millisecond) the ratio would be 0.0 / 0 = NaN. A NaN sample would turn the
+     // aggregated avg/min values into NaN as well, so skip the sample instead.
+     if (totalTimeMs <= 0) {
+         return;
+     }
+     metrics.recordThreadIdleRatio(pollTimeMs * 1.0 / totalTimeMs);
+ }
}
/**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java
new file mode 100644
index 00000000000..f3726aafb17
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.METRICS_GROUP;
+import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.NUM_PARTITIONS_METRIC_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link GroupCoordinatorRuntimeMetrics}: metric registration and
+ * removal, partition state gauges, the partition load time sensor, the thread
+ * idle ratio sensor and the event queue size gauge.
+ */
+public class GroupCoordinatorRuntimeMetricsTest {
+
+    /**
+     * Every expected metric must be present while the runtime metrics object is
+     * open and must be gone again once it has been closed.
+     */
+    @Test
+    public void testMetricNames() {
+        Metrics metrics = new Metrics();
+
+        HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"),
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"),
+            kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"),
+            metrics.metricName("event-queue-size", METRICS_GROUP),
+            metrics.metricName("partition-load-time-max", METRICS_GROUP),
+            metrics.metricName("partition-load-time-avg", METRICS_GROUP),
+            metrics.metricName("thread-idle-ratio-min", METRICS_GROUP),
+            metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP)
+        ));
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
+            expectedMetrics.forEach(metricName ->
+                assertTrue(metrics.metrics().containsKey(metricName), metricName + " should be registered"));
+        }
+
+        // Leaving the try-with-resources block closes the runtime metrics.
+        expectedMetrics.forEach(metricName ->
+            assertFalse(metrics.metrics().containsKey(metricName), metricName + " should be removed on close"));
+    }
+
+    /**
+     * Drives a sequence of state changes and checks the resulting per-state gauges.
+     */
+    @Test
+    public void testUpdateNumPartitionsMetrics() {
+        Metrics metrics = new Metrics();
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            IntStream.range(0, 10)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
+            IntStream.range(0, 8)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
+            IntStream.range(0, 8)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
+            IntStream.range(0, 2)
+                .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.FAILED, CoordinatorState.CLOSED));
+
+            // loading: 10 - 8 = 2, active: 8 - 8 = 0, failed: 8 - 2 = 6
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"), 2);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"), 0);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"), 6);
+        }
+    }
+
+    /**
+     * Records two partition loads (1000 ms and 2000 ms) and checks avg and max.
+     */
+    @Test
+    public void testPartitionLoadSensorMetrics() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            long startTimeMs = time.milliseconds();
+            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
+            runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 2000);
+
+            MetricName metricName = metrics.metricName(
+                "partition-load-time-avg", METRICS_GROUP);
+
+            KafkaMetric metric = metrics.metrics().get(metricName);
+            assertEquals(1500.0, metric.metricValue());
+
+            metricName = metrics.metricName(
+                "partition-load-time-max", METRICS_GROUP);
+            metric = metrics.metrics().get(metricName);
+            assertEquals(2000.0, metric.metricValue());
+        }
+    }
+
+    /**
+     * Records the ratios 1, 1/2 and 1/3 and checks avg and min.
+     */
+    @Test
+    public void testThreadIdleRatioSensor() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleRatio(1.0 / (i + 1)));
+
+            MetricName metricName = metrics.metricName(
+                "thread-idle-ratio-avg", METRICS_GROUP);
+
+            // The expected values are not exactly representable as doubles, so
+            // compare with a tolerance rather than relying on identical rounding.
+            KafkaMetric metric = metrics.metrics().get(metricName);
+            assertEquals((11.0 / 6.0) / 3.0, (double) metric.metricValue(), 1e-9); // (6/6 + 3/6 + 2/6) / 3
+
+            metricName = metrics.metricName(
+                "thread-idle-ratio-min", METRICS_GROUP);
+            metric = metrics.metrics().get(metricName);
+            assertEquals(1.0 / 3.0, (double) metric.metricValue(), 1e-9);
+        }
+    }
+
+    /**
+     * The event queue size gauge must report the value of the registered supplier.
+     */
+    @Test
+    public void testEventQueueSize() {
+        Time time = new MockTime();
+        Metrics metrics = new Metrics(time);
+
+        try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
+            runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
+            assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5);
+        }
+    }
+
+    /**
+     * Asserts that the metric registered under {@code metricName} currently has
+     * the value {@code count}.
+     */
+    private static void assertMetricGauge(Metrics metrics, MetricName metricName, long count) {
+        assertEquals(count, (long) metrics.metric(metricName).metricValue());
+    }
+
+    /**
+     * Builds a metric name in {@code METRICS_GROUP} with an empty description
+     * and the given tag key/value pairs.
+     */
+    private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
+        return metrics.metricName(name, METRICS_GROUP, "", keyValue);
+    }
+}
\ No newline at end of file
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index a7eb0f6b709..d776a07cd3a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -45,6 +46,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.CLOSED;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -118,9 +124,19 @@ public class CoordinatorRuntimeTest {
* A CoordinatorLoader that always succeeds.
*/
private static class MockCoordinatorLoader implements CoordinatorLoader {
+ private final LoadSummary summary;
+
+ public MockCoordinatorLoader(LoadSummary summary) {
+ this.summary = summary;
+ }
+
+ public MockCoordinatorLoader() {
+ this(null);
+ }
+
@Override
- public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) {
- return CompletableFuture.completedFuture(null);
+ public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) {
+ return CompletableFuture.completedFuture(summary);
}
@Override
@@ -271,6 +287,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -280,7 +297,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
- CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Getting the coordinator context fails because the coordinator
@@ -294,13 +311,13 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// The coordinator is loading.
- assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+ assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading completes, the coordinator transitions to active.
future.complete(null);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
// Verify that onLoaded is called.
verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
@@ -335,6 +352,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -344,7 +362,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
- CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@@ -352,13 +370,13 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+ assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
- assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
+ assertEquals(FAILED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@@ -386,6 +404,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -395,7 +414,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
- CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@@ -403,19 +422,19 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+ assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading completes, the coordinator transitions to active.
future.complete(null);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Loading with a previous epoch is a no-op. The coordinator stays
// in active state with the correct epoch.
runtime.scheduleLoadOperation(TP, 0);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
}
@@ -435,6 +454,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -444,7 +464,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
- CompletableFuture future = new CompletableFuture<>();
+ CompletableFuture future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@@ -452,13 +472,13 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+ assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
- assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
+ assertEquals(FAILED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@@ -474,7 +494,7 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
+ assertEquals(LOADING, ctx.state);
assertEquals(11, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
@@ -482,7 +502,7 @@ public class CoordinatorRuntimeTest {
future.complete(null);
// Verify the state.
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
}
@Test
@@ -501,6 +521,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -514,12 +535,12 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, ctx.epoch + 1);
- assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state);
+ assertEquals(CLOSED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@@ -549,6 +570,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@@ -564,13 +586,13 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Unloading with a previous epoch is a no-op. The coordinator stays
// in active with the correct epoch.
runtime.scheduleUnloadOperation(TP, 0);
- assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
+ assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
}
@@ -587,6 +609,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Schedule the loading.
@@ -695,6 +718,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Scheduling a write fails with a NotCoordinatorException because the coordinator
@@ -715,6 +739,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -739,6 +764,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -784,6 +810,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -831,6 +858,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -885,6 +913,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Schedule a read. It fails because the coordinator does not exist.
@@ -906,6 +935,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -947,6 +977,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1012,6 +1043,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@@ -1028,10 +1060,10 @@ public class CoordinatorRuntimeTest {
.thenReturn(coordinator0)
.thenReturn(coordinator1);
- CompletableFuture future0 = new CompletableFuture<>();
+ CompletableFuture future0 = new CompletableFuture<>();
when(loader.load(tp0, coordinator0)).thenReturn(future0);
- CompletableFuture future1 = new CompletableFuture<>();
+ CompletableFuture future1 = new CompletableFuture<>();
when(loader.load(tp1, coordinator1)).thenReturn(future1);
runtime.scheduleLoadOperation(tp0, 0);
@@ -1067,6 +1099,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1118,6 +1151,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1189,6 +1223,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1257,6 +1292,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1313,6 +1349,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@@ -1340,4 +1377,119 @@ public class CoordinatorRuntimeTest {
assertEquals(1, cnt.get());
assertEquals(0, ctx.timer.size());
}
+
+ /**
+ * Verifies that each coordinator state transition driven through the runtime
+ * is reported to the runtime metrics via recordPartitionStateChange.
+ *
+ * Two partitions are used: TP fails to load (INITIAL -> LOADING -> FAILED) and
+ * a second partition loads successfully (INITIAL -> LOADING -> ACTIVE). Closing
+ * the runtime is then expected to report both going to CLOSED.
+ */
+ @Test
+ public void testStateChanges() throws Exception {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+ MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+ MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+ CoordinatorRuntime runtime =
+ new CoordinatorRuntime.Builder()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withLoader(loader)
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withCoordinatorRuntimeMetrics(runtimeMetrics)
+ .build();
+
+ // The mocked builder hands out the same coordinator shard for every partition,
+ // which is why the loader stubs below are keyed on (partition, coordinator).
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.withTime(any())).thenReturn(builder);
+ when(builder.withTimer(any())).thenReturn(builder);
+ when(builder.withTopicPartition(any())).thenReturn(builder);
+ when(builder.build()).thenReturn(coordinator);
+ when(supplier.get()).thenReturn(builder);
+ // An incomplete future keeps the coordinator in LOADING until the test completes it.
+ CompletableFuture future = new CompletableFuture<>();
+ when(loader.load(TP, coordinator)).thenReturn(future);
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 0);
+
+ // Getting the context succeeds and the coordinator should be in loading.
+ CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(LOADING, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+ // When the loading fails, the coordinator transitions to failed.
+ future.completeExceptionally(new Exception("failure"));
+ assertEquals(FAILED, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+ // Start loading a new topic partition.
+ TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+ future = new CompletableFuture<>();
+ when(loader.load(tp, coordinator)).thenReturn(future);
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(tp, 0);
+ // Getting the context succeeds and the coordinator should be in loading.
+ ctx = runtime.contextOrThrow(tp);
+ assertEquals(LOADING, ctx.state);
+ // Invocation counts are cumulative on the mock: one INITIAL -> LOADING for TP
+ // above plus this one for tp gives 2.
+ verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+ // When the loading completes, the coordinator transitions to active.
+ // The future is completed with a null summary here.
+ future.complete(null);
+ assertEquals(ACTIVE, ctx.state);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+ // Closing the runtime is expected to move both contexts to CLOSED:
+ // TP from FAILED and tp from ACTIVE.
+ runtime.close();
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);
+ verify(runtimeMetrics, times(1)).recordPartitionStateChange(ACTIVE, CLOSED);
+ }
+
+ /**
+  * Verifies that the start and end times of the load summary returned by the
+  * loader are passed to the runtime metrics once the coordinator has loaded.
+  */
+ @Test
+ public void testPartitionLoadSensor() {
+     MockTimer timer = new MockTimer();
+     GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+     MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+     MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+     MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+     MockPartitionWriter writer = mock(MockPartitionWriter.class);
+
+     // The loader completes immediately with this summary.
+     long loadStartMs = timer.time().milliseconds();
+     long loadEndMs = loadStartMs + 1000;
+     CoordinatorLoader.LoadSummary summary =
+         new CoordinatorLoader.LoadSummary(loadStartMs, loadEndMs, 30, 3000);
+
+     CoordinatorRuntime runtime =
+         new CoordinatorRuntime.Builder()
+             .withTime(timer.time())
+             .withTimer(timer)
+             .withLoader(new MockCoordinatorLoader(summary))
+             .withEventProcessor(new DirectEventProcessor())
+             .withPartitionWriter(writer)
+             .withCoordinatorShardBuilderSupplier(supplier)
+             .withCoordinatorRuntimeMetrics(runtimeMetrics)
+             .build();
+
+     when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+     when(builder.withLogContext(any())).thenReturn(builder);
+     when(builder.withTime(any())).thenReturn(builder);
+     when(builder.withTimer(any())).thenReturn(builder);
+     when(builder.withTopicPartition(any())).thenReturn(builder);
+     when(builder.build()).thenReturn(coordinator);
+     when(supplier.get()).thenReturn(builder);
+
+     // No context exists for the partition before a load has been scheduled.
+     assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+
+     // Scheduling the load creates the context; the loader's future is already
+     // complete, so the coordinator is active right away.
+     runtime.scheduleLoadOperation(TP, 0);
+     CoordinatorRuntime.CoordinatorContext loadedContext = runtime.contextOrThrow(TP);
+     assertEquals(ACTIVE, loadedContext.state);
+
+     // The summary's start and end times must have been recorded exactly once.
+     verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(loadStartMs, loadEndMs);
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 0630c88f389..f6017b90bcd 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -18,16 +18,24 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentCaptor;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -37,9 +45,63 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyDouble;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
@Timeout(value = 60)
public class MultiThreadedEventProcessorTest {
+ /**
+ * Test double for EventAccumulator backed by a plain FIFO queue that is guarded by
+ * its own monitor. Every successful return from poll() is preceded by a call to
+ * time.sleep(timeToPollMs), which lets a test inject a fixed "poll time".
+ * NOTE(review): with a MockTime this presumably advances the mock clock instead of
+ * blocking the thread - confirm against MockTime.
+ */
+ private static class MockEventAccumulator extends EventAccumulator {
+ private final Time time;
+ private final Queue events;
+ private final long timeToPollMs;
+ private final AtomicBoolean isClosed;
+
+ /**
+ * @param time clock whose sleep() is invoked on every poll()
+ * @param timeToPollMs duration passed to time.sleep() on every poll()
+ */
+ public MockEventAccumulator(Time time, long timeToPollMs) {
+ this.time = time;
+ this.events = new LinkedList<>();
+ this.timeToPollMs = timeToPollMs;
+ this.isClosed = new AtomicBoolean(false);
+ }
+
+ /**
+ * Waits until an event is queued or the accumulator is closed, sleeps for
+ * timeToPollMs, and then removes the head of the queue.
+ *
+ * @return the next event, or null when the queue is empty (i.e. after close())
+ */
+ @Override
+ public CoordinatorEvent poll() {
+ synchronized (events) {
+ while (events.isEmpty() && !isClosed.get()) {
+ try {
+ events.wait();
+ } catch (Exception ignored) {
+ // Deliberately ignored: the loop condition is re-checked and the wait resumes.
+ }
+ }
+ // The sleep runs while the monitor is still held, so add() and close() block until it returns.
+ time.sleep(timeToPollMs);
+ return events.poll();
+ }
+ }
+
+ /** Timed poll is not simulated by this mock; it always returns null. */
+ @Override
+ public CoordinatorEvent poll(long timeout, TimeUnit unit) {
+ return null;
+ }
+
+ /**
+ * Appends the event and wakes every thread waiting in poll().
+ * The closed flag is not consulted here, so this implementation never rejects an event.
+ */
+ @Override
+ public void add(CoordinatorEvent event) throws RejectedExecutionException {
+ synchronized (events) {
+ events.add(event);
+ events.notifyAll();
+ }
+ }
+
+ /** Marks the accumulator closed and wakes waiting pollers so they can leave the wait loop. */
+ @Override
+ public void close() {
+ isClosed.set(true);
+ synchronized (events) {
+ events.notifyAll();
+ }
+ }
+ }
private static class FutureEvent implements CoordinatorEvent {
private final TopicPartition key;
@@ -48,18 +110,28 @@ public class MultiThreadedEventProcessorTest {
private final boolean block;
private final CountDownLatch latch;
private final CountDownLatch executed;
+ private long createdTimeMs;
+ /** Convenience constructor: delegates with block = false and createdTimeMs = 0. */
FutureEvent(
TopicPartition key,
Supplier supplier
) {
- this(key, supplier, false);
+ this(key, supplier, false, 0L);
}
+ /** Convenience constructor: delegates with the given block flag and createdTimeMs = 0. */
FutureEvent(
TopicPartition key,
Supplier supplier,
boolean block
+ ) {
+ this(key, supplier, block, 0L);
+ }
+
+ FutureEvent(
+ TopicPartition key,
+ Supplier supplier,
+ boolean block,
+ long createdTimeMs
) {
this.key = key;
this.future = new CompletableFuture<>();
@@ -67,6 +139,7 @@ public class MultiThreadedEventProcessorTest {
this.block = block;
this.latch = new CountDownLatch(1);
this.executed = new CountDownLatch(1);
+ this.createdTimeMs = createdTimeMs;
}
@Override
@@ -90,6 +163,11 @@ public class MultiThreadedEventProcessorTest {
future.completeExceptionally(ex);
}
+ /** Returns the createdTimeMs value held by this event (0 when a constructor without that argument was used). */
+ @Override
+ public long createdTimeMs() {
+ return createdTimeMs;
+ }
+
@Override
public TopicPartition key() {
return key;
@@ -118,7 +196,9 @@ public class MultiThreadedEventProcessorTest {
CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
- 2
+ 2,
+ Time.SYSTEM,
+ mock(GroupCoordinatorRuntimeMetrics.class)
);
eventProcessor.close();
}
@@ -128,7 +208,9 @@ public class MultiThreadedEventProcessorTest {
try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
- 2
+ 2,
+ Time.SYSTEM,
+ mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@@ -163,7 +245,9 @@ public class MultiThreadedEventProcessorTest {
try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
- 2
+ 2,
+ Time.SYSTEM,
+ mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@@ -246,7 +330,9 @@ public class MultiThreadedEventProcessorTest {
CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
- 2
+ 2,
+ Time.SYSTEM,
+ mock(GroupCoordinatorRuntimeMetrics.class)
);
eventProcessor.close();
@@ -260,7 +346,9 @@ public class MultiThreadedEventProcessorTest {
try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
- 1 // Use a single thread to block event in the processor.
+ 1, // Use a single thread to block event in the processor.
+ Time.SYSTEM,
+ mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@@ -317,4 +405,147 @@ public class MultiThreadedEventProcessorTest {
assertEquals(1, numEventsExecuted.get());
}
}
+
+    /**
+     * Verifies the values a single-threaded MultiThreadedEventProcessor reports to
+     * GroupCoordinatorRuntimeMetrics (thread idle ratio, event queue time and event
+     * processing time). Time is driven by a MockTime: the MockEventAccumulator sleeps
+     * 500 ms on every poll and each event sleeps inside its supplier, which is where
+     * the expected figures asserted at the end come from.
+     */
+    @Test
+    public void testMetrics() throws Exception {
+        GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+        Time mockTime = new MockTime();
+        AtomicInteger numEventsExecuted = new AtomicInteger(0);
+
+        // Special event which blocks until the latch is released.
+        FutureEvent<Integer> blockingEvent = new FutureEvent<>(
+            new TopicPartition("foo", 0), () -> {
+                mockTime.sleep(4000L);
+                return numEventsExecuted.incrementAndGet();
+            },
+            true,
+            mockTime.milliseconds()
+        );
+
+        try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
+            new LogContext(),
+            "event-processor-",
+            1, // Use a single thread to block event in the processor.
+            mockTime,
+            mockRuntimeMetrics,
+            new MockEventAccumulator<>(mockTime, 500L)
+        )) {
+            // Enqueue the blocking event.
+            eventProcessor.enqueue(blockingEvent);
+
+            // Ensure that the blocking event is executed.
+            waitForCondition(() -> numEventsExecuted.get() > 0,
+                "Blocking event not executed.");
+
+            // Enqueue the other event.
+            FutureEvent<Integer> otherEvent = new FutureEvent<>(
+                new TopicPartition("foo", 0), () -> {
+                    mockTime.sleep(5000L);
+                    return numEventsExecuted.incrementAndGet();
+                },
+                false,
+                mockTime.milliseconds()
+            );
+
+            eventProcessor.enqueue(otherEvent);
+
+            // Pass the time.
+            mockTime.sleep(3000L);
+
+            // Events should not be completed.
+            assertFalse(otherEvent.future.isDone());
+
+            // Release the blocking event to unblock the thread.
+            blockingEvent.release();
+
+            // The blocking event should be completed.
+            // DEFAULT_MAX_WAIT_MS is a millisecond value (per its name), so the unit must be
+            // MILLISECONDS; SECONDS would stretch the bound far beyond the class-level @Timeout.
+            blockingEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+            assertTrue(blockingEvent.future.isDone());
+            assertFalse(blockingEvent.future.isCompletedExceptionally());
+
+            // The other event should also be completed.
+            otherEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+            assertTrue(otherEvent.future.isDone());
+            assertFalse(otherEvent.future.isCompletedExceptionally());
+            assertEquals(2, numEventsExecuted.get());
+
+            // e1 poll time = 500
+            // e1 processing time = 4000
+            // e2 enqueue time = 3000
+            // e2 poll time = 500
+            // e2 processing time = 5000
+
+            // First event (e1)
+
+            // idle ratio = e1 poll time / e1 poll time
+            verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(1.0);
+            // event queue time = e1 poll time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(500L);
+            // e1 processing time + e2 enqueue time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(7000L);
+
+            // Second event (e2)
+
+            // idle ratio = e2 poll time / (e1 poll time + e1 processing time + e2 enqueue time + e2 poll time)
+            verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(500.0 / (500.0 + 7000.0 + 500.0));
+            // event queue time = e2 enqueue time + e2 poll time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L);
+            // e2 processing time
+            verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(5000L);
+        }
+    }
+
+    /**
+     * Runs seven events through a two-threaded MultiThreadedEventProcessor whose
+     * MockEventAccumulator sleeps 100 ms (Time.SYSTEM) on every poll, and checks that
+     * recordThreadIdleRatio is invoked once per event with a strictly positive ratio
+     * and that the average of the recorded ratios lies strictly between 0 and 1.
+     */
+    @Test
+    public void testRecordThreadIdleRatioTwoThreads() throws Exception {
+        GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
+            new LogContext(),
+            "event-processor-",
+            2,
+            Time.SYSTEM,
+            mockRuntimeMetrics,
+            new MockEventAccumulator<>(Time.SYSTEM, 100L)
+        )) {
+            List<Double> recordedRatios = new ArrayList<>();
+            AtomicInteger numEventsExecuted = new AtomicInteger(0);
+            ArgumentCaptor<Double> ratioCaptured = ArgumentCaptor.forClass(Double.class);
+            doAnswer(invocation -> {
+                // Read the ratio from this invocation rather than from the shared captor:
+                // with two processor threads, ratioCaptured.getValue() returns the most
+                // recently captured value, which may belong to the other thread's call.
+                double threadIdleRatio = invocation.<Double>getArgument(0);
+                assertTrue(threadIdleRatio > 0.0);
+                synchronized (recordedRatios) {
+                    recordedRatios.add(threadIdleRatio);
+                }
+                return null;
+            }).when(mockRuntimeMetrics).recordThreadIdleRatio(ratioCaptured.capture());
+
+            List<FutureEvent<Integer>> events = Arrays.asList(
+                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
+                new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet)
+            );
+
+            events.forEach(eventProcessor::enqueue);
+
+            CompletableFuture.allOf(events
+                .stream()
+                .map(FutureEvent::future)
+                .toArray(CompletableFuture[]::new)
+            ).get(10, TimeUnit.SECONDS);
+
+            events.forEach(event -> {
+                assertTrue(event.future.isDone());
+                assertFalse(event.future.isCompletedExceptionally());
+            });
+
+            assertEquals(events.size(), numEventsExecuted.get());
+            verify(mockRuntimeMetrics, times(7)).recordThreadIdleRatio(anyDouble());
+
+            assertEquals(7, recordedRatios.size());
+            double average = recordedRatios.stream().mapToDouble(Double::doubleValue).sum() / 7;
+            assertTrue(average > 0.0 && average < 1.0);
+        }
+    }
}