From da314ee48c31f85e99301c37f26710f67383e8de Mon Sep 17 00:00:00 2001 From: mannoopj <139923522+mannoopj@users.noreply.github.com> Date: Mon, 16 Oct 2023 18:22:50 -0400 Subject: [PATCH] KAFKA-15532: non active controllers return 0 for ZkWriteBeforelag (#14478) Since only the active controller is performing the dual-write to ZK during a migration, it should be the only controller to report the ZkWriteBehindLag metric. Currently, if the controller fails over during a migration, the previous active controller will incorrectly report its last value for ZkWriteBehindLag forever. Instead, it should report zero. Reviewers: Colin P. McCabe , David Arthur --- .../metrics/QuorumControllerMetrics.java | 4 ++-- .../metrics/QuorumControllerMetricsTest.java | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 225d6d0fb8a..9ef3f79b572 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -160,8 +160,8 @@ public class QuorumControllerMetrics implements AutoCloseable { registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge() { @Override public Long value() { - // not in dual-write mode: set metric value to 0 - if (dualWriteOffset() == 0) return 0L; + // not in dual-write mode or not an active controller: set metric value to 0 + if (dualWriteOffset() == 0 || !active()) return 0L; // in dual write mode else return lastCommittedRecordOffset() - dualWriteOffset(); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index 936009e47d9..95b58e79233 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -106,6 +106,7 @@ public class QuorumControllerMetricsTest { metrics.setLastAppliedRecordTimestamp(500); metrics.setLastCommittedRecordOffset(50); metrics.updateDualWriteOffset(40L); + metrics.setActive(true); for (int i = 0; i < 2; i++) { metrics.incrementTimedOutHeartbeats(); } @@ -197,6 +198,7 @@ public class QuorumControllerMetricsTest { try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) { metrics.updateDualWriteOffset(90); metrics.setLastCommittedRecordOffset(100); + metrics.setActive(true); @SuppressWarnings("unchecked") Gauge zkWriteBehindLag = (Gauge) registry .allMetrics() @@ -205,6 +207,20 @@ public class QuorumControllerMetricsTest { } finally { registry.shutdown(); } + + // test zkWriteBehindLag metric when in dual-write mode and not active + try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) { + metrics.updateDualWriteOffset(90); + metrics.setLastCommittedRecordOffset(100); + metrics.setActive(false); + @SuppressWarnings("unchecked") + Gauge zkWriteBehindLag = (Gauge) registry + .allMetrics() + .get(metricName("KafkaController", "ZkWriteBehindLag")); + assertEquals(0, zkWriteBehindLag.value()); + } finally { + registry.shutdown(); + } } private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {