From abee8f711c5c9ab6cae80406ce8ccd65f62841ce Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Wed, 18 Oct 2023 08:06:23 +0900 Subject: [PATCH] KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417) Implements the following metrics: kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed kafka.server:type=group-coordinator-metrics,name=event-queue-size kafka.server:type=group-coordinator-metrics,name=partition-load-time-max kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion. Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes. 
Co-authored-by: David Jacot Reviewers: Ritika Reddy , Calvin Liu , David Jacot , Justine Olshan --- build.gradle | 1 + checkstyle/import-control.xml | 4 + checkstyle/suppressions.xml | 2 + .../group/CoordinatorLoaderImpl.scala | 21 +- .../scala/kafka/server/BrokerServer.scala | 3 + .../group/CoordinatorLoaderImplTest.scala | 71 ++++- .../group/GroupCoordinatorService.java | 14 +- .../metrics/CoordinatorRuntimeMetrics.java | 68 +++++ .../GroupCoordinatorRuntimeMetrics.java | 219 ++++++++++++++++ .../group/runtime/CoordinatorEvent.java | 5 + .../group/runtime/CoordinatorLoader.java | 45 +++- .../group/runtime/CoordinatorRuntime.java | 69 ++++- .../runtime/MultiThreadedEventProcessor.java | 63 ++++- .../GroupCoordinatorRuntimeMetricsTest.java | 149 +++++++++++ .../group/runtime/CoordinatorRuntimeTest.java | 198 ++++++++++++-- .../MultiThreadedEventProcessorTest.java | 243 +++++++++++++++++- 16 files changed, 1124 insertions(+), 51 deletions(-) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java diff --git a/build.gradle b/build.gradle index c88ac7b189d..8fc01156a86 100644 --- a/build.gradle +++ b/build.gradle @@ -1260,6 +1260,7 @@ project(':group-coordinator') { implementation project(':clients') implementation project(':metadata') implementation libs.slf4jApi + implementation libs.metrics testImplementation project(':clients').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 22195e78ee6..ea41f587c92 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -239,6 +239,10 @@ + + + + diff --git 
a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a554fcc56b6..a66e63428dc 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -325,6 +325,8 @@ files="(ConsumerGroupMember|GroupMetadataManager).java"/> + doLoad(tp, coordinator, future)) + () => doLoad(tp, coordinator, future, startTimeMs)) if (result.isCancelled) { future.completeExceptionally(new RuntimeException("Coordinator loader is closed.")) } @@ -72,7 +75,8 @@ class CoordinatorLoaderImpl[T]( private def doLoad( tp: TopicPartition, coordinator: CoordinatorPlayback[T], - future: CompletableFuture[Void] + future: CompletableFuture[LoadSummary], + startTimeMs: Long ): Unit = { try { replicaManager.getLog(tp) match { @@ -92,6 +96,8 @@ class CoordinatorLoaderImpl[T]( // the log end offset but the log is empty. This could happen with compacted topics. var readAtLeastOneRecord = true + var numRecords = 0 + var numBytes = 0 while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) { val fetchDataInfo = log.read( startOffset = currentOffset, @@ -131,6 +137,7 @@ class CoordinatorLoaderImpl[T]( throw new IllegalStateException("Control batches are not supported yet.") } else { batch.asScala.foreach { record => + numRecords = numRecords + 1 try { coordinator.replay(deserializer.deserialize(record.key, record.value)) } catch { @@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T]( currentOffset = batch.nextOffset } + numBytes = numBytes + memoryRecords.sizeInBytes() } + val endTimeMs = time.milliseconds() if (isRunning.get) { - future.complete(null) + future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes)) } else { future.completeExceptionally(new RuntimeException("Coordinator loader is closed.")) } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index daec8f73089..fe8e5acb9fa 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ 
b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition} import org.apache.kafka.coordinator.group +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics import org.apache.kafka.coordinator.group.util.SystemTimerReaper import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde} import org.apache.kafka.image.publisher.MetadataPublisher @@ -531,6 +532,7 @@ class BrokerServer( new SystemTimer("group-coordinator") ) val loader = new CoordinatorLoaderImpl[group.Record]( + time, replicaManager, serde, config.offsetsLoadBufferSize @@ -546,6 +548,7 @@ class BrokerServer( .withTimer(timer) .withLoader(loader) .withWriter(writer) + .withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics)) .build() } else { GroupCoordinatorAdapter( diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala index c71c28d47ea..ef19d732c34 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala @@ -22,14 +22,16 @@ import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.utils.{MockTime, Time} import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback} import 
org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata} import org.apache.kafka.test.TestUtils.assertFutureThrows -import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull} import org.junit.jupiter.api.{Test, Timeout} import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} +import org.mockito.invocation.InvocationOnMock import java.nio.ByteBuffer import java.nio.charset.Charset @@ -54,6 +56,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -73,6 +76,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -93,6 +97,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -126,7 +131,7 @@ class CoordinatorLoaderImplTest { minOneMessage = true )).thenReturn(readResult2) - assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) verify(coordinator).replay(("k1", "v1")) verify(coordinator).replay(("k2", "v2")) @@ -145,6 +150,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, 
loadBufferSize = 1000 @@ -187,6 +193,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -226,6 +233,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -266,6 +274,7 @@ class CoordinatorLoaderImplTest { val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time = Time.SYSTEM, replicaManager = replicaManager, deserializer = serde, loadBufferSize = 1000 @@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest { minOneMessage = true )).thenReturn(readResult) - assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)) + } + } + + @Test + def testLoadSummary(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val serde = new StringKeyValueDeserializer + val log = mock(classOf[UnifiedLog]) + val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]]) + val time = new MockTime() + + TestUtils.resource(new CoordinatorLoaderImpl[(String, String)]( + time, + replicaManager = replicaManager, + deserializer = serde, + loadBufferSize = 1000 + )) { loader => + val startTimeMs = time.milliseconds() + when(replicaManager.getLog(tp)).thenReturn(Some(log)) + when(log.logStartOffset).thenReturn(0L) + when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L)) + + val readResult1 = logReadResult(startOffset = 0, records = Seq( + new SimpleRecord("k1".getBytes, "v1".getBytes), + new SimpleRecord("k2".getBytes, 
"v2".getBytes) + )) + + when(log.read( + startOffset = 0L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenAnswer((_: InvocationOnMock) => { + time.sleep(1000) + readResult1 + }) + + val readResult2 = logReadResult(startOffset = 2, records = Seq( + new SimpleRecord("k3".getBytes, "v3".getBytes), + new SimpleRecord("k4".getBytes, "v4".getBytes), + new SimpleRecord("k5".getBytes, "v5".getBytes) + )) + + when(log.read( + startOffset = 2L, + maxLength = 1000, + isolation = FetchIsolation.LOG_END, + minOneMessage = true + )).thenReturn(readResult2) + + val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS) + assertEquals(startTimeMs, summary.startTimeMs()) + assertEquals(startTimeMs + 1000, summary.endTimeMs()) + assertEquals(5, summary.numRecords()) + assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes()) } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index e30bb930269..176dc770cfe 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics; import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier; import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor; import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader; @@ -102,6 +103,7 @@ public class GroupCoordinatorService implements GroupCoordinator { private CoordinatorLoader 
loader; private Time time; private Timer timer; + private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics; public Builder( int nodeId, @@ -131,6 +133,11 @@ public class GroupCoordinatorService implements GroupCoordinator { return this; } + public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) { + this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics; + return this; + } + public GroupCoordinatorService build() { if (config == null) throw new IllegalArgumentException("Config must be set."); @@ -142,6 +149,8 @@ public class GroupCoordinatorService implements GroupCoordinator { throw new IllegalArgumentException("Time must be set."); if (timer == null) throw new IllegalArgumentException("Timer must be set."); + if (coordinatorRuntimeMetrics == null) + throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."); String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); @@ -152,7 +161,9 @@ public class GroupCoordinatorService implements GroupCoordinator { CoordinatorEventProcessor processor = new MultiThreadedEventProcessor( logContext, "group-coordinator-event-processor-", - config.numThreads + config.numThreads, + time, + coordinatorRuntimeMetrics ); CoordinatorRuntime runtime = @@ -166,6 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator { .withLoader(loader) .withCoordinatorShardBuilderSupplier(supplier) .withTime(time) + .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics) .build(); return new GroupCoordinatorService( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java new file mode 100644 index 00000000000..8978a040007 --- /dev/null +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * The metrics suite used by the group and transaction coordinator runtimes. It holds the partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { + + /** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ + void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState); + + /** + * Record the partition load metric. + * @param startTimeMs The partition load start time. + * @param endTimeMs The partition load end time. + */ + void recordPartitionLoadSensor(long startTimeMs, long endTimeMs); + + /** + * Update the event queue time. + * + * @param durationMs The queue time. + */ + void recordEventQueueTime(long durationMs); + + /** + * Update the event queue processing time.
+ * + * @param durationMs The event processing time. + */ + void recordEventQueueProcessingTime(long durationMs); + + /** + * Record the thread idle ratio. + * @param ratio The idle ratio. + */ + void recordThreadIdleRatio(double ratio); + + /** + * Register the event queue size gauge. + * + * @param sizeSupplier The size supplier. + */ + void registerEventQueueSizeGauge(Supplier sizeSupplier); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java new file mode 100644 index 00000000000..e4c6552c4bc --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics { + /** + * The metrics group. + */ + public static final String METRICS_GROUP = "group-coordinator-metrics"; + + /** + * The partition count metric name. + */ + public static final String NUM_PARTITIONS_METRIC_NAME = "num-partitions"; + + /** + * Metric to count the number of partitions in Loading state. + */ + private final MetricName numPartitionsLoading; + private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0); + + /** + * Metric to count the number of partitions in Active state. + */ + private final MetricName numPartitionsActive; + private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0); + + /** + * Metric to count the number of partitions in Failed state. + */ + private final MetricName numPartitionsFailed; + private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0); + + /** + * Metric to count the size of the processor queue. + */ + private final MetricName eventQueueSize; + + /** + * The Kafka metrics registry. + */ + private final Metrics metrics; + + /** + * The partition load sensor. + */ + private Sensor partitionLoadSensor; + + /** + * The thread idle sensor. 
+ */ + private Sensor threadIdleRatioSensor; + + public GroupCoordinatorRuntimeMetrics(Metrics metrics) { + this.metrics = Objects.requireNonNull(metrics); + + this.numPartitionsLoading = kafkaMetricName( + NUM_PARTITIONS_METRIC_NAME, + "The number of partitions in Loading state.", + "state", "loading" + ); + + this.numPartitionsActive = kafkaMetricName( + NUM_PARTITIONS_METRIC_NAME, + "The number of partitions in Active state.", + "state", "active" + ); + + this.numPartitionsFailed = kafkaMetricName( + NUM_PARTITIONS_METRIC_NAME, + "The number of partitions in Failed state.", + "state", "failed" + ); + + this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size."); + + metrics.addMetric(numPartitionsLoading, (Gauge) (config, now) -> numPartitionsLoadingCounter.get()); + metrics.addMetric(numPartitionsActive, (Gauge) (config, now) -> numPartitionsActiveCounter.get()); + metrics.addMetric(numPartitionsFailed, (Gauge) (config, now) -> numPartitionsFailedCounter.get()); + + this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime"); + this.partitionLoadSensor.add( + metrics.metricName( + "partition-load-time-max", + METRICS_GROUP, + "The max time it took to load the partitions in the last 30 sec." + ), new Max()); + this.partitionLoadSensor.add( + metrics.metricName( + "partition-load-time-avg", + METRICS_GROUP, + "The average time it took to load the partitions in the last 30 sec." + ), new Avg()); + + this.threadIdleRatioSensor = metrics.sensor("ThreadIdleRatio"); + this.threadIdleRatioSensor.add( + metrics.metricName( + "thread-idle-ratio-min", + METRICS_GROUP, + "The minimum thread idle ratio over the last 30 seconds." + ), new Min()); + this.threadIdleRatioSensor.add( + metrics.metricName( + "thread-idle-ratio-avg", + METRICS_GROUP, + "The average thread idle ratio over the last 30 seconds." + ), new Avg()); + } + + /** + * Retrieve the kafka metric name. + * + * @param name The name of the metric. 
+ * + * @return The kafka metric name. + */ + private MetricName kafkaMetricName(String name, String description, String... keyValue) { + return metrics.metricName(name, METRICS_GROUP, description, keyValue); + } + + @Override + public void close() { + Arrays.asList( + numPartitionsLoading, + numPartitionsActive, + numPartitionsFailed, + eventQueueSize + ).forEach(metrics::removeMetric); + + metrics.removeSensor(partitionLoadSensor.name()); + metrics.removeSensor(threadIdleRatioSensor.name()); + } + + /** + * Called when the partition state changes. Decrement the old state and increment the new state. + * + * @param oldState The old state. + * @param newState The new state to transition to. + */ + @Override + public void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState) { + switch (oldState) { + case INITIAL: + case CLOSED: + break; + case LOADING: + numPartitionsLoadingCounter.decrementAndGet(); + break; + case ACTIVE: + numPartitionsActiveCounter.decrementAndGet(); + break; + case FAILED: + numPartitionsFailedCounter.decrementAndGet(); + } + + switch (newState) { + case INITIAL: + case CLOSED: + break; + case LOADING: + numPartitionsLoadingCounter.incrementAndGet(); + break; + case ACTIVE: + numPartitionsActiveCounter.incrementAndGet(); + break; + case FAILED: + numPartitionsFailedCounter.incrementAndGet(); + } + } + + @Override + public void recordPartitionLoadSensor(long startTimeMs, long endTimeMs) { + this.partitionLoadSensor.record(endTimeMs - startTimeMs, endTimeMs, false); + } + + @Override + public void recordEventQueueTime(long durationMs) { } + + @Override + public void recordEventQueueProcessingTime(long durationMs) { } + + @Override + public void recordThreadIdleRatio(double ratio) { + threadIdleRatioSensor.record(ratio); + } + + @Override + public void registerEventQueueSizeGauge(Supplier sizeSupplier) { + metrics.addMetric(eventQueueSize, (Gauge) (config, now) -> (long) sizeSupplier.get()); + } +} diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java index fb9bdbed651..19fc0d88cf8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java @@ -35,4 +35,9 @@ public interface CoordinatorEvent extends EventAccumulator.Event * @param exception An exception if the processing of the event failed or null otherwise. */ void complete(Throwable exception); + + /** + * @return The created time in milliseconds. + */ + long createdTimeMs(); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java index 0fe23b5dc69..dd6a67ec15c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java @@ -46,6 +46,49 @@ public interface CoordinatorLoader extends AutoCloseable { } } + /** + * Object that is returned as part of the future from load(). Holds the load start and end times, + * and the number of records and bytes loaded.
+ */ + class LoadSummary { + private final long startTimeMs; + private final long endTimeMs; + private final long numRecords; + private final long numBytes; + + public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) { + this.startTimeMs = startTimeMs; + this.endTimeMs = endTimeMs; + this.numRecords = numRecords; + this.numBytes = numBytes; + } + + public long startTimeMs() { + return startTimeMs; + } + + public long endTimeMs() { + return endTimeMs; + } + + public long numRecords() { + return numRecords; + } + + public long numBytes() { + return numBytes; + } + + @Override + public String toString() { + return "LoadSummary(" + + "startTimeMs=" + startTimeMs + + ", endTimeMs=" + endTimeMs + + ", numRecords=" + numRecords + + ", numBytes=" + numBytes + ")"; + } + } + /** * Deserializer to translates bytes to T. * @@ -69,7 +112,7 @@ public interface CoordinatorLoader extends AutoCloseable { * @param tp The TopicPartition to read from. * @param coordinator The object to apply records to. 
*/ - CompletableFuture load( + CompletableFuture load( TopicPartition tp, CoordinatorPlayback coordinator ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index 8e0f53f5d0b..deeed11bc85 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics; import org.apache.kafka.deferred.DeferredEvent; import org.apache.kafka.deferred.DeferredEventQueue; import org.apache.kafka.image.MetadataDelta; @@ -89,6 +90,7 @@ public class CoordinatorRuntime, U> implements Aut private CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier; private Time time = Time.SYSTEM; private Timer timer; + private CoordinatorRuntimeMetrics runtimeMetrics; public Builder withLogPrefix(String logPrefix) { this.logPrefix = logPrefix; @@ -130,6 +132,11 @@ public class CoordinatorRuntime, U> implements Aut return this; } + public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics runtimeMetrics) { + this.runtimeMetrics = runtimeMetrics; + return this; + } + public CoordinatorRuntime build() { if (logPrefix == null) logPrefix = ""; @@ -147,6 +154,8 @@ public class CoordinatorRuntime, U> implements Aut throw new IllegalArgumentException("Time must be set."); if (timer == null) throw new IllegalArgumentException("Timer must be set."); + if (runtimeMetrics == null) + throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."); return new CoordinatorRuntime<>( logPrefix, @@ 
-156,7 +165,8 @@ public class CoordinatorRuntime, U> implements Aut loader, coordinatorShardBuilderSupplier, time, - timer + timer, + runtimeMetrics ); } } @@ -164,7 +174,7 @@ public class CoordinatorRuntime, U> implements Aut /** * The various state that a coordinator for a partition can be in. */ - enum CoordinatorState { + public enum CoordinatorState { /** * Initial state when a coordinator is created. */ @@ -501,6 +511,7 @@ public class CoordinatorRuntime, U> implements Aut throw new IllegalStateException("Cannot transition from " + state + " to " + newState); } + CoordinatorState oldState = state; log.debug("Transition from {} to {}.", state, newState); switch (newState) { case LOADING: @@ -537,6 +548,8 @@ public class CoordinatorRuntime, U> implements Aut default: throw new IllegalArgumentException("Transitioning to " + newState + " is not supported."); } + + runtimeMetrics.recordPartitionStateChange(oldState, state); } /** @@ -608,6 +621,11 @@ public class CoordinatorRuntime, U> implements Aut */ CoordinatorResult result; + /** + * The time this event was created. + */ + private final long createdTimeMs; + /** * Constructor. * @@ -624,6 +642,7 @@ public class CoordinatorRuntime, U> implements Aut this.name = name; this.op = op; this.future = new CompletableFuture<>(); + this.createdTimeMs = time.milliseconds(); } /** @@ -709,6 +728,11 @@ public class CoordinatorRuntime, U> implements Aut } } + @Override + public long createdTimeMs() { + return this.createdTimeMs; + } + @Override public String toString() { return "CoordinatorWriteEvent(name=" + name + ")"; @@ -768,6 +792,11 @@ public class CoordinatorRuntime, U> implements Aut */ T response; + /** + * The time this event was created. + */ + private final long createdTimeMs; + /** * Constructor. 
* @@ -784,6 +813,7 @@ public class CoordinatorRuntime, U> implements Aut this.name = name; this.op = op; this.future = new CompletableFuture<>(); + this.createdTimeMs = time.milliseconds(); } /** @@ -832,6 +862,11 @@ public class CoordinatorRuntime, U> implements Aut } } + @Override + public long createdTimeMs() { + return this.createdTimeMs; + } + @Override public String toString() { return "CoordinatorReadEvent(name=" + name + ")"; @@ -857,6 +892,11 @@ public class CoordinatorRuntime, U> implements Aut */ final Runnable op; + /** + * The time this event was created. + */ + private final long createdTimeMs; + /** * Constructor. * @@ -872,6 +912,7 @@ public class CoordinatorRuntime, U> implements Aut this.tp = tp; this.name = name; this.op = op; + this.createdTimeMs = time.milliseconds(); } /** @@ -907,6 +948,11 @@ public class CoordinatorRuntime, U> implements Aut } } + @Override + public long createdTimeMs() { + return this.createdTimeMs; + } + @Override public String toString() { return "InternalEvent(name=" + name + ")"; @@ -995,6 +1041,11 @@ public class CoordinatorRuntime, U> implements Aut */ private final CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier; + /** + * The coordinator runtime metrics. + */ + private final CoordinatorRuntimeMetrics runtimeMetrics; + /** * Atomic boolean indicating whether the runtime is running. 
*/ @@ -1025,7 +1076,8 @@ public class CoordinatorRuntime, U> implements Aut CoordinatorLoader loader, CoordinatorShardBuilderSupplier coordinatorShardBuilderSupplier, Time time, - Timer timer + Timer timer, + CoordinatorRuntimeMetrics runtimeMetrics ) { this.logPrefix = logPrefix; this.logContext = logContext; @@ -1038,6 +1090,7 @@ public class CoordinatorRuntime, U> implements Aut this.highWatermarklistener = new HighWatermarkListener(); this.loader = loader; this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier; + this.runtimeMetrics = runtimeMetrics; } /** @@ -1242,7 +1295,7 @@ public class CoordinatorRuntime, U> implements Aut case FAILED: case INITIAL: context.transitionTo(CoordinatorState.LOADING); - loader.load(tp, context.coordinator).whenComplete((state, exception) -> { + loader.load(tp, context.coordinator).whenComplete((summary, exception) -> { scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> { withContextOrThrow(tp, ctx -> { if (ctx.state != CoordinatorState.LOADING) { @@ -1254,8 +1307,11 @@ public class CoordinatorRuntime, U> implements Aut try { if (exception != null) throw exception; ctx.transitionTo(CoordinatorState.ACTIVE); - log.info("Finished loading of metadata from {} with epoch {}.", - tp, partitionEpoch + if (summary != null) { + runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs()); + } + log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.", + tp, partitionEpoch, summary ); } catch (Throwable ex) { log.error("Failed to load metadata from {} with epoch {} due to {}.", @@ -1373,6 +1429,7 @@ public class CoordinatorRuntime, U> implements Aut context.transitionTo(CoordinatorState.CLOSED); }); coordinators.clear(); + Utils.closeQuietly(runtimeMetrics, "runtime metrics"); log.info("Coordinator runtime closed."); } } diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java index 1f35d65c3f4..e4adc18e957 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java @@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.runtime; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics; import org.slf4j.Logger; import java.util.List; +import java.util.Objects; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -52,21 +55,50 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { */ private volatile boolean shuttingDown; + /** + * The coordinator runtime metrics. + */ + private final CoordinatorRuntimeMetrics metrics; + + /** + * The time. + */ + private final Time time; + + public MultiThreadedEventProcessor( + LogContext logContext, + String threadPrefix, + int numThreads, + Time time, + CoordinatorRuntimeMetrics metrics + ) { + this(logContext, threadPrefix, numThreads, time, metrics, new EventAccumulator<>()); + } + /** * Constructor. * - * @param logContext The log context. - * @param threadPrefix The thread prefix. - * @param numThreads The number of threads. + * @param logContext The log context. + * @param threadPrefix The thread prefix. + * @param numThreads The number of threads. + * @param metrics The coordinator runtime metrics. + * @param time The time. + * @param eventAccumulator The event accumulator. 
*/ public MultiThreadedEventProcessor( LogContext logContext, String threadPrefix, - int numThreads + int numThreads, + Time time, + CoordinatorRuntimeMetrics metrics, + EventAccumulator eventAccumulator ) { this.log = logContext.logger(MultiThreadedEventProcessor.class); this.shuttingDown = false; - this.accumulator = new EventAccumulator<>(); + this.accumulator = eventAccumulator; + this.time = Objects.requireNonNull(time); + this.metrics = Objects.requireNonNull(metrics); + this.metrics.registerEventQueueSizeGauge(accumulator::size); this.threads = IntStream.range(0, numThreads).mapToObj(threadId -> new EventProcessorThread( threadPrefix + threadId @@ -81,6 +113,9 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { */ private class EventProcessorThread extends Thread { private final Logger log; + private long pollStartMs; + private long timeSinceLastPollMs; + private long lastPollMs; EventProcessorThread( String name @@ -92,11 +127,16 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { private void handleEvents() { while (!shuttingDown) { + recordPollStartTime(time.milliseconds()); CoordinatorEvent event = accumulator.poll(); + recordPollEndTime(time.milliseconds()); if (event != null) { try { log.debug("Executing event: {}.", event); + long dequeuedTimeMs = time.milliseconds(); + metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs()); event.run(); + metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs); } catch (Throwable t) { log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t); event.complete(t); @@ -112,6 +152,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { while (event != null) { try { log.debug("Draining event: {}.", event); + metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs()); event.complete(new RejectedExecutionException("EventProcessor is closed.")); } catch (Throwable t) { 
log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t); @@ -145,6 +186,30 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { log.info("Shutdown completed"); } } + + /** + * Marks the start of a poll and captures the time elapsed since the previous poll started (0 on the first poll). + * + * @param pollStartMs The time at which the poll starts, in milliseconds. + */ + private void recordPollStartTime(long pollStartMs) { + this.pollStartMs = pollStartMs; + this.timeSinceLastPollMs = lastPollMs != 0L ? pollStartMs - lastPollMs : 0; + this.lastPollMs = pollStartMs; + } + + /** + * Records the thread idle ratio once the poll has returned. + * + * @param pollEndMs The time at which the poll returned, in milliseconds. + */ + private void recordPollEndTime(long pollEndMs) { + long pollTimeMs = pollEndMs - pollStartMs; + long totalTimeMs = pollTimeMs + timeSinceLastPollMs; + /* No elapsed time means no time spent idle; dividing 0.0 by 0 would otherwise record NaN into the sensor. */ + double pollIdleRatio = totalTimeMs == 0 ? 0.0 : pollTimeMs * 1.0 / totalTimeMs; + metrics.recordThreadIdleRatio(pollIdleRatio); + } } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java new file mode 100644 index 00000000000..f3726aafb17 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.stream.IntStream; + +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.METRICS_GROUP; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.NUM_PARTITIONS_METRIC_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GroupCoordinatorRuntimeMetricsTest { + + @Test + public void testMetricNames() { + Metrics metrics = new Metrics(); + + HashSet expectedMetrics = new HashSet<>(Arrays.asList( + kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"), + kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"), + kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"), + metrics.metricName("event-queue-size", METRICS_GROUP), + metrics.metricName("partition-load-time-max", METRICS_GROUP), + metrics.metricName("partition-load-time-avg", METRICS_GROUP), + metrics.metricName("thread-idle-ratio-min", METRICS_GROUP), + metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP) + )); + + try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) { + runtimeMetrics.registerEventQueueSizeGauge(() -> 0); + expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName))); + } + + expectedMetrics.forEach(metricName -> 
assertFalse(metrics.metrics().containsKey(metricName))); + } + + @Test + public void testUpdateNumPartitionsMetrics() { + Metrics metrics = new Metrics(); + + try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) { + IntStream.range(0, 10) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING)); + IntStream.range(0, 8) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE)); + IntStream.range(0, 8) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED)); + IntStream.range(0, 2) + .forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.FAILED, CoordinatorState.CLOSED)); + + assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"), 2); + assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"), 0); + assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"), 6); + } + } + + @Test + public void testPartitionLoadSensorMetrics() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) { + long startTimeMs = time.milliseconds(); + runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); + runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 2000); + + org.apache.kafka.common.MetricName metricName = metrics.metricName( + "partition-load-time-avg", METRICS_GROUP); + + KafkaMetric metric = metrics.metrics().get(metricName); + assertEquals(1500.0, metric.metricValue()); + + metricName = metrics.metricName( + "partition-load-time-max", METRICS_GROUP); + metric = metrics.metrics().get(metricName); + assertEquals(2000.0, metric.metricValue()); + } + } + + @Test + public void 
testThreadIdleRatioSensor() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) { + IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleRatio(1.0 / (i + 1))); + + org.apache.kafka.common.MetricName metricName = metrics.metricName( + "thread-idle-ratio-avg", METRICS_GROUP); + + KafkaMetric metric = metrics.metrics().get(metricName); + assertEquals((11.0 / 6.0) / 3.0, metric.metricValue()); // (6/6 + 3/6 + 2/6) / 3 + + metricName = metrics.metricName( + "thread-idle-ratio-min", METRICS_GROUP); + metric = metrics.metrics().get(metricName); + assertEquals(1.0 / 3.0, metric.metricValue()); + } + } + + @Test + public void testEventQueueSize() { + Time time = new MockTime(); + Metrics metrics = new Metrics(time); + + try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) { + runtimeMetrics.registerEventQueueSizeGauge(() -> 5); + assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5); + } + } + + private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.MetricName metricName, long count) { + assertEquals(count, (long) metrics.metric(metricName).metricValue()); + } + + private static com.yammer.metrics.core.MetricName yammerMetricName(String type, String name) { + String mBeanName = String.format("kafka.coordinator.group:type=%s,name=%s", type, name); + return new com.yammer.metrics.core.MetricName("kafka.coordinator.group", type, name, null, mBeanName); + } + + private static MetricName kafkaMetricName(Metrics metrics, String name, String... 
keyValue) { + return metrics.metricName(name, METRICS_GROUP, "", keyValue); + } +} \ No newline at end of file diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index a7eb0f6b709..d776a07cd3a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -45,6 +46,11 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.CLOSED; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -118,9 +124,19 @@ public class CoordinatorRuntimeTest { * A CoordinatorLoader that always 
succeeds. */ private static class MockCoordinatorLoader implements CoordinatorLoader { + private final LoadSummary summary; + + public MockCoordinatorLoader(LoadSummary summary) { + this.summary = summary; + } + + public MockCoordinatorLoader() { + this(null); + } + @Override - public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { - return CompletableFuture.completedFuture(null); + public CompletableFuture load(TopicPartition tp, CoordinatorPlayback replayable) { + return CompletableFuture.completedFuture(summary); } @Override @@ -271,6 +287,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -280,7 +297,7 @@ public class CoordinatorRuntimeTest { when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); when(loader.load(TP, coordinator)).thenReturn(future); // Getting the coordinator context fails because the coordinator @@ -294,13 +311,13 @@ public class CoordinatorRuntimeTest { CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); // The coordinator is loading. - assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); + assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); assertEquals(coordinator, ctx.coordinator); // When the loading completes, the coordinator transitions to active. future.complete(null); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); // Verify that onLoaded is called. 
verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY); @@ -335,6 +352,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -344,7 +362,7 @@ public class CoordinatorRuntimeTest { when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); when(loader.load(TP, coordinator)).thenReturn(future); // Schedule the loading. @@ -352,13 +370,13 @@ public class CoordinatorRuntimeTest { // Getting the context succeeds and the coordinator should be in loading. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); + assertEquals(LOADING, ctx.state); assertEquals(0, ctx.epoch); assertEquals(coordinator, ctx.coordinator); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); - assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state); + assertEquals(FAILED, ctx.state); // Verify that onUnloaded is called. 
verify(coordinator, times(1)).onUnloaded(); @@ -386,6 +404,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -395,7 +414,7 @@ public class CoordinatorRuntimeTest { when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); when(loader.load(TP, coordinator)).thenReturn(future); // Schedule the loading. @@ -403,19 +422,19 @@ public class CoordinatorRuntimeTest { // Getting the context succeeds and the coordinator should be in loading. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); + assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); assertEquals(coordinator, ctx.coordinator); // When the loading completes, the coordinator transitions to active. future.complete(null); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); // Loading with a previous epoch is a no-op. The coordinator stays // in active state with the correct epoch. 
runtime.scheduleLoadOperation(TP, 0); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); } @@ -435,6 +454,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -444,7 +464,7 @@ public class CoordinatorRuntimeTest { when(builder.withTopicPartition(any())).thenReturn(builder); when(builder.build()).thenReturn(coordinator); when(supplier.get()).thenReturn(builder); - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); when(loader.load(TP, coordinator)).thenReturn(future); // Schedule the loading. @@ -452,13 +472,13 @@ public class CoordinatorRuntimeTest { // Getting the context succeeds and the coordinator should be in loading. CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); + assertEquals(LOADING, ctx.state); assertEquals(10, ctx.epoch); assertEquals(coordinator, ctx.coordinator); // When the loading fails, the coordinator transitions to failed. future.completeExceptionally(new Exception("failure")); - assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state); + assertEquals(FAILED, ctx.state); // Verify that onUnloaded is called. verify(coordinator, times(1)).onUnloaded(); @@ -474,7 +494,7 @@ public class CoordinatorRuntimeTest { // Getting the context succeeds and the coordinator should be in loading. 
ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state); + assertEquals(LOADING, ctx.state); assertEquals(11, ctx.epoch); assertEquals(coordinator, ctx.coordinator); @@ -482,7 +502,7 @@ public class CoordinatorRuntimeTest { future.complete(null); // Verify the state. - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); } @Test @@ -501,6 +521,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -514,12 +535,12 @@ public class CoordinatorRuntimeTest { // Loads the coordinator. It directly transitions to active. runtime.scheduleLoadOperation(TP, 10); CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); // Schedule the unloading. runtime.scheduleUnloadOperation(TP, ctx.epoch + 1); - assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state); + assertEquals(CLOSED, ctx.state); // Verify that onUnloaded is called. verify(coordinator, times(1)).onUnloaded(); @@ -549,6 +570,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); when(builder.withSnapshotRegistry(any())).thenReturn(builder); @@ -564,13 +586,13 @@ public class CoordinatorRuntimeTest { // Loads the coordinator. It directly transitions to active. 
runtime.scheduleLoadOperation(TP, 10); CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); // Unloading with a previous epoch is a no-op. The coordinator stays // in active with the correct epoch. runtime.scheduleUnloadOperation(TP, 0); - assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); + assertEquals(ACTIVE, ctx.state); assertEquals(10, ctx.epoch); } @@ -587,6 +609,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Schedule the loading. @@ -695,6 +718,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Scheduling a write fails with a NotCoordinatorException because the coordinator @@ -715,6 +739,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -739,6 +764,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. 
@@ -784,6 +810,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -831,6 +858,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -885,6 +913,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Schedule a read. It fails because the coordinator does not exist. @@ -906,6 +935,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -947,6 +977,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. 
@@ -1012,6 +1043,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(writer) .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class); @@ -1028,10 +1060,10 @@ public class CoordinatorRuntimeTest { .thenReturn(coordinator0) .thenReturn(coordinator1); - CompletableFuture future0 = new CompletableFuture<>(); + CompletableFuture future0 = new CompletableFuture<>(); when(loader.load(tp0, coordinator0)).thenReturn(future0); - CompletableFuture future1 = new CompletableFuture<>(); + CompletableFuture future1 = new CompletableFuture<>(); when(loader.load(tp1, coordinator1)).thenReturn(future1); runtime.scheduleLoadOperation(tp0, 0); @@ -1067,6 +1099,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -1118,6 +1151,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(processor) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -1189,6 +1223,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(processor) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. 
@@ -1257,6 +1292,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -1313,6 +1349,7 @@ public class CoordinatorRuntimeTest { .withEventProcessor(new DirectEventProcessor()) .withPartitionWriter(new MockPartitionWriter()) .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) .build(); // Loads the coordinator. @@ -1340,4 +1377,119 @@ public class CoordinatorRuntimeTest { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + /** Verifies that each coordinator state transition (loading, failed, active, closed) is reported through recordPartitionStateChange. */ + @Test + public void testStateChanges() throws Exception { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withLoader(loader) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); +
when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + CompletableFuture future = new CompletableFuture<>(); + when(loader.load(TP, coordinator)).thenReturn(future); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 0); + + // Getting the context succeeds and the coordinator should be in loading. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + + // When the loading fails, the coordinator transitions to failed. + future.completeExceptionally(new Exception("failure")); + assertEquals(FAILED, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + + // Start loading a new topic partition. + TopicPartition tp = new TopicPartition("__consumer_offsets", 1); + future = new CompletableFuture<>(); + when(loader.load(tp, coordinator)).thenReturn(future); + // Schedule the loading. + runtime.scheduleLoadOperation(tp, 0); + // Getting the context succeeds and the coordinator should be in loading. + ctx = runtime.contextOrThrow(tp); + assertEquals(LOADING, ctx.state); + verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); // Cumulative count on the shared mock: one transition for TP and one for tp. + + // When the loading completes, the coordinator transitions to active.
+ future.complete(null); + assertEquals(ACTIVE, ctx.state); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + // Closing the runtime is expected to move both contexts to CLOSED: TP from FAILED and tp from ACTIVE. + runtime.close(); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); + verify(runtimeMetrics, times(1)).recordPartitionStateChange(ACTIVE, CLOSED); + } + /** Verifies that the first two values of the LoadSummary given to the mock loader are reported through recordPartitionLoadSensor. */ + @Test + public void testPartitionLoadSensor() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + long startTimeMs = timer.time().milliseconds(); + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withLoader(new MockCoordinatorLoader( + new CoordinatorLoader.LoadSummary( + startTimeMs, + startTimeMs + 1000, + 30, + 3000))) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .build(); + + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Getting the coordinator context fails because the coordinator + // does not exist until scheduleLoadOperation is called. + assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + + // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 0); + + // Getting the coordinator context succeeds now. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // When the loading completes, the coordinator transitions to active. + assertEquals(ACTIVE, ctx.state); + // The same two time values passed to the LoadSummary above must be recorded exactly once. + verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java index 0630c88f389..f6017b90bcd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java @@ -18,16 +18,24 @@ package org.apache.kafka.coordinator.group.runtime; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.mockito.ArgumentCaptor; +import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -37,9 +45,63 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @Timeout(value = 60) public class MultiThreadedEventProcessorTest { + private static class MockEventAccumulator extends EventAccumulator { // Queue-backed test accumulator whose blocking poll() spends timeToPollMs on the supplied Time. + private final Time time; + private final Queue<CoordinatorEvent> events; + private final long timeToPollMs; + private final AtomicBoolean isClosed; + + public MockEventAccumulator(Time time, long timeToPollMs) { + this.time = time; + this.events = new LinkedList<>(); + this.timeToPollMs = timeToPollMs; + this.isClosed = new AtomicBoolean(false); + } + + @Override + public CoordinatorEvent poll() { + synchronized (events) { + while (events.isEmpty() && !isClosed.get()) { + try { + events.wait(); + } catch (InterruptedException ignored) { + // Deliberately ignored: the enclosing loop re-checks the queue and the closed flag. + } + } + time.sleep(timeToPollMs); // Charge timeToPollMs to every poll, including the one that returns after close(). + return events.poll(); + } + } + + @Override + public CoordinatorEvent poll(long timeout, TimeUnit unit) { + return null; // The timed poll is not implemented by this mock. + } + + @Override + public void add(CoordinatorEvent event) throws RejectedExecutionException { + synchronized (events) { + events.add(event); + events.notifyAll(); + } + } + + @Override + public void close() { + isClosed.set(true); + synchronized (events) { + events.notifyAll(); + } + } + } private static class FutureEvent implements CoordinatorEvent { private final TopicPartition key; @@ -48,18 +110,28 @@ public class MultiThreadedEventProcessorTest { private final boolean block; private final CountDownLatch latch; private final CountDownLatch executed; + private long createdTimeMs; FutureEvent( TopicPartition key, Supplier supplier ) { - this(key, supplier, false); + this(key, supplier, false, 0L); } FutureEvent( TopicPartition key, Supplier supplier, boolean block + ) { + this(key, supplier, block, 0L); + } + + FutureEvent( +
TopicPartition key, + Supplier supplier, + boolean block, + long createdTimeMs ) { this.key = key; this.future = new CompletableFuture<>(); @@ -67,6 +139,7 @@ public class MultiThreadedEventProcessorTest { this.block = block; this.latch = new CountDownLatch(1); this.executed = new CountDownLatch(1); + this.createdTimeMs = createdTimeMs; } @Override @@ -90,6 +163,11 @@ public class MultiThreadedEventProcessorTest { future.completeExceptionally(ex); } + @Override + public long createdTimeMs() { + return createdTimeMs; + } + @Override public TopicPartition key() { return key; @@ -118,7 +196,9 @@ public class MultiThreadedEventProcessorTest { CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( new LogContext(), "event-processor-", - 2 + 2, + Time.SYSTEM, + mock(GroupCoordinatorRuntimeMetrics.class) ); eventProcessor.close(); } @@ -128,7 +208,9 @@ public class MultiThreadedEventProcessorTest { try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( new LogContext(), "event-processor-", - 2 + 2, + Time.SYSTEM, + mock(GroupCoordinatorRuntimeMetrics.class) )) { AtomicInteger numEventsExecuted = new AtomicInteger(0); @@ -163,7 +245,9 @@ public class MultiThreadedEventProcessorTest { try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( new LogContext(), "event-processor-", - 2 + 2, + Time.SYSTEM, + mock(GroupCoordinatorRuntimeMetrics.class) )) { AtomicInteger numEventsExecuted = new AtomicInteger(0); @@ -246,7 +330,9 @@ public class MultiThreadedEventProcessorTest { CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( new LogContext(), "event-processor-", - 2 + 2, + Time.SYSTEM, + mock(GroupCoordinatorRuntimeMetrics.class) ); eventProcessor.close(); @@ -260,7 +346,9 @@ public class MultiThreadedEventProcessorTest { try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor( new LogContext(), "event-processor-", - 1 // Use a single thread to block event 
in the processor. + 1, // Use a single thread to block event in the processor. + Time.SYSTEM, + mock(GroupCoordinatorRuntimeMetrics.class) )) { AtomicInteger numEventsExecuted = new AtomicInteger(0); @@ -317,4 +405,147 @@ public class MultiThreadedEventProcessorTest { assertEquals(1, numEventsExecuted.get()); } } + /** Verifies the idle ratio, queue time and processing time recorded for two events handled by one thread on a mock clock. */ + @Test + public void testMetrics() throws Exception { + GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + Time mockTime = new MockTime(); + AtomicInteger numEventsExecuted = new AtomicInteger(0); + + // Special event which blocks until the latch is released. + FutureEvent<Integer> blockingEvent = new FutureEvent<>( + new TopicPartition("foo", 0), () -> { + mockTime.sleep(4000L); + return numEventsExecuted.incrementAndGet(); + }, + true, + mockTime.milliseconds() + ); + + try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor( + new LogContext(), + "event-processor-", + 1, // Use a single thread to block event in the processor. + mockTime, + mockRuntimeMetrics, + new MockEventAccumulator<>(mockTime, 500L) + )) { + // Enqueue the blocking event. + eventProcessor.enqueue(blockingEvent); + + // Ensure that the blocking event is executed. + waitForCondition(() -> numEventsExecuted.get() > 0, + "Blocking event not executed."); + + // Enqueue the other event. + FutureEvent<Integer> otherEvent = new FutureEvent<>( + new TopicPartition("foo", 0), () -> { + mockTime.sleep(5000L); + return numEventsExecuted.incrementAndGet(); + }, + false, + mockTime.milliseconds() + ); + + eventProcessor.enqueue(otherEvent); + + // Pass the time. + mockTime.sleep(3000L); + + // Events should not be completed. + assertFalse(otherEvent.future.isDone()); + + // Release the blocking event to unblock the thread. + blockingEvent.release(); + + // The blocking event should be completed.
+ blockingEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS); + assertTrue(blockingEvent.future.isDone()); + assertFalse(blockingEvent.future.isCompletedExceptionally()); + + // The other event should also be completed. + otherEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS); + assertTrue(otherEvent.future.isDone()); + assertFalse(otherEvent.future.isCompletedExceptionally()); + assertEquals(2, numEventsExecuted.get()); + + // e1 poll time = 500 + // e1 processing time = 4000 + // e2 enqueue time = 3000 + // e2 poll time = 500 + // e2 processing time = 5000 + + // First event (e1): idle ratio = e1 poll time / e1 poll time + verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(1.0); + // event queue time = e1 poll time + verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(500L); + // processing time = e1 processing time + e2 enqueue time (the clock advanced while e1 was blocked) + verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(7000L); + + // Second event (e2) + + // idle ratio = e2 poll time / (e1 poll time + e1 processing time + e2 enqueue time + e2 poll time) + verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(500.0 / (500.0 + 7000.0 + 500.0)); + // event queue time = e2 enqueue time + e2 poll time + verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L); + // e2 processing time + verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(5000L); + } + } + /** Verifies that one idle ratio in (0, 1) on average is recorded per event when seven events run on two threads. */ + @Test + public void testRecordThreadIdleRatioTwoThreads() throws Exception { + GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + + try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor( + new LogContext(), + "event-processor-", + 2, + Time.SYSTEM, + mockRuntimeMetrics, + new MockEventAccumulator<>(Time.SYSTEM, 100L) + )) { + List<Double> recordedRatios = new ArrayList<>(); + AtomicInteger numEventsExecuted = new AtomicInteger(0); + ArgumentCaptor<Double> ratioCaptured = ArgumentCaptor.forClass(Double.class); + doAnswer(invocation -> { + double threadIdleRatio = invocation.getArgument(0); // Read this call's own argument; the shared captor's last value may belong to the other thread. +
assertTrue(threadIdleRatio > 0.0); + synchronized (recordedRatios) { + recordedRatios.add(threadIdleRatio); + } + return null; + }).when(mockRuntimeMetrics).recordThreadIdleRatio(ratioCaptured.capture()); + + List<FutureEvent<Integer>> events = Arrays.asList( + new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet), + new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet) + ); + + events.forEach(eventProcessor::enqueue); + + CompletableFuture.allOf(events + .stream() + .map(FutureEvent::future) + .toArray(CompletableFuture[]::new) + ).get(10, TimeUnit.SECONDS); + + events.forEach(event -> { + assertTrue(event.future.isDone()); + assertFalse(event.future.isCompletedExceptionally()); + }); + + assertEquals(events.size(), numEventsExecuted.get()); + verify(mockRuntimeMetrics, times(7)).recordThreadIdleRatio(anyDouble()); + + assertEquals(7, recordedRatios.size()); + double average = recordedRatios.stream().mapToDouble(Double::doubleValue).sum() / 7; + assertTrue(average > 0.0 && average < 1.0); + } + } }