Browse Source

KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)

Reviewers: Colin P. McCabe <cmccabe@apache.org>
pull/11429/head
Ryan Dielhenn 3 years ago committed by GitHub
parent
commit
3f433c0b4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  2. 8
      metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
  3. 2
      metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  4. 43
      metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
  5. 8
      metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
  6. 24
      metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
  7. 2
      metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
  8. 39
      metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

39
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

@ -116,6 +116,11 @@ public class ClusterControlManager { @@ -116,6 +116,11 @@ public class ClusterControlManager {
*/
private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
/**
* A reference to the controller's metrics registry.
*/
private final ControllerMetrics controllerMetrics;
/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
@ -131,7 +136,8 @@ public class ClusterControlManager { @@ -131,7 +136,8 @@ public class ClusterControlManager {
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer) {
ReplicaPlacer replicaPlacer,
ControllerMetrics metrics) {
this.logContext = logContext;
this.log = logContext.logger(ClusterControlManager.class);
this.time = time;
@ -140,6 +146,7 @@ public class ClusterControlManager { @@ -140,6 +146,7 @@ public class ClusterControlManager {
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
}
/**
@ -249,11 +256,13 @@ public class ClusterControlManager { @@ -249,11 +256,13 @@ public class ClusterControlManager {
features.put(feature.name(), new VersionRange(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced()));
updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (prevRegistration == null) {
log.info("Registered new broker: {}", record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
@ -274,6 +283,7 @@ public class ClusterControlManager { @@ -274,6 +283,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.remove(brokerId);
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
@ -289,6 +299,7 @@ public class ClusterControlManager { @@ -289,6 +299,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Fenced broker: {}", record);
}
}
@ -304,6 +315,7 @@ public class ClusterControlManager { @@ -304,6 +315,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unfenced broker: {}", record);
}
if (readyBrokersFuture.isPresent()) {
@ -314,6 +326,31 @@ public class ClusterControlManager { @@ -314,6 +326,31 @@ public class ClusterControlManager {
}
}
/**
 * Keeps the fenced / active broker count metrics in step with a single change
 * to the broker registration map.
 *
 * @param prevRegistration the registration that was in place before the change,
 *                         or null if the broker was not registered before
 * @param registration     the registration in place after the change, or null
 *                         if the broker's registration was removed
 */
private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
    boolean removed = registration == null;
    if (removed || prevRegistration == null) {
        // Exactly one side exists: a broker was either dropped or newly added,
        // so only the counter matching its fencing state moves.
        int delta = removed ? -1 : 1;
        boolean fenced = removed ? prevRegistration.fenced() : registration.fenced();
        if (fenced) {
            controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + delta);
        } else {
            controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + delta);
        }
        return;
    }

    boolean wasFenced = prevRegistration.fenced();
    boolean nowFenced = registration.fenced();
    if (wasFenced == nowFenced) {
        // Fencing state did not change, so neither counter moves.
        return;
    }

    // The broker moved between the fenced and active sets: one counter gains
    // what the other loses. The fenced counter is updated first.
    int fencedDelta = nowFenced ? 1 : -1;
    controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + fencedDelta);
    controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - fencedDelta);
}
public List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas) {

8
metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java

@ -27,6 +27,14 @@ public interface ControllerMetrics extends AutoCloseable { @@ -27,6 +27,14 @@ public interface ControllerMetrics extends AutoCloseable {
void updateEventQueueProcessingTime(long durationMs);
/** Sets the fenced broker count tracked by this metrics object. */
void setFencedBrokerCount(int brokerCount);
/** Returns the fenced broker count tracked by this metrics object. */
int fencedBrokerCount();
/** Sets the active broker count tracked by this metrics object. */
void setActiveBrokerCount(int brokerCount);
/** Returns the active broker count tracked by this metrics object. */
int activeBrokerCount();
void setGlobalTopicsCount(int topicCount);
int globalTopicsCount();

2
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

@ -1125,7 +1125,7 @@ public final class QuorumController implements Controller { @@ -1125,7 +1125,7 @@ public final class QuorumController implements Controller {
snapshotRegistry, configDefs, alterConfigPolicy);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer);
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;

43
metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java

@ -32,6 +32,10 @@ public final class QuorumControllerMetrics implements ControllerMetrics { @@ -32,6 +32,10 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
private final static MetricName FENCED_BROKER_COUNT = getMetricName(
"KafkaController", "FencedBrokerCount");
private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
"KafkaController", "ActiveBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@ -40,14 +44,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics { @@ -40,14 +44,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
private final MetricsRegistry registry;
private volatile boolean active;
private volatile int fencedBrokerCount;
private volatile int activeBrokerCount;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> fencedBrokerCountGauge;
private final Gauge<Integer> activeBrokerCountGauge;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
private final Gauge<Integer> offlinePartitionCountGauge;
@ -58,6 +66,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics { @@ -58,6 +66,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
public QuorumControllerMetrics(MetricsRegistry registry) {
this.registry = Objects.requireNonNull(registry);
this.active = false;
this.fencedBrokerCount = 0;
this.activeBrokerCount = 0;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.offlinePartitionCount = 0;
@ -70,6 +80,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics { @@ -70,6 +80,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.fencedBrokerCountGauge = registry.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return fencedBrokerCount;
}
});
this.activeBrokerCountGauge = registry.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
return activeBrokerCount;
}
});
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
@ -116,6 +138,25 @@ public final class QuorumControllerMetrics implements ControllerMetrics { @@ -116,6 +138,25 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
eventQueueTime.update(durationMs);
}
/**
 * Stores the given value in the volatile fencedBrokerCount field, which is the
 * value returned by the FencedBrokerCount gauge.
 *
 * @param brokerCount the new fenced broker count
 */
@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokerCount = brokerCount;
}
/**
 * Returns the current value of the fencedBrokerCount field.
 *
 * @return the fenced broker count last stored via setFencedBrokerCount, or the
 *         initial value if it was never set
 */
@Override
public int fencedBrokerCount() {
return this.fencedBrokerCount;
}
/**
 * Stores the given value in the volatile activeBrokerCount field, which is the
 * value returned by the ActiveBrokerCount gauge.
 *
 * @param brokerCount the new active (unfenced) broker count
 */
@Override
public void setActiveBrokerCount(int brokerCount) {
    this.activeBrokerCount = brokerCount;
}
/**
 * Returns the current value of the activeBrokerCount field.
 *
 * @return the active broker count last stored via setActiveBrokerCount, or the
 *         initial value if it was never set
 */
@Override
public int activeBrokerCount() {
return this.activeBrokerCount;
}
@Override
public void setGlobalTopicsCount(int topicCount) {
this.globalTopicCount = topicCount;

8
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java

@ -58,7 +58,7 @@ public class ClusterControlManagerTest { @@ -58,7 +58,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
@ -99,7 +99,7 @@ public class ClusterControlManagerTest { @@ -99,7 +99,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
clusterControl.replay(brokerRecord);
assertEquals(new BrokerRegistration(1, 100,
@ -122,7 +122,7 @@ public class ClusterControlManagerTest { @@ -122,7 +122,7 @@ public class ClusterControlManagerTest {
MockRandom random = new MockRandom();
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random));
new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
RegisterBrokerRecord brokerRecord =
@ -159,7 +159,7 @@ public class ClusterControlManagerTest { @@ -159,7 +159,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()));
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
for (int i = 0; i < 3; i++) {

24
metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java

@ -19,6 +19,8 @@ package org.apache.kafka.controller; @@ -19,6 +19,8 @@ package org.apache.kafka.controller;
public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
private volatile int fencedBrokers;
private volatile int activeBrokers;
private volatile int topics;
private volatile int partitions;
private volatile int offlinePartitions;
@ -27,6 +29,8 @@ public final class MockControllerMetrics implements ControllerMetrics { @@ -27,6 +29,8 @@ public final class MockControllerMetrics implements ControllerMetrics {
public MockControllerMetrics() {
this.active = false;
this.fencedBrokers = 0;
this.activeBrokers = 0;
this.topics = 0;
this.partitions = 0;
this.offlinePartitions = 0;
@ -53,6 +57,26 @@ public final class MockControllerMetrics implements ControllerMetrics { @@ -53,6 +57,26 @@ public final class MockControllerMetrics implements ControllerMetrics {
// nothing to do
}
/**
 * Records the fenced broker count in this mock's in-memory field so that it
 * can be read back through fencedBrokerCount().
 *
 * @param brokerCount the new fenced broker count
 */
@Override
public void setFencedBrokerCount(int brokerCount) {
this.fencedBrokers = brokerCount;
}
/**
 * Returns the fenced broker count currently held by this mock.
 *
 * @return the value of the fencedBrokers field
 */
@Override
public int fencedBrokerCount() {
return this.fencedBrokers;
}
/**
 * Records the active broker count in this mock's in-memory field so that it
 * can be read back through activeBrokerCount().
 *
 * @param brokerCount the new active broker count
 */
@Override
public void setActiveBrokerCount(int brokerCount) {
this.activeBrokers = brokerCount;
}
/**
 * Returns the active broker count currently held by this mock.
 *
 * @return the value of the activeBrokers field
 */
@Override
public int activeBrokerCount() {
return activeBrokers;
}
@Override
public void setGlobalTopicsCount(int topicCount) {
this.topics = topicCount;

2
metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java

@ -54,7 +54,7 @@ public class ProducerIdControlManagerTest { @@ -54,7 +54,7 @@ public class ProducerIdControlManagerTest {
snapshotRegistry = new SnapshotRegistry(logContext);
clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random));
new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < 4; i++) {

39
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

@ -132,10 +132,10 @@ public class ReplicationControlManagerTest { @@ -132,10 +132,10 @@ public class ReplicationControlManagerTest {
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
final ControllerMetrics metrics = new MockControllerMetrics();
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random));
final ControllerMetrics metrics = new MockControllerMetrics();
new StripedReplicaPlacer(random), metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty());
final ReplicationControlManager replicationControl;
@ -429,6 +429,41 @@ public class ReplicationControlManagerTest { @@ -429,6 +429,41 @@ public class ReplicationControlManagerTest {
ctx.replicationControl.iterator(Long.MAX_VALUE));
}
/**
 * Walks three brokers through register, unfence and unregister, checking the
 * fenced and active broker count metrics along the way.
 */
@Test
public void testBrokerCountMetrics() throws Exception {
    ReplicationControlTestContext ctx = new ReplicationControlTestContext();

    // After registration alone, broker 0 is counted as fenced.
    ctx.registerBrokers(0);
    assertEquals(1, ctx.metrics.fencedBrokerCount());
    assertEquals(0, ctx.metrics.activeBrokerCount());

    // Unfencing moves broker 0 from the fenced count to the active count.
    ctx.unfenceBrokers(0);
    assertEquals(0, ctx.metrics.fencedBrokerCount());
    assertEquals(1, ctx.metrics.activeBrokerCount());

    // Bring up a second broker.
    ctx.registerBrokers(1);
    ctx.unfenceBrokers(1);
    assertEquals(2, ctx.metrics.activeBrokerCount());

    // Bring up a third broker; nothing should be left fenced.
    ctx.registerBrokers(2);
    ctx.unfenceBrokers(2);
    assertEquals(0, ctx.metrics.fencedBrokerCount());
    assertEquals(3, ctx.metrics.activeBrokerCount());

    // Unregister brokers 0 and 2, replaying the resulting records each time,
    // which leaves only broker 1 active.
    for (int brokerId : new int[] {0, 2}) {
        ctx.replay(ctx.replicationControl.unregisterBroker(brokerId).records());
    }
    assertEquals(0, ctx.metrics.fencedBrokerCount());
    assertEquals(1, ctx.metrics.activeBrokerCount());
}
@Test
public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();

Loading…
Cancel
Save