Browse Source

KAFKA-15532: non active controllers return 0 for ZkWriteBeforelag (#14478)

Since only the active controller is performing the dual-write to ZK during a migration, it should be the only controller
to report the ZkWriteBehindLag metric.

Currently, if the controller fails over during a migration, the previous active controller will incorrectly report its last
value for ZkWriteBehindLag forever. Instead, it should report zero.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
pull/14569/head
mannoopj 1 year ago committed by GitHub
parent
commit
da314ee48c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
  2. 16
      metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

4
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java

@ -160,8 +160,8 @@ public class QuorumControllerMetrics implements AutoCloseable { @@ -160,8 +160,8 @@ public class QuorumControllerMetrics implements AutoCloseable {
registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
@Override
public Long value() {
// not in dual-write mode: set metric value to 0
if (dualWriteOffset() == 0) return 0L;
// not in dual-write mode or not an active controller: set metric value to 0
if (dualWriteOffset() == 0 || !active()) return 0L;
// in dual write mode
else return lastCommittedRecordOffset() - dualWriteOffset();
}

16
metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java

@ -106,6 +106,7 @@ public class QuorumControllerMetricsTest { @@ -106,6 +106,7 @@ public class QuorumControllerMetricsTest {
metrics.setLastAppliedRecordTimestamp(500);
metrics.setLastCommittedRecordOffset(50);
metrics.updateDualWriteOffset(40L);
metrics.setActive(true);
for (int i = 0; i < 2; i++) {
metrics.incrementTimedOutHeartbeats();
}
@ -197,6 +198,7 @@ public class QuorumControllerMetricsTest { @@ -197,6 +198,7 @@ public class QuorumControllerMetricsTest {
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
metrics.updateDualWriteOffset(90);
metrics.setLastCommittedRecordOffset(100);
metrics.setActive(true);
@SuppressWarnings("unchecked")
Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
.allMetrics()
@ -205,6 +207,20 @@ public class QuorumControllerMetricsTest { @@ -205,6 +207,20 @@ public class QuorumControllerMetricsTest {
} finally {
registry.shutdown();
}
// test zkWriteBehindLag metric when in dual-write mode and not active
try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
metrics.updateDualWriteOffset(90);
metrics.setLastCommittedRecordOffset(100);
metrics.setActive(false);
@SuppressWarnings("unchecked")
Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
.allMetrics()
.get(metricName("KafkaController", "ZkWriteBehindLag"));
assertEquals(0, zkWriteBehindLag.value());
} finally {
registry.shutdown();
}
}
private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {

Loading…
Cancel
Save