
MINOR: Add ZK dual-write lag metric (#14009)

This patch adds the ZKWriteBehindLag metric to the KafkaController MBean, as specified in KIP-866.

Reviewers: David Arthur <mumrah@gmail.com>
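
For context, here is a minimal, illustrative sketch (not part of the commit) of how the new gauge derives its value, using the names introduced by the patch. The class name and main method are hypothetical; the real gauge lives in QuorumControllerMetrics and reads the same two offsets.

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch only: mirrors the ZKWriteBehindLag computation added in this patch.
    public class ZkWriteBehindLagSketch {
        // Offset of the last committed KRaft metadata record.
        private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
        // Offset of the last metadata record also written ("dual-written") to ZooKeeper.
        private final AtomicLong dualWriteOffset = new AtomicLong(0);

        public void setLastCommittedRecordOffset(long offset) {
            lastCommittedRecordOffset.set(offset);
        }

        public void updateDualWriteOffset(long offset) {
            dualWriteOffset.set(offset);
        }

        // Value reported under kafka.controller:type=KafkaController,name=ZKWriteBehindLag.
        public long zkWriteBehindLag() {
            long dual = dualWriteOffset.get();
            // Not yet in dual-write mode: report 0 rather than the raw committed offset.
            if (dual == 0) {
                return 0L;
            }
            return lastCommittedRecordOffset.get() - dual;
        }

        public static void main(String[] args) {
            ZkWriteBehindLagSketch sketch = new ZkWriteBehindLagSketch();
            sketch.setLastCommittedRecordOffset(100);
            sketch.updateDualWriteOffset(90);
            System.out.println(sketch.zkWriteBehindLag()); // 10, matching the new unit test
        }
    }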
Branch: pull/14039/head
Author: Hailey Ni, committed by GitHub
Commit: 9e50f7cdd3
Changed files:
  1. core/src/main/scala/kafka/server/ControllerServer.scala (5)
  2. metadata/src/main/java/org/apache/kafka/controller/QuorumController.java (2)
  3. metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java (29)
  4. metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java (12)
  5. metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java (2)
  6. metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java (66)
  7. metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java (44)

core/src/main/scala/kafka/server/ControllerServer.scala (5)

@@ -214,7 +214,7 @@ class ControllerServer(
val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
- quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time)
+ quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.migrationEnabled)
new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
setTime(time).
@@ -268,7 +268,8 @@ class ControllerServer(
() => {}
),
quorumFeatures,
- configSchema
+ configSchema,
+ quorumControllerMetrics
)
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java (2)

@@ -346,7 +346,7 @@ public final class QuorumController implements Controller {
logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
}
if (controllerMetrics == null) {
- controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time);
+ controllerMetrics = new QuorumControllerMetrics(Optional.empty(), time, zkMigrationEnabled);
}
KafkaEventQueue queue = null;

metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java (29)

@@ -44,6 +44,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
+ private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
+     "KafkaController", "ZKWriteBehindLag");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -58,6 +60,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
+ private final AtomicLong dualWriteOffset = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
@@ -73,7 +76,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
public QuorumControllerMetrics(
Optional<MetricsRegistry> registry,
- Time time
+ Time time,
+ boolean zkMigrationState
) {
this.registry = registry;
this.active = false;
@@ -109,6 +113,18 @@ public class QuorumControllerMetrics implements AutoCloseable {
return time.milliseconds() - lastAppliedRecordTimestamp();
}
}));
+ if (zkMigrationState) {
+     registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
+         @Override
+         public Long value() {
+             // not in dual-write mode: set metric value to 0
+             if (dualWriteOffset() == 0) return 0L;
+             // in dual-write mode
+             else return lastCommittedRecordOffset() - dualWriteOffset();
+         }
+     }));
+ }
}
public void setActive(boolean active) {
@@ -151,6 +167,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
return lastAppliedRecordTimestamp.get();
}
+ public void updateDualWriteOffset(long offset) {
+     dualWriteOffset.set(offset);
+ }
+ public long dualWriteOffset() {
+     return dualWriteOffset.get();
+ }
public void incrementTimedOutHeartbeats() {
timedOutHeartbeats.addAndGet(1);
}
@@ -172,7 +196,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
LAST_APPLIED_RECORD_OFFSET,
LAST_COMMITTED_RECORD_OFFSET,
LAST_APPLIED_RECORD_TIMESTAMP,
- LAST_APPLIED_RECORD_LAG_MS
+ LAST_APPLIED_RECORD_LAG_MS,
+ ZK_WRITE_BEHIND_LAG
).forEach(r::removeMetric));
}

metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java (12)

@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
+ import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@@ -97,6 +98,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
private final ZkRecordConsumer zkRecordConsumer;
private final KafkaEventQueue eventQueue;
private final PollTimeSupplier pollTimeSupplier;
+ private final QuorumControllerMetrics controllerMetrics;
private final FaultHandler faultHandler;
private final QuorumFeatures quorumFeatures;
private final RecordRedactor recordRedactor;
@@ -119,6 +121,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema,
+ QuorumControllerMetrics controllerMetrics,
Time time
) {
this.nodeId = nodeId;
@@ -127,6 +130,7 @@ public class KRaftMigrationDriver implements MetadataPublisher {
this.propagator = propagator;
this.time = time;
LogContext logContext = new LogContext("[KRaftMigrationDriver id=" + nodeId + "] ");
+ this.controllerMetrics = controllerMetrics;
this.log = logContext.logger(KRaftMigrationDriver.class);
this.migrationState = MigrationDriverState.UNINITIALIZED;
this.migrationLeadershipState = ZkMigrationLeadershipState.EMPTY;
@@ -149,9 +153,10 @@ public class KRaftMigrationDriver implements MetadataPublisher {
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
- KafkaConfigSchema configSchema
+ KafkaConfigSchema configSchema,
+ QuorumControllerMetrics controllerMetrics
) {
- this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, Time.SYSTEM);
+ this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
}
@@ -497,6 +502,9 @@ public class KRaftMigrationDriver implements MetadataPublisher {
// Persist the offset of the metadata that was written to ZK
ZkMigrationLeadershipState zkStateAfterDualWrite = migrationLeadershipState.withKRaftMetadataOffsetAndEpoch(
image.highestOffsetAndEpoch().offset(), image.highestOffsetAndEpoch().epoch());
+ // update the dual write offset metric
+ controllerMetrics.updateDualWriteOffset(image.highestOffsetAndEpoch().offset());
applyMigrationOperation("Updating ZK migration state after " + metadataType,
state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
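
Once registered, the gauge can be read like any other controller MBean. The snippet below is an operator-side sketch, not part of the patch: the JMX URL (localhost:9999) is an assumption, and it uses only the standard javax.management API together with the MBean name registered above.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    // Sketch only: polls ZKWriteBehindLag over JMX. Host and port are assumptions;
    // the MBean name comes from the metric added in this patch.
    public class ZkWriteBehindLagPoller {
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                ObjectName gauge = new ObjectName(
                    "kafka.controller:type=KafkaController,name=ZKWriteBehindLag");
                // Yammer gauges expose their current value through the "Value" attribute.
                System.out.println("ZKWriteBehindLag = " + conn.getAttribute(gauge, "Value"));
            }
        }
    }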

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java (2)

@@ -144,7 +144,7 @@ public class QuorumControllerTest {
final AtomicBoolean closed = new AtomicBoolean(false);
MockControllerMetrics() {
- super(Optional.empty(), Time.SYSTEM);
+ super(Optional.empty(), Time.SYSTEM, false);
}
@Override

metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java (66)

@@ -33,11 +33,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class QuorumControllerMetricsTest {
@Test
- public void testMetricNames() {
+ public void testMetricNamesNotInMigrationState() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
new HashSet<>(Arrays.asList(
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
@@ -57,11 +57,37 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testMetricNamesInMigrationState() {
+     MetricsRegistry registry = new MetricsRegistry();
+     MockTime time = new MockTime();
+     try {
+         try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+                 new HashSet<>(Arrays.asList(
+                     "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+                     "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+                     "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+                     "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                     "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+                     "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+                     "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+                     "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+                     "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
+                 )));
+         }
+         ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
+             Collections.emptySet());
+     } finally {
+         registry.shutdown();
+     }
+ }
@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
} finally {
@@ -73,7 +99,7 @@ public class QuorumControllerMetricsTest {
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
} finally {
@@ -86,7 +112,7 @@ public class QuorumControllerMetricsTest {
MetricsRegistry registry = new MetricsRegistry();
MockTime time = new MockTime();
time.sleep(1000);
- try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) {
+ try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
metrics.setLastAppliedRecordOffset(100);
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
@@ -119,6 +145,36 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testUpdateZKWriteBehindLag() {
+     MetricsRegistry registry = new MetricsRegistry();
+     MockTime time = new MockTime();
+     // test zkWriteBehindLag metric when NOT in dual-write mode
+     try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+         metrics.updateDualWriteOffset(0);
+         @SuppressWarnings("unchecked")
+         Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+             .allMetrics()
+             .get(metricName("KafkaController", "ZKWriteBehindLag"));
+         assertEquals(0, zkWriteBehindLag.value());
+     } finally {
+         registry.shutdown();
+     }
+     // test zkWriteBehindLag metric when in dual-write mode
+     try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
+         metrics.updateDualWriteOffset(90);
+         metrics.setLastCommittedRecordOffset(100);
+         @SuppressWarnings("unchecked")
+         Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+             .allMetrics()
+             .get(metricName("KafkaController", "ZKWriteBehindLag"));
+         assertEquals(10, zkWriteBehindLag.value());
+     } finally {
+         registry.shutdown();
+     }
+ }
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram) registry.allMetrics().get(metricName);

metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java (44)

@@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
+ import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
@@ -63,9 +64,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
+ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -88,6 +91,22 @@ public class KRaftMigrationDriverTest {
apiVersions,
QuorumFeatures.defaultFeatureMap(),
controllerNodes);
+ static class MockControllerMetrics extends QuorumControllerMetrics {
+     final AtomicBoolean closed = new AtomicBoolean(false);
+     MockControllerMetrics() {
+         super(Optional.empty(), Time.SYSTEM, false);
+     }
+     @Override
+     public void close() {
+         super.close();
+         closed.set(true);
+     }
+ }
+ MockControllerMetrics metrics = new MockControllerMetrics();
Time mockTime = new MockTime(1) {
public long nanoseconds() {
// We poll the event for each 1 sec, make it happen for each 10 ms to speed up the test
@@ -216,9 +235,9 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -302,6 +321,7 @@ public class KRaftMigrationDriverTest {
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
@@ -348,9 +368,9 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -387,15 +407,16 @@ public class KRaftMigrationDriverTest {
new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
- 3000,
- new NoOpRecordConsumer(),
- migrationClient,
- metadataPropagator,
- metadataPublisher -> { },
- faultHandler,
- quorumFeatures,
- KafkaConfigSchema.EMPTY,
- mockTime
+ 3000,
+ new NoOpRecordConsumer(),
+ migrationClient,
+ metadataPropagator,
+ metadataPublisher -> { },
+ faultHandler,
+ quorumFeatures,
+ KafkaConfigSchema.EMPTY,
+ metrics,
+ mockTime
)) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
@@ -467,6 +488,7 @@ public class KRaftMigrationDriverTest {
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
+ metrics,
mockTime
)) {
verifier.verify(driver, migrationClient, topicClient, configClient);
