Browse Source

KAFKA-14519; [1/N] Implement coordinator runtime metrics (#14417)

Implements the following metrics:

kafka.server:type=group-coordinator-metrics,name=num-partitions,state=loading
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=active
kafka.server:type=group-coordinator-metrics,name=num-partitions,state=failed
kafka.server:type=group-coordinator-metrics,name=event-queue-size
kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
The PR makes these metrics generic so that in the future the transaction coordinator runtime can implement the same metrics in a similar fashion.

Also, CoordinatorLoaderImpl#load will now return LoadSummary which encapsulates the start time, end time, number of records/bytes.

Co-authored-by: David Jacot <djacot@confluent.io>

Reviewers:  Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, David Jacot <djacot@confluent.io>, Justine Olshan <jolshan@confluent.io>
pull/12449/merge
Jeff Kim 11 months ago committed by GitHub
parent
commit
abee8f711c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build.gradle
  2. 4
      checkstyle/import-control.xml
  3. 2
      checkstyle/suppressions.xml
  4. 21
      core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
  5. 3
      core/src/main/scala/kafka/server/BrokerServer.scala
  6. 71
      core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
  7. 14
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
  8. 68
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java
  9. 219
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java
  10. 5
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
  11. 45
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java
  12. 69
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
  13. 57
      group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
  14. 149
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java
  15. 198
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
  16. 243
      group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java

1
build.gradle

@ -1260,6 +1260,7 @@ project(':group-coordinator') { @@ -1260,6 +1260,7 @@ project(':group-coordinator') {
implementation project(':clients')
implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.metrics
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output

4
checkstyle/import-control.xml

@ -239,6 +239,10 @@ @@ -239,6 +239,10 @@
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

2
checkstyle/suppressions.xml

@ -325,6 +325,8 @@ @@ -325,6 +325,8 @@
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorService|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"

21
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala

@ -21,7 +21,8 @@ import kafka.utils.Logging @@ -21,7 +21,8 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, UnknownRecordTypeException}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, LoadSummary, UnknownRecordTypeException}
import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.FetchIsolation
@ -41,6 +42,7 @@ import scala.jdk.CollectionConverters._ @@ -41,6 +42,7 @@ import scala.jdk.CollectionConverters._
* @tparam T The record type.
*/
class CoordinatorLoaderImpl[T](
time: Time,
replicaManager: ReplicaManager,
deserializer: Deserializer[T],
loadBufferSize: Int
@ -59,10 +61,11 @@ class CoordinatorLoaderImpl[T]( @@ -59,10 +61,11 @@ class CoordinatorLoaderImpl[T](
override def load(
tp: TopicPartition,
coordinator: CoordinatorPlayback[T]
): CompletableFuture[Void] = {
val future = new CompletableFuture[Void]()
): CompletableFuture[LoadSummary] = {
val future = new CompletableFuture[LoadSummary]()
val startTimeMs = time.milliseconds()
val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
() => doLoad(tp, coordinator, future))
() => doLoad(tp, coordinator, future, startTimeMs))
if (result.isCancelled) {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
@ -72,7 +75,8 @@ class CoordinatorLoaderImpl[T]( @@ -72,7 +75,8 @@ class CoordinatorLoaderImpl[T](
private def doLoad(
tp: TopicPartition,
coordinator: CoordinatorPlayback[T],
future: CompletableFuture[Void]
future: CompletableFuture[LoadSummary],
startTimeMs: Long
): Unit = {
try {
replicaManager.getLog(tp) match {
@ -92,6 +96,8 @@ class CoordinatorLoaderImpl[T]( @@ -92,6 +96,8 @@ class CoordinatorLoaderImpl[T](
// the log end offset but the log is empty. This could happen with compacted topics.
var readAtLeastOneRecord = true
var numRecords = 0
var numBytes = 0
while (currentOffset < logEndOffset && readAtLeastOneRecord && isRunning.get) {
val fetchDataInfo = log.read(
startOffset = currentOffset,
@ -131,6 +137,7 @@ class CoordinatorLoaderImpl[T]( @@ -131,6 +137,7 @@ class CoordinatorLoaderImpl[T](
throw new IllegalStateException("Control batches are not supported yet.")
} else {
batch.asScala.foreach { record =>
numRecords = numRecords + 1
try {
coordinator.replay(deserializer.deserialize(record.key, record.value))
} catch {
@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T]( @@ -143,10 +150,12 @@ class CoordinatorLoaderImpl[T](
currentOffset = batch.nextOffset
}
numBytes = numBytes + memoryRecords.sizeInBytes()
}
val endTimeMs = time.milliseconds()
if (isRunning.get) {
future.complete(null)
future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes))
} else {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}

3
core/src/main/scala/kafka/server/BrokerServer.scala

@ -36,6 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok @@ -36,6 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, KafkaException, TopicPartition}
import org.apache.kafka.coordinator.group
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics
import org.apache.kafka.coordinator.group.util.SystemTimerReaper
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, RecordSerde}
import org.apache.kafka.image.publisher.MetadataPublisher
@ -531,6 +532,7 @@ class BrokerServer( @@ -531,6 +532,7 @@ class BrokerServer(
new SystemTimer("group-coordinator")
)
val loader = new CoordinatorLoaderImpl[group.Record](
time,
replicaManager,
serde,
config.offsetsLoadBufferSize
@ -546,6 +548,7 @@ class BrokerServer( @@ -546,6 +548,7 @@ class BrokerServer(
.withTimer(timer)
.withLoader(loader)
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new GroupCoordinatorRuntimeMetrics(metrics))
.build()
} else {
GroupCoordinatorAdapter(

71
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala

@ -22,14 +22,16 @@ import kafka.utils.TestUtils @@ -22,14 +22,16 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, LogOffsetMetadata}
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
import org.junit.jupiter.api.{Test, Timeout}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}
import org.mockito.invocation.InvocationOnMock
import java.nio.ByteBuffer
import java.nio.charset.Charset
@ -54,6 +56,7 @@ class CoordinatorLoaderImplTest { @@ -54,6 +56,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -73,6 +76,7 @@ class CoordinatorLoaderImplTest { @@ -73,6 +76,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -93,6 +97,7 @@ class CoordinatorLoaderImplTest { @@ -93,6 +97,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -126,7 +131,7 @@ class CoordinatorLoaderImplTest { @@ -126,7 +131,7 @@ class CoordinatorLoaderImplTest {
minOneMessage = true
)).thenReturn(readResult2)
assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
verify(coordinator).replay(("k1", "v1"))
verify(coordinator).replay(("k2", "v2"))
@ -145,6 +150,7 @@ class CoordinatorLoaderImplTest { @@ -145,6 +150,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -187,6 +193,7 @@ class CoordinatorLoaderImplTest { @@ -187,6 +193,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -226,6 +233,7 @@ class CoordinatorLoaderImplTest { @@ -226,6 +233,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -266,6 +274,7 @@ class CoordinatorLoaderImplTest { @@ -266,6 +274,7 @@ class CoordinatorLoaderImplTest {
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time = Time.SYSTEM,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest { @@ -283,7 +292,63 @@ class CoordinatorLoaderImplTest {
minOneMessage = true
)).thenReturn(readResult)
assertNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
}
}
@Test
def testLoadSummary(): Unit = {
val tp = new TopicPartition("foo", 0)
val replicaManager = mock(classOf[ReplicaManager])
val serde = new StringKeyValueDeserializer
val log = mock(classOf[UnifiedLog])
val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
val time = new MockTime()
TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
time,
replicaManager = replicaManager,
deserializer = serde,
loadBufferSize = 1000
)) { loader =>
val startTimeMs = time.milliseconds()
when(replicaManager.getLog(tp)).thenReturn(Some(log))
when(log.logStartOffset).thenReturn(0L)
when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
val readResult1 = logReadResult(startOffset = 0, records = Seq(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes)
))
when(log.read(
startOffset = 0L,
maxLength = 1000,
isolation = FetchIsolation.LOG_END,
minOneMessage = true
)).thenAnswer((_: InvocationOnMock) => {
time.sleep(1000)
readResult1
})
val readResult2 = logReadResult(startOffset = 2, records = Seq(
new SimpleRecord("k3".getBytes, "v3".getBytes),
new SimpleRecord("k4".getBytes, "v4".getBytes),
new SimpleRecord("k5".getBytes, "v5".getBytes)
))
when(log.read(
startOffset = 2L,
maxLength = 1000,
isolation = FetchIsolation.LOG_END,
minOneMessage = true
)).thenReturn(readResult2)
val summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
assertEquals(startTimeMs, summary.startTimeMs())
assertEquals(startTimeMs + 1000, summary.endTimeMs())
assertEquals(5, summary.numRecords())
assertEquals(readResult1.records.sizeInBytes() + readResult2.records.sizeInBytes(), summary.numBytes())
}
}

14
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier; @@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
@ -102,6 +103,7 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -102,6 +103,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
private CoordinatorLoader<Record> loader;
private Time time;
private Timer timer;
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
public Builder(
int nodeId,
@ -131,6 +133,11 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -131,6 +133,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
return this;
}
public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics coordinatorRuntimeMetrics) {
this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
return this;
}
public GroupCoordinatorService build() {
if (config == null)
throw new IllegalArgumentException("Config must be set.");
@ -142,6 +149,8 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -142,6 +149,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (coordinatorRuntimeMetrics == null)
throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
@ -152,7 +161,9 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -152,7 +161,9 @@ public class GroupCoordinatorService implements GroupCoordinator {
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,
"group-coordinator-event-processor-",
config.numThreads
config.numThreads,
time,
coordinatorRuntimeMetrics
);
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
@ -166,6 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator { @@ -166,6 +177,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.build();
return new GroupCoordinatorService(

68
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java

@ -0,0 +1,68 @@ @@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
import java.util.function.Supplier;
/**
* Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors.
*/
public interface CoordinatorRuntimeMetrics extends AutoCloseable {
/**
* Called when the partition state changes.
* @param oldState The old state.
* @param newState The new state to transition to.
*/
void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState);
/**
* Record the partition load metric.
* @param startTimeMs The partition load start time.
* @param endTimeMs The partition load end time.
*/
void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
/**
* Update the event queue time.
*
* @param durationMs The queue time.
*/
void recordEventQueueTime(long durationMs);
/**
* Update the event queue processing time.
*
* @param durationMs The event processing time.
*/
void recordEventQueueProcessingTime(long durationMs);
/**
* Record the thread idle ratio.
* @param ratio The idle ratio.
*/
void recordThreadIdleRatio(double ratio);
/**
* Register the event queue size gauge.
*
* @param sizeSupplier The size supplier.
*/
void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier);
}

219
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetrics.java

@ -0,0 +1,219 @@ @@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics {
/**
* The metrics group.
*/
public static final String METRICS_GROUP = "group-coordinator-metrics";
/**
* The partition count metric name.
*/
public static final String NUM_PARTITIONS_METRIC_NAME = "num-partitions";
/**
* Metric to count the number of partitions in Loading state.
*/
private final MetricName numPartitionsLoading;
private final AtomicLong numPartitionsLoadingCounter = new AtomicLong(0);
/**
* Metric to count the number of partitions in Active state.
*/
private final MetricName numPartitionsActive;
private final AtomicLong numPartitionsActiveCounter = new AtomicLong(0);
/**
* Metric to count the number of partitions in Failed state.
*/
private final MetricName numPartitionsFailed;
private final AtomicLong numPartitionsFailedCounter = new AtomicLong(0);
/**
* Metric to count the size of the processor queue.
*/
private final MetricName eventQueueSize;
/**
* The Kafka metrics registry.
*/
private final Metrics metrics;
/**
* The partition load sensor.
*/
private Sensor partitionLoadSensor;
/**
* The thread idle sensor.
*/
private Sensor threadIdleRatioSensor;
public GroupCoordinatorRuntimeMetrics(Metrics metrics) {
this.metrics = Objects.requireNonNull(metrics);
this.numPartitionsLoading = kafkaMetricName(
NUM_PARTITIONS_METRIC_NAME,
"The number of partitions in Loading state.",
"state", "loading"
);
this.numPartitionsActive = kafkaMetricName(
NUM_PARTITIONS_METRIC_NAME,
"The number of partitions in Active state.",
"state", "active"
);
this.numPartitionsFailed = kafkaMetricName(
NUM_PARTITIONS_METRIC_NAME,
"The number of partitions in Failed state.",
"state", "failed"
);
this.eventQueueSize = kafkaMetricName("event-queue-size", "The event accumulator queue size.");
metrics.addMetric(numPartitionsLoading, (Gauge<Long>) (config, now) -> numPartitionsLoadingCounter.get());
metrics.addMetric(numPartitionsActive, (Gauge<Long>) (config, now) -> numPartitionsActiveCounter.get());
metrics.addMetric(numPartitionsFailed, (Gauge<Long>) (config, now) -> numPartitionsFailedCounter.get());
this.partitionLoadSensor = metrics.sensor("GroupPartitionLoadTime");
this.partitionLoadSensor.add(
metrics.metricName(
"partition-load-time-max",
METRICS_GROUP,
"The max time it took to load the partitions in the last 30 sec."
), new Max());
this.partitionLoadSensor.add(
metrics.metricName(
"partition-load-time-avg",
METRICS_GROUP,
"The average time it took to load the partitions in the last 30 sec."
), new Avg());
this.threadIdleRatioSensor = metrics.sensor("ThreadIdleRatio");
this.threadIdleRatioSensor.add(
metrics.metricName(
"thread-idle-ratio-min",
METRICS_GROUP,
"The minimum thread idle ratio over the last 30 seconds."
), new Min());
this.threadIdleRatioSensor.add(
metrics.metricName(
"thread-idle-ratio-avg",
METRICS_GROUP,
"The average thread idle ratio over the last 30 seconds."
), new Avg());
}
/**
* Retrieve the kafka metric name.
*
* @param name The name of the metric.
*
* @return The kafka metric name.
*/
private MetricName kafkaMetricName(String name, String description, String... keyValue) {
return metrics.metricName(name, METRICS_GROUP, description, keyValue);
}
@Override
public void close() {
Arrays.asList(
numPartitionsLoading,
numPartitionsActive,
numPartitionsFailed,
eventQueueSize
).forEach(metrics::removeMetric);
metrics.removeSensor(partitionLoadSensor.name());
metrics.removeSensor(threadIdleRatioSensor.name());
}
/**
* Called when the partition state changes. Decrement the old state and increment the new state.
*
* @param oldState The old state.
* @param newState The new state to transition to.
*/
@Override
public void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState) {
switch (oldState) {
case INITIAL:
case CLOSED:
break;
case LOADING:
numPartitionsLoadingCounter.decrementAndGet();
break;
case ACTIVE:
numPartitionsActiveCounter.decrementAndGet();
break;
case FAILED:
numPartitionsFailedCounter.decrementAndGet();
}
switch (newState) {
case INITIAL:
case CLOSED:
break;
case LOADING:
numPartitionsLoadingCounter.incrementAndGet();
break;
case ACTIVE:
numPartitionsActiveCounter.incrementAndGet();
break;
case FAILED:
numPartitionsFailedCounter.incrementAndGet();
}
}
@Override
public void recordPartitionLoadSensor(long startTimeMs, long endTimeMs) {
this.partitionLoadSensor.record(endTimeMs - startTimeMs, endTimeMs, false);
}
@Override
public void recordEventQueueTime(long durationMs) { }
@Override
public void recordEventQueueProcessingTime(long durationMs) { }
@Override
public void recordThreadIdleRatio(double ratio) {
threadIdleRatioSensor.record(ratio);
}
@Override
public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
metrics.addMetric(eventQueueSize, (Gauge<Long>) (config, now) -> (long) sizeSupplier.get());
}
}

5
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java

@ -35,4 +35,9 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition> @@ -35,4 +35,9 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition>
* @param exception An exception if the processing of the event failed or null otherwise.
*/
void complete(Throwable exception);
/**
* @return The created time in milliseconds.
*/
long createdTimeMs();
}

45
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java

@ -46,6 +46,49 @@ public interface CoordinatorLoader<U> extends AutoCloseable { @@ -46,6 +46,49 @@ public interface CoordinatorLoader<U> extends AutoCloseable {
}
}
/**
* Object that is returned as part of the future from load(). Holds the partition load time and the
* end time.
*/
class LoadSummary {
private final long startTimeMs;
private final long endTimeMs;
private final long numRecords;
private final long numBytes;
public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
this.startTimeMs = startTimeMs;
this.endTimeMs = endTimeMs;
this.numRecords = numRecords;
this.numBytes = numBytes;
}
public long startTimeMs() {
return startTimeMs;
}
public long endTimeMs() {
return endTimeMs;
}
public long numRecords() {
return numRecords;
}
public long numBytes() {
return numBytes;
}
@Override
public String toString() {
return "LoadSummary(" +
"startTimeMs=" + startTimeMs +
", endTimeMs=" + endTimeMs +
", numRecords=" + numRecords +
", numBytes=" + numBytes + ")";
}
}
/**
* Deserializer to translates bytes to T.
*
@ -69,7 +112,7 @@ public interface CoordinatorLoader<U> extends AutoCloseable { @@ -69,7 +112,7 @@ public interface CoordinatorLoader<U> extends AutoCloseable {
* @param tp The TopicPartition to read from.
* @param coordinator The object to apply records to.
*/
CompletableFuture<Void> load(
CompletableFuture<LoadSummary> load(
TopicPartition tp,
CoordinatorPlayback<U> coordinator
);

69
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors; @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
@ -89,6 +90,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -89,6 +90,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
private Time time = Time.SYSTEM;
private Timer timer;
private CoordinatorRuntimeMetrics runtimeMetrics;
public Builder<S, U> withLogPrefix(String logPrefix) {
this.logPrefix = logPrefix;
@ -130,6 +132,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -130,6 +132,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
return this;
}
public Builder<S, U> withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics runtimeMetrics) {
this.runtimeMetrics = runtimeMetrics;
return this;
}
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
@ -147,6 +154,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -147,6 +154,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (runtimeMetrics == null)
throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
return new CoordinatorRuntime<>(
logPrefix,
@ -156,7 +165,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -156,7 +165,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
loader,
coordinatorShardBuilderSupplier,
time,
timer
timer,
runtimeMetrics
);
}
}
@ -164,7 +174,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -164,7 +174,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
/**
* The various state that a coordinator for a partition can be in.
*/
enum CoordinatorState {
public enum CoordinatorState {
/**
* Initial state when a coordinator is created.
*/
@ -501,6 +511,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -501,6 +511,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
}
CoordinatorState oldState = state;
log.debug("Transition from {} to {}.", state, newState);
switch (newState) {
case LOADING:
@ -537,6 +548,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -537,6 +548,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
default:
throw new IllegalArgumentException("Transitioning to " + newState + " is not supported.");
}
runtimeMetrics.recordPartitionStateChange(oldState, state);
}
/**
@ -608,6 +621,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -608,6 +621,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
CoordinatorResult<T, U> result;
/**
* The time this event was created.
*/
private final long createdTimeMs;
/**
* Constructor.
*
@ -624,6 +642,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -624,6 +642,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.name = name;
this.op = op;
this.future = new CompletableFuture<>();
this.createdTimeMs = time.milliseconds();
}
/**
@ -709,6 +728,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -709,6 +728,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
@Override
public long createdTimeMs() {
return this.createdTimeMs;
}
@Override
public String toString() {
return "CoordinatorWriteEvent(name=" + name + ")";
@ -768,6 +792,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -768,6 +792,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
T response;
/**
* The time this event was created.
*/
private final long createdTimeMs;
/**
* Constructor.
*
@ -784,6 +813,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -784,6 +813,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.name = name;
this.op = op;
this.future = new CompletableFuture<>();
this.createdTimeMs = time.milliseconds();
}
/**
@ -832,6 +862,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -832,6 +862,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
@Override
public long createdTimeMs() {
return this.createdTimeMs;
}
@Override
public String toString() {
return "CoordinatorReadEvent(name=" + name + ")";
@ -857,6 +892,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -857,6 +892,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
final Runnable op;
/**
* The time this event was created.
*/
private final long createdTimeMs;
/**
* Constructor.
*
@ -872,6 +912,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -872,6 +912,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.tp = tp;
this.name = name;
this.op = op;
this.createdTimeMs = time.milliseconds();
}
/**
@ -907,6 +948,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -907,6 +948,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
@Override
public long createdTimeMs() {
return this.createdTimeMs;
}
@Override
public String toString() {
return "InternalEvent(name=" + name + ")";
@ -995,6 +1041,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -995,6 +1041,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
/**
* The coordinator runtime metrics.
*/
private final CoordinatorRuntimeMetrics runtimeMetrics;
/**
* Atomic boolean indicating whether the runtime is running.
*/
@ -1025,7 +1076,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -1025,7 +1076,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
CoordinatorLoader<U> loader,
CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier,
Time time,
Timer timer
Timer timer,
CoordinatorRuntimeMetrics runtimeMetrics
) {
this.logPrefix = logPrefix;
this.logContext = logContext;
@ -1038,6 +1090,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -1038,6 +1090,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
this.highWatermarklistener = new HighWatermarkListener();
this.loader = loader;
this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
this.runtimeMetrics = runtimeMetrics;
}
/**
@ -1242,7 +1295,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -1242,7 +1295,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
case FAILED:
case INITIAL:
context.transitionTo(CoordinatorState.LOADING);
loader.load(tp, context.coordinator).whenComplete((state, exception) -> {
loader.load(tp, context.coordinator).whenComplete((summary, exception) -> {
scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
withContextOrThrow(tp, ctx -> {
if (ctx.state != CoordinatorState.LOADING) {
@ -1254,8 +1307,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -1254,8 +1307,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
try {
if (exception != null) throw exception;
ctx.transitionTo(CoordinatorState.ACTIVE);
log.info("Finished loading of metadata from {} with epoch {}.",
tp, partitionEpoch
if (summary != null) {
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
}
log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
tp, partitionEpoch, summary
);
} catch (Throwable ex) {
log.error("Failed to load metadata from {} with epoch {} due to {}.",
@ -1373,6 +1429,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut @@ -1373,6 +1429,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
context.transitionTo(CoordinatorState.CLOSED);
});
coordinators.clear();
Utils.closeQuietly(runtimeMetrics, "runtime metrics");
log.info("Coordinator runtime closed.");
}
}

57
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java

@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.runtime; @@ -18,9 +18,12 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.slf4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -52,21 +55,50 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { @@ -52,21 +55,50 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
*/
private volatile boolean shuttingDown;
/**
* The coordinator runtime metrics.
*/
private final CoordinatorRuntimeMetrics metrics;
/**
* The time.
*/
private final Time time;
public MultiThreadedEventProcessor(
LogContext logContext,
String threadPrefix,
int numThreads,
Time time,
CoordinatorRuntimeMetrics metrics
) {
this(logContext, threadPrefix, numThreads, time, metrics, new EventAccumulator<>());
}
/**
* Constructor.
*
* @param logContext The log context.
* @param threadPrefix The thread prefix.
* @param numThreads The number of threads.
* @param metrics The coordinator runtime metrics.
* @param time The time.
* @param eventAccumulator The event accumulator.
*/
public MultiThreadedEventProcessor(
LogContext logContext,
String threadPrefix,
int numThreads
int numThreads,
Time time,
CoordinatorRuntimeMetrics metrics,
EventAccumulator<TopicPartition, CoordinatorEvent> eventAccumulator
) {
this.log = logContext.logger(MultiThreadedEventProcessor.class);
this.shuttingDown = false;
this.accumulator = new EventAccumulator<>();
this.accumulator = eventAccumulator;
this.time = Objects.requireNonNull(time);
this.metrics = Objects.requireNonNull(metrics);
this.metrics.registerEventQueueSizeGauge(accumulator::size);
this.threads = IntStream.range(0, numThreads).mapToObj(threadId ->
new EventProcessorThread(
threadPrefix + threadId
@ -81,6 +113,9 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { @@ -81,6 +113,9 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
*/
private class EventProcessorThread extends Thread {
private final Logger log;
private long pollStartMs;
private long timeSinceLastPollMs;
private long lastPollMs;
EventProcessorThread(
String name
@ -92,11 +127,16 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { @@ -92,11 +127,16 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
private void handleEvents() {
while (!shuttingDown) {
recordPollStartTime(time.milliseconds());
CoordinatorEvent event = accumulator.poll();
recordPollEndTime(time.milliseconds());
if (event != null) {
try {
log.debug("Executing event: {}.", event);
long dequeuedTimeMs = time.milliseconds();
metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
event.run();
metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
} catch (Throwable t) {
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
@ -112,6 +152,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { @@ -112,6 +152,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
while (event != null) {
try {
log.debug("Draining event: {}.", event);
metrics.recordEventQueueTime(time.milliseconds() - event.createdTimeMs());
event.complete(new RejectedExecutionException("EventProcessor is closed."));
} catch (Throwable t) {
log.error("Failed to reject event {} due to: {}.", event, t.getMessage(), t);
@ -145,6 +186,18 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { @@ -145,6 +186,18 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
log.info("Shutdown completed");
}
}
private void recordPollStartTime(long pollStartMs) {
this.pollStartMs = pollStartMs;
this.timeSinceLastPollMs = lastPollMs != 0L ? pollStartMs - lastPollMs : 0;
this.lastPollMs = pollStartMs;
}
private void recordPollEndTime(long pollEndMs) {
long pollTimeMs = pollEndMs - pollStartMs;
double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs + timeSinceLastPollMs);
metrics.recordThreadIdleRatio(pollIdleRatio);
}
}
/**

149
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorRuntimeMetricsTest.java

@ -0,0 +1,149 @@ @@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet;
import java.util.stream.IntStream;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.METRICS_GROUP;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics.NUM_PARTITIONS_METRIC_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorRuntimeMetricsTest {
@Test
public void testMetricNames() {
Metrics metrics = new Metrics();
HashSet<org.apache.kafka.common.MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"),
kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"),
metrics.metricName("event-queue-size", METRICS_GROUP),
metrics.metricName("partition-load-time-max", METRICS_GROUP),
metrics.metricName("partition-load-time-avg", METRICS_GROUP),
metrics.metricName("thread-idle-ratio-min", METRICS_GROUP),
metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP)
));
try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 0);
expectedMetrics.forEach(metricName -> assertTrue(metrics.metrics().containsKey(metricName)));
}
expectedMetrics.forEach(metricName -> assertFalse(metrics.metrics().containsKey(metricName)));
}
@Test
public void testUpdateNumPartitionsMetrics() {
Metrics metrics = new Metrics();
try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
IntStream.range(0, 10)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING));
IntStream.range(0, 8)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE));
IntStream.range(0, 8)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.ACTIVE, CoordinatorState.FAILED));
IntStream.range(0, 2)
.forEach(__ -> runtimeMetrics.recordPartitionStateChange(CoordinatorState.FAILED, CoordinatorState.CLOSED));
assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "loading"), 2);
assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "active"), 0);
assertMetricGauge(metrics, kafkaMetricName(metrics, NUM_PARTITIONS_METRIC_NAME, "state", "failed"), 6);
}
}
@Test
public void testPartitionLoadSensorMetrics() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
long startTimeMs = time.milliseconds();
runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
runtimeMetrics.recordPartitionLoadSensor(startTimeMs, startTimeMs + 2000);
org.apache.kafka.common.MetricName metricName = metrics.metricName(
"partition-load-time-avg", METRICS_GROUP);
KafkaMetric metric = metrics.metrics().get(metricName);
assertEquals(1500.0, metric.metricValue());
metricName = metrics.metricName(
"partition-load-time-max", METRICS_GROUP);
metric = metrics.metrics().get(metricName);
assertEquals(2000.0, metric.metricValue());
}
}
@Test
public void testThreadIdleRatioSensor() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
IntStream.range(0, 3).forEach(i -> runtimeMetrics.recordThreadIdleRatio(1.0 / (i + 1)));
org.apache.kafka.common.MetricName metricName = metrics.metricName(
"thread-idle-ratio-avg", METRICS_GROUP);
KafkaMetric metric = metrics.metrics().get(metricName);
assertEquals((11.0 / 6.0) / 3.0, metric.metricValue()); // (6/6 + 3/6 + 2/6) / 3
metricName = metrics.metricName(
"thread-idle-ratio-min", METRICS_GROUP);
metric = metrics.metrics().get(metricName);
assertEquals(1.0 / 3.0, metric.metricValue());
}
}
@Test
public void testEventQueueSize() {
Time time = new MockTime();
Metrics metrics = new Metrics(time);
try (GroupCoordinatorRuntimeMetrics runtimeMetrics = new GroupCoordinatorRuntimeMetrics(metrics)) {
runtimeMetrics.registerEventQueueSizeGauge(() -> 5);
assertMetricGauge(metrics, kafkaMetricName(metrics, "event-queue-size"), 5);
}
}
private static void assertMetricGauge(Metrics metrics, org.apache.kafka.common.MetricName metricName, long count) {
assertEquals(count, (long) metrics.metric(metricName).metricValue());
}
private static com.yammer.metrics.core.MetricName yammerMetricName(String type, String name) {
String mBeanName = String.format("kafka.coordinator.group:type=%s,name=%s", type, name);
return new com.yammer.metrics.core.MetricName("kafka.coordinator.group", type, name, null, mBeanName);
}
private static MetricName kafkaMetricName(Metrics metrics, String name, String... keyValue) {
return metrics.metricName(name, METRICS_GROUP, "", keyValue);
}
}

198
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java

@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -45,6 +46,11 @@ import java.util.concurrent.TimeoutException; @@ -45,6 +46,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.CLOSED;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -118,9 +124,19 @@ public class CoordinatorRuntimeTest { @@ -118,9 +124,19 @@ public class CoordinatorRuntimeTest {
* A CoordinatorLoader that always succeeds.
*/
private static class MockCoordinatorLoader implements CoordinatorLoader<String> {
private final LoadSummary summary;
public MockCoordinatorLoader(LoadSummary summary) {
this.summary = summary;
}
public MockCoordinatorLoader() {
this(null);
}
@Override
public CompletableFuture<Void> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
return CompletableFuture.completedFuture(null);
public CompletableFuture<LoadSummary> load(TopicPartition tp, CoordinatorPlayback<String> replayable) {
return CompletableFuture.completedFuture(summary);
}
@Override
@ -271,6 +287,7 @@ public class CoordinatorRuntimeTest { @@ -271,6 +287,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -280,7 +297,7 @@ public class CoordinatorRuntimeTest { @@ -280,7 +297,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Getting the coordinator context fails because the coordinator
@ -294,13 +311,13 @@ public class CoordinatorRuntimeTest { @@ -294,13 +311,13 @@ public class CoordinatorRuntimeTest {
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// The coordinator is loading.
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading completes, the coordinator transitions to active.
future.complete(null);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
// Verify that onLoaded is called.
verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
@ -335,6 +352,7 @@ public class CoordinatorRuntimeTest { @@ -335,6 +352,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -344,7 +362,7 @@ public class CoordinatorRuntimeTest { @@ -344,7 +362,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@ -352,13 +370,13 @@ public class CoordinatorRuntimeTest { @@ -352,13 +370,13 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(LOADING, ctx.state);
assertEquals(0, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
assertEquals(FAILED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@ -386,6 +404,7 @@ public class CoordinatorRuntimeTest { @@ -386,6 +404,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -395,7 +414,7 @@ public class CoordinatorRuntimeTest { @@ -395,7 +414,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@ -403,19 +422,19 @@ public class CoordinatorRuntimeTest { @@ -403,19 +422,19 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading completes, the coordinator transitions to active.
future.complete(null);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Loading with a previous epoch is a no-op. The coordinator stays
// in active state with the correct epoch.
runtime.scheduleLoadOperation(TP, 0);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
}
@ -435,6 +454,7 @@ public class CoordinatorRuntimeTest { @@ -435,6 +454,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -444,7 +464,7 @@ public class CoordinatorRuntimeTest { @@ -444,7 +464,7 @@ public class CoordinatorRuntimeTest {
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
@ -452,13 +472,13 @@ public class CoordinatorRuntimeTest { @@ -452,13 +472,13 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(LOADING, ctx.state);
assertEquals(10, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);
assertEquals(FAILED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@ -474,7 +494,7 @@ public class CoordinatorRuntimeTest { @@ -474,7 +494,7 @@ public class CoordinatorRuntimeTest {
// Getting the context succeeds and the coordinator should be in loading.
ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.LOADING, ctx.state);
assertEquals(LOADING, ctx.state);
assertEquals(11, ctx.epoch);
assertEquals(coordinator, ctx.coordinator);
@ -482,7 +502,7 @@ public class CoordinatorRuntimeTest { @@ -482,7 +502,7 @@ public class CoordinatorRuntimeTest {
future.complete(null);
// Verify the state.
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
}
@Test
@ -501,6 +521,7 @@ public class CoordinatorRuntimeTest { @@ -501,6 +521,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -514,12 +535,12 @@ public class CoordinatorRuntimeTest { @@ -514,12 +535,12 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Schedule the unloading.
runtime.scheduleUnloadOperation(TP, ctx.epoch + 1);
assertEquals(CoordinatorRuntime.CoordinatorState.CLOSED, ctx.state);
assertEquals(CLOSED, ctx.state);
// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
@ -549,6 +570,7 @@ public class CoordinatorRuntimeTest { @@ -549,6 +570,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
@ -564,13 +586,13 @@ public class CoordinatorRuntimeTest { @@ -564,13 +586,13 @@ public class CoordinatorRuntimeTest {
// Loads the coordinator. It directly transitions to active.
runtime.scheduleLoadOperation(TP, 10);
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
// Unloading with a previous epoch is a no-op. The coordinator stays
// in active with the correct epoch.
runtime.scheduleUnloadOperation(TP, 0);
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
assertEquals(ACTIVE, ctx.state);
assertEquals(10, ctx.epoch);
}
@ -587,6 +609,7 @@ public class CoordinatorRuntimeTest { @@ -587,6 +609,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Schedule the loading.
@ -695,6 +718,7 @@ public class CoordinatorRuntimeTest { @@ -695,6 +718,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Scheduling a write fails with a NotCoordinatorException because the coordinator
@ -715,6 +739,7 @@ public class CoordinatorRuntimeTest { @@ -715,6 +739,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -739,6 +764,7 @@ public class CoordinatorRuntimeTest { @@ -739,6 +764,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -784,6 +810,7 @@ public class CoordinatorRuntimeTest { @@ -784,6 +810,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -831,6 +858,7 @@ public class CoordinatorRuntimeTest { @@ -831,6 +858,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -885,6 +913,7 @@ public class CoordinatorRuntimeTest { @@ -885,6 +913,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Schedule a read. It fails because the coordinator does not exist.
@ -906,6 +935,7 @@ public class CoordinatorRuntimeTest { @@ -906,6 +935,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -947,6 +977,7 @@ public class CoordinatorRuntimeTest { @@ -947,6 +977,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1012,6 +1043,7 @@ public class CoordinatorRuntimeTest { @@ -1012,6 +1043,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
MockCoordinatorShard coordinator0 = mock(MockCoordinatorShard.class);
@ -1028,10 +1060,10 @@ public class CoordinatorRuntimeTest { @@ -1028,10 +1060,10 @@ public class CoordinatorRuntimeTest {
.thenReturn(coordinator0)
.thenReturn(coordinator1);
CompletableFuture<Void> future0 = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future0 = new CompletableFuture<>();
when(loader.load(tp0, coordinator0)).thenReturn(future0);
CompletableFuture<Void> future1 = new CompletableFuture<>();
CompletableFuture<CoordinatorLoader.LoadSummary> future1 = new CompletableFuture<>();
when(loader.load(tp1, coordinator1)).thenReturn(future1);
runtime.scheduleLoadOperation(tp0, 0);
@ -1067,6 +1099,7 @@ public class CoordinatorRuntimeTest { @@ -1067,6 +1099,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1118,6 +1151,7 @@ public class CoordinatorRuntimeTest { @@ -1118,6 +1151,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1189,6 +1223,7 @@ public class CoordinatorRuntimeTest { @@ -1189,6 +1223,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(processor)
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1257,6 +1292,7 @@ public class CoordinatorRuntimeTest { @@ -1257,6 +1292,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1313,6 +1349,7 @@ public class CoordinatorRuntimeTest { @@ -1313,6 +1349,7 @@ public class CoordinatorRuntimeTest {
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.build();
// Loads the coordinator.
@ -1340,4 +1377,119 @@ public class CoordinatorRuntimeTest { @@ -1340,4 +1377,119 @@ public class CoordinatorRuntimeTest {
assertEquals(1, cnt.get());
assertEquals(0, ctx.timer.size());
}
@Test
public void testStateChanges() throws Exception {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withLoader(loader)
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
CompletableFuture<CoordinatorLoader.LoadSummary> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
// Getting the context succeeds and the coordinator should be in loading.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(LOADING, ctx.state);
verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
// When the loading fails, the coordinator transitions to failed.
future.completeExceptionally(new Exception("failure"));
assertEquals(FAILED, ctx.state);
verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
// Start loading a new topic partition.
TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
future = new CompletableFuture<>();
when(loader.load(tp, coordinator)).thenReturn(future);
// Schedule the loading.
runtime.scheduleLoadOperation(tp, 0);
// Getting the context succeeds and the coordinator should be in loading.
ctx = runtime.contextOrThrow(tp);
assertEquals(LOADING, ctx.state);
verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
// When the loading completes, the coordinator transitions to active.
future.complete(null);
assertEquals(ACTIVE, ctx.state);
verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
runtime.close();
verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);
verify(runtimeMetrics, times(1)).recordPartitionStateChange(ACTIVE, CLOSED);
}
@Test
public void testPartitionLoadSensor() {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = mock(MockPartitionWriter.class);
MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
long startTimeMs = timer.time().milliseconds();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withLoader(new MockCoordinatorLoader(
new CoordinatorLoader.LoadSummary(
startTimeMs,
startTimeMs + 1000,
30,
3000)))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.build();
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
when(builder.withLogContext(any())).thenReturn(builder);
when(builder.withTime(any())).thenReturn(builder);
when(builder.withTimer(any())).thenReturn(builder);
when(builder.withTopicPartition(any())).thenReturn(builder);
when(builder.build()).thenReturn(coordinator);
when(supplier.get()).thenReturn(builder);
// Getting the coordinator context fails because the coordinator
// does not exist until scheduleLoadOperation is called.
assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 0);
// Getting the coordinator context succeeds now.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
// When the loading completes, the coordinator transitions to active.
assertEquals(ACTIVE, ctx.state);
verify(runtimeMetrics, times(1)).recordPartitionLoadSensor(startTimeMs, startTimeMs + 1000);
}
}

243
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java

@ -18,16 +18,24 @@ package org.apache.kafka.coordinator.group.runtime; @@ -18,16 +18,24 @@ package org.apache.kafka.coordinator.group.runtime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorRuntimeMetrics;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -37,9 +45,63 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @@ -37,9 +45,63 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Timeout(value = 60)
public class MultiThreadedEventProcessorTest {
private static class MockEventAccumulator<T> extends EventAccumulator<TopicPartition, CoordinatorEvent> {
private final Time time;
private final Queue<CoordinatorEvent> events;
private final long timeToPollMs;
private final AtomicBoolean isClosed;
public MockEventAccumulator(Time time, long timeToPollMs) {
this.time = time;
this.events = new LinkedList<>();
this.timeToPollMs = timeToPollMs;
this.isClosed = new AtomicBoolean(false);
}
@Override
public CoordinatorEvent poll() {
synchronized (events) {
while (events.isEmpty() && !isClosed.get()) {
try {
events.wait();
} catch (Exception ignored) {
}
}
time.sleep(timeToPollMs);
return events.poll();
}
}
@Override
public CoordinatorEvent poll(long timeout, TimeUnit unit) {
return null;
}
@Override
public void add(CoordinatorEvent event) throws RejectedExecutionException {
synchronized (events) {
events.add(event);
events.notifyAll();
}
}
@Override
public void close() {
isClosed.set(true);
synchronized (events) {
events.notifyAll();
}
}
}
private static class FutureEvent<T> implements CoordinatorEvent {
private final TopicPartition key;
@ -48,18 +110,28 @@ public class MultiThreadedEventProcessorTest { @@ -48,18 +110,28 @@ public class MultiThreadedEventProcessorTest {
private final boolean block;
private final CountDownLatch latch;
private final CountDownLatch executed;
private long createdTimeMs;
FutureEvent(
TopicPartition key,
Supplier<T> supplier
) {
this(key, supplier, false);
this(key, supplier, false, 0L);
}
FutureEvent(
TopicPartition key,
Supplier<T> supplier,
boolean block
) {
this(key, supplier, block, 0L);
}
FutureEvent(
TopicPartition key,
Supplier<T> supplier,
boolean block,
long createdTimeMs
) {
this.key = key;
this.future = new CompletableFuture<>();
@ -67,6 +139,7 @@ public class MultiThreadedEventProcessorTest { @@ -67,6 +139,7 @@ public class MultiThreadedEventProcessorTest {
this.block = block;
this.latch = new CountDownLatch(1);
this.executed = new CountDownLatch(1);
this.createdTimeMs = createdTimeMs;
}
@Override
@ -90,6 +163,11 @@ public class MultiThreadedEventProcessorTest { @@ -90,6 +163,11 @@ public class MultiThreadedEventProcessorTest {
future.completeExceptionally(ex);
}
@Override
public long createdTimeMs() {
return createdTimeMs;
}
@Override
public TopicPartition key() {
return key;
@ -118,7 +196,9 @@ public class MultiThreadedEventProcessorTest { @@ -118,7 +196,9 @@ public class MultiThreadedEventProcessorTest {
CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
2
2,
Time.SYSTEM,
mock(GroupCoordinatorRuntimeMetrics.class)
);
eventProcessor.close();
}
@ -128,7 +208,9 @@ public class MultiThreadedEventProcessorTest { @@ -128,7 +208,9 @@ public class MultiThreadedEventProcessorTest {
try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
2
2,
Time.SYSTEM,
mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@ -163,7 +245,9 @@ public class MultiThreadedEventProcessorTest { @@ -163,7 +245,9 @@ public class MultiThreadedEventProcessorTest {
try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
2
2,
Time.SYSTEM,
mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@ -246,7 +330,9 @@ public class MultiThreadedEventProcessorTest { @@ -246,7 +330,9 @@ public class MultiThreadedEventProcessorTest {
CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
2
2,
Time.SYSTEM,
mock(GroupCoordinatorRuntimeMetrics.class)
);
eventProcessor.close();
@ -260,7 +346,9 @@ public class MultiThreadedEventProcessorTest { @@ -260,7 +346,9 @@ public class MultiThreadedEventProcessorTest {
try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
1 // Use a single thread to block event in the processor.
1, // Use a single thread to block event in the processor.
Time.SYSTEM,
mock(GroupCoordinatorRuntimeMetrics.class)
)) {
AtomicInteger numEventsExecuted = new AtomicInteger(0);
@ -317,4 +405,147 @@ public class MultiThreadedEventProcessorTest { @@ -317,4 +405,147 @@ public class MultiThreadedEventProcessorTest {
assertEquals(1, numEventsExecuted.get());
}
}
@Test
public void testMetrics() throws Exception {
GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
Time mockTime = new MockTime();
AtomicInteger numEventsExecuted = new AtomicInteger(0);
// Special event which blocks until the latch is released.
FutureEvent<Integer> blockingEvent = new FutureEvent<>(
new TopicPartition("foo", 0), () -> {
mockTime.sleep(4000L);
return numEventsExecuted.incrementAndGet();
},
true,
mockTime.milliseconds()
);
try (MultiThreadedEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
1, // Use a single thread to block event in the processor.
mockTime,
mockRuntimeMetrics,
new MockEventAccumulator<>(mockTime, 500L)
)) {
// Enqueue the blocking event.
eventProcessor.enqueue(blockingEvent);
// Ensure that the blocking event is executed.
waitForCondition(() -> numEventsExecuted.get() > 0,
"Blocking event not executed.");
// Enqueue the other event.
FutureEvent<Integer> otherEvent = new FutureEvent<>(
new TopicPartition("foo", 0), () -> {
mockTime.sleep(5000L);
return numEventsExecuted.incrementAndGet();
},
false,
mockTime.milliseconds()
);
eventProcessor.enqueue(otherEvent);
// Pass the time.
mockTime.sleep(3000L);
// Events should not be completed.
assertFalse(otherEvent.future.isDone());
// Release the blocking event to unblock the thread.
blockingEvent.release();
// The blocking event should be completed.
blockingEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS);
assertTrue(blockingEvent.future.isDone());
assertFalse(blockingEvent.future.isCompletedExceptionally());
// The other event should also be completed.
otherEvent.future.get(DEFAULT_MAX_WAIT_MS, TimeUnit.SECONDS);
assertTrue(otherEvent.future.isDone());
assertFalse(otherEvent.future.isCompletedExceptionally());
assertEquals(2, numEventsExecuted.get());
// e1 poll time = 500
// e1 processing time = 4000
// e2 enqueue time = 3000
// e2 poll time = 500
// e2 processing time = 5000
// e1 poll time / e1 poll time
verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(1.0);
// e1 poll time
verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(500L);
// e1 processing time + e2 enqueue time
verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(7000L);
// Second event (e2)
// idle ratio = e2 poll time / (e1 poll time + e1 processing time + e2 enqueue time + e2 poll time)
verify(mockRuntimeMetrics, times(1)).recordThreadIdleRatio(500.0 / (500.0 + 7000.0 + 500.0));
// event queue time = e2 enqueue time + e2 poll time
verify(mockRuntimeMetrics, times(1)).recordEventQueueTime(3500L);
// e2 processing time
verify(mockRuntimeMetrics, times(1)).recordEventQueueProcessingTime(5000L);
}
}
@Test
public void testRecordThreadIdleRatioTwoThreads() throws Exception {
GroupCoordinatorRuntimeMetrics mockRuntimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
try (CoordinatorEventProcessor eventProcessor = new MultiThreadedEventProcessor(
new LogContext(),
"event-processor-",
2,
Time.SYSTEM,
mockRuntimeMetrics,
new MockEventAccumulator<>(Time.SYSTEM, 100L)
)) {
List<Double> recordedRatios = new ArrayList<>();
AtomicInteger numEventsExecuted = new AtomicInteger(0);
ArgumentCaptor<Double> ratioCaptured = ArgumentCaptor.forClass(Double.class);
doAnswer(invocation -> {
double threadIdleRatio = ratioCaptured.getValue();
assertTrue(threadIdleRatio > 0.0);
synchronized (recordedRatios) {
recordedRatios.add(threadIdleRatio);
}
return null;
}).when(mockRuntimeMetrics).recordThreadIdleRatio(ratioCaptured.capture());
List<FutureEvent<Integer>> events = Arrays.asList(
new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 0), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 1), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet),
new FutureEvent<>(new TopicPartition("foo", 2), numEventsExecuted::incrementAndGet)
);
events.forEach(eventProcessor::enqueue);
CompletableFuture.allOf(events
.stream()
.map(FutureEvent::future)
.toArray(CompletableFuture[]::new)
).get(10, TimeUnit.SECONDS);
events.forEach(event -> {
assertTrue(event.future.isDone());
assertFalse(event.future.isCompletedExceptionally());
});
assertEquals(events.size(), numEventsExecuted.get());
verify(mockRuntimeMetrics, times(7)).recordThreadIdleRatio(anyDouble());
assertEquals(7, recordedRatios.size());
double average = recordedRatios.stream().mapToDouble(Double::doubleValue).sum() / 7;
assertTrue(average > 0.0 && average < 1.0);
}
}
}

Loading…
Cancel
Save