Browse Source

KAFKA-15189: only init remote topic metrics when enabled (#14133)

Only initialize remote topic metrics when system-wide remote storage is enabled, to avoid impacting performance for existing brokers. Also add tests.

Reviewers: Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
pull/10246/merge
Luke Chen 1 year ago committed by GitHub
parent
commit
748175ce62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      build.gradle
  2. 4
      checkstyle/import-control.xml
  3. 10
      core/src/main/java/kafka/log/remote/RemoteLogManager.java
  4. 3
      core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
  5. 3
      core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
  6. 3
      core/src/main/scala/kafka/server/BrokerServer.scala
  7. 61
      core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  8. 2
      core/src/main/scala/kafka/server/KafkaServer.scala
  9. 21
      core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
  10. 9
      core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
  11. 30
      core/src/test/scala/integration/kafka/api/MetricsTest.scala
  12. 23
      core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
  13. 3
      core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
  14. 4
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  15. 3
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
  16. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
  17. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
  18. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
  19. 2
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
  20. 3
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
  21. 3
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
  22. 3
      jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
  23. 97
      storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java
  24. 22
      storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java

2
build.gradle

@@ -1638,6 +1638,8 @@ project(':storage:api') {
dependencies {
implementation project(':clients')
implementation project(':server-common')
implementation libs.metrics
implementation libs.slf4jApi
testImplementation project(':clients')

4
checkstyle/import-control.xml

@@ -261,6 +261,10 @@
<allow pkg="org.apache.kafka.storage"/>
<subpackage name="remote">
<allow pkg="scala.collection" />
<subpackage name="storage">
<allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

10
core/src/main/java/kafka/log/remote/RemoteLogManager.java

@@ -111,6 +111,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
/**
* This class is responsible for
@@ -123,8 +124,6 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
@@ -185,7 +184,7 @@ public class RemoteLogManager implements Closeable {
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new Gauge<Double>() {
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge<Double>() {
@Override
public Double value() {
return rlmScheduledThreadPool.getIdlePercent();
@@ -195,13 +194,12 @@ public class RemoteLogManager implements Closeable {
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
rlmConfig.remoteLogReaderMaxPendingTasks(),
REMOTE_LOG_READER_METRICS_NAME_PREFIX
rlmConfig.remoteLogReaderMaxPendingTasks()
);
}
private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
remoteStorageReaderThreadPool.removeMetrics();
}

3
core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

@@ -38,6 +38,7 @@ import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
import java.util.Optional;
import scala.compat.java8.OptionConverters;
@@ -171,7 +172,7 @@ public class KafkaApisBuilder {
if (metrics == null) throw new RuntimeException("You must set metrics");
if (quotas == null) throw new RuntimeException("You must set quotas");
if (fetchManager == null) throw new RuntimeException("You must set fetchManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats();
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");
return new KafkaApis(requestChannel,

3
core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java

@ -54,7 +54,7 @@ public class ReplicaManagerBuilder { @@ -54,7 +54,7 @@ public class ReplicaManagerBuilder {
private MetadataCache metadataCache = null;
private LogDirFailureChannel logDirFailureChannel = null;
private AlterPartitionManager alterPartitionManager = null;
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private BrokerTopicStats brokerTopicStats = null;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
private Optional<KafkaZkClient> zkClient = Optional.empty();
@ -179,6 +179,7 @@ public class ReplicaManagerBuilder { @@ -179,6 +179,7 @@ public class ReplicaManagerBuilder {
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(Optional.of(config));
return new ReplicaManager(config,
metrics,
time,

3
core/src/main/scala/kafka/server/BrokerServer.scala

@ -188,8 +188,7 @@ class BrokerServer( @@ -188,8 +188,7 @@ class BrokerServer(
kafkaScheduler.startup()
/* register broker metrics */
brokerTopicStats = new BrokerTopicStats
brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-")

61
core/src/main/scala/kafka/server/KafkaRequestHandler.scala

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.Collections
@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, @@ -227,7 +228,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}
class BrokerTopicMetrics(name: Option[String]) {
class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[KafkaConfig]) {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val tags: java.util.Map[String, String] = name match {
@ -277,17 +278,12 @@ class BrokerTopicMetrics(name: Option[String]) { @@ -277,17 +278,12 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
BrokerTopicStats.RemoteCopyBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchBytesPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchBytesPerSec, "bytes"),
BrokerTopicStats.RemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.RemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteFetchRequestsPerSec, "requests"),
BrokerTopicStats.FailedRemoteCopyRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteCopyRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec, "requests")
).asJava)
if (name.isEmpty) {
metricTypeMap.put(BrokerTopicStats.ReplicationBytesInPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesInPerSec, "bytes"))
metricTypeMap.put(BrokerTopicStats.ReplicationBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes"))
@ -295,6 +291,18 @@ class BrokerTopicMetrics(name: Option[String]) { @@ -295,6 +291,18 @@ class BrokerTopicMetrics(name: Option[String]) {
metricTypeMap.put(BrokerTopicStats.ReassignmentBytesOutPerSec, MeterWrapper(BrokerTopicStats.ReassignmentBytesOutPerSec, "bytes"))
}
configOpt.ifPresent(config =>
if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
metricTypeMap.putAll(Map(
RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName, "bytes"),
RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName, "requests"),
RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName -> MeterWrapper(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName, "requests")
).asJava)
})
// used for testing only
def metricMap: Map[String, MeterWrapper] = metricTypeMap.toMap
@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) { @@ -342,17 +350,17 @@ class BrokerTopicMetrics(name: Option[String]) {
def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
def remoteCopyBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyBytesPerSec).meter()
def remoteCopyBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchBytesPerSec).meter()
def remoteFetchBytesRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName).meter()
def remoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteFetchRequestsPerSec).meter()
def remoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName).meter()
def remoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteCopyRequestsPerSec).meter()
def remoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName).meter()
def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteFetchRequestsPerSec).meter()
def failedRemoteFetchRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName).meter()
def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteCopyRequestsPerSec).meter()
def failedRemoteCopyRequestRate: Meter = metricTypeMap.get(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName).meter()
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
@ -378,27 +386,18 @@ object BrokerTopicStats { @@ -378,27 +386,18 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
val RemoteCopyBytesPerSec = "RemoteCopyBytesPerSec"
val RemoteFetchBytesPerSec = "RemoteFetchBytesPerSec"
val RemoteFetchRequestsPerSec = "RemoteFetchRequestsPerSec"
val RemoteCopyRequestsPerSec = "RemoteCopyRequestsPerSec"
val FailedRemoteFetchRequestsPerSec = "RemoteFetchErrorsPerSec"
val FailedRemoteCopyRequestsPerSec = "RemoteCopyErrorsPerSec"
// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
val InvalidMagicNumberRecordsPerSec = "InvalidMagicNumberRecordsPerSec"
val InvalidMessageCrcRecordsPerSec = "InvalidMessageCrcRecordsPerSec"
val InvalidOffsetOrSequenceRecordsPerSec = "InvalidOffsetOrSequenceRecordsPerSec"
private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
}
class BrokerTopicStats extends Logging {
import BrokerTopicStats._
class BrokerTopicStats(configOpt: java.util.Optional[KafkaConfig] = java.util.Optional.empty()) extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k), configOpt)
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
val allTopicsStats = new BrokerTopicMetrics(None)
val allTopicsStats = new BrokerTopicMetrics(None, configOpt)
def topicStats(topic: String): BrokerTopicMetrics =
stats.getAndMaybePut(topic)
@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging { @@ -439,12 +438,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchBytesPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.RemoteCopyRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteFetchRequestsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteCopyRequestsPerSec)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_BYTES_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_FETCH_REQUESTS_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
}
}

2
core/src/main/scala/kafka/server/KafkaServer.scala

@ -261,7 +261,7 @@ class KafkaServer( @@ -261,7 +261,7 @@ class KafkaServer(
metrics = Server.initializeMetrics(config, time, clusterId)
/* register broker metrics */
_brokerTopicStats = new BrokerTopicStats
_brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(config))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ metrics.reporters.asScala)

21
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java

@ -59,7 +59,6 @@ import org.apache.kafka.storage.internals.log.EpochEntry; @@ -59,7 +59,6 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
@ -88,14 +87,14 @@ import java.util.Map; @@ -88,14 +87,14 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -127,7 +126,8 @@ public class RemoteLogManagerTest { @@ -127,7 +126,8 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
RemoteLogManagerConfig remoteLogManagerConfig = null;
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
BrokerTopicStats brokerTopicStats = null;
RemoteLogManager remoteLogManager = null;
TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@ -157,8 +157,10 @@ public class RemoteLogManagerTest { @@ -157,8 +157,10 @@ public class RemoteLogManagerTest {
void setUp() throws Exception {
topicIds.put(leaderTopicIdPartition.topicPartition().topic(), leaderTopicIdPartition.topicId());
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = new Properties();
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
remoteLogManagerConfig = createRLMConfig(props);
brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
kafka.utils.TestUtils.clearYammerMetrics();
@ -922,11 +924,8 @@ public class RemoteLogManagerTest { @@ -922,11 +924,8 @@ public class RemoteLogManagerTest {
KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);
List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
List<String> remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool.METRIC_SUFFIXES
.stream()
.map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
.collect(Collectors.toList());
List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
Set<String> remoteStorageThreadPoolMetricNames = REMOTE_STORAGE_THREAD_POOL_METRICS;
verify(mockRlmMetricsGroup, times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
// Verify that the RemoteLogManager metrics are removed

9
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java

@ -17,9 +17,11 @@ @@ -17,9 +17,11 @@
package kafka.log.remote;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@ -30,6 +32,8 @@ import org.junit.jupiter.api.Test; @@ -30,6 +32,8 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -44,13 +48,16 @@ import static org.mockito.Mockito.when; @@ -44,13 +48,16 @@ import static org.mockito.Mockito.when;
public class RemoteLogReaderTest {
public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
BrokerTopicStats brokerTopicStats = null;
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);
@BeforeEach
public void setUp() {
TestUtils.clearYammerMetrics();
Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(props)));
}
@Test

30
core/src/test/scala/integration/kafka/api/MetricsTest.scala

@ -24,9 +24,12 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart @@ -24,9 +24,12 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
if (testInfo.getDisplayName.contains("testMetrics") && testInfo.getDisplayName.endsWith("true")) {
// systemRemoteStorageEnabled is enabled
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
}
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp(testInfo)
@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
* Verifies some of the metrics of producer, consumer as well as server.
*/
@nowarn("cat=deprecation")
@Test
def testMetrics(): Unit = {
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}")
@ValueSource(booleans = Array(true, false))
def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "topicWithOldMessageFormat"
val props = new Properties
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
generateAuthenticationFailure(tp)
verifyBrokerAuthenticationMetrics(server)
verifyRemoteStorageMetrics(systemRemoteStorageEnabled)
}
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,
@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { @@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
}
assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}")
}
private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit = {
val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name =>
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => {
metric._1.getMBeanName().equals(name.getMBeanName)
}).isDefined
).toList
if (shouldContainMetrics) {
assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size, s"Only $metrics appear in the metrics")
} else {
assertEquals(0, metrics.size, s"$metrics should not appear in the metrics")
}
}
}

23
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala

@ -25,8 +25,11 @@ import org.apache.kafka.common.protocol.ApiKeys @@ -25,8 +25,11 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.MockTime
import org.junit.jupiter.api.Assertions.assertEquals
import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteStorageMetrics}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
@ -77,4 +80,22 @@ class KafkaRequestHandlerTest { @@ -77,4 +80,22 @@ class KafkaRequestHandlerTest {
assertEquals(Some(startTime + 2000000), request.callbackRequestDequeueTimeNanos)
assertEquals(Some(startTime + 3000000), request.callbackRequestCompleteTimeNanos)
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testTopicStats(systemRemoteStorageEnabled: Boolean): Unit = {
val topic = "topic"
val props = kafka.utils.TestUtils.createDummyBrokerConfig()
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString)
val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
brokerTopicStats.topicStats(topic)
RemoteStorageMetrics.brokerTopicStatsMetrics.forEach(metric => {
if (systemRemoteStorageEnabled) {
assertTrue(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
} else {
assertFalse(brokerTopicStats.topicStats(topic).metricMap.contains(metric.getName))
}
})
}
}

3
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

@ -3583,6 +3583,7 @@ class ReplicaManagerTest { @@ -3583,6 +3583,7 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val props = new Properties()
props.put("zookeeper.connect", "test")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
@ -3591,7 +3592,7 @@ class ReplicaManagerTest { @@ -3591,7 +3592,7 @@ class ReplicaManagerTest {
val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
val mockLog = mock(classOf[UnifiedLog])
val brokerTopicStats = new BrokerTopicStats
val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props)))
val remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
0,

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -280,6 +280,10 @@ object TestUtils extends Logging { @@ -280,6 +280,10 @@ object TestUtils extends Logging {
Await.result(future, FiniteDuration(5, TimeUnit.MINUTES))
}
def createDummyBrokerConfig(): Properties = {
createBrokerConfig(0, "")
}
/**
* Create a test config for the provided parameters.
*

3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

@ -95,6 +95,7 @@ import java.util.Collections; @@ -95,6 +95,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -133,7 +134,7 @@ public class ReplicaFetcherThreadBenchmark { @@ -133,7 +134,7 @@ public class ReplicaFetcherThreadBenchmark {
KafkaConfig config = new KafkaConfig(props);
LogConfig logConfig = createLogConfig();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
List<File> logDirs = Collections.singletonList(logDir);
logManager = new LogManagerBuilder().

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java

@ -117,7 +117,7 @@ public class KRaftMetadataRequestBenchmark { @@ -117,7 +117,7 @@ public class KRaftMetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java

@ -122,7 +122,7 @@ public class MetadataRequestBenchmark { @@ -122,7 +122,7 @@ public class MetadataRequestBenchmark {
clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
replicaQuotaManager, replicaQuotaManager, Option.empty());
private FetchManager fetchManager = Mockito.mock(FetchManager.class);
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private KafkaApis kafkaApis;
private RequestChannel.Request allTopicMetadataRequest;

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java

@ -94,7 +94,7 @@ public class PartitionMakeFollowerBenchmark { @@ -94,7 +94,7 @@ public class PartitionMakeFollowerBenchmark {
scheduler.startup();
LogConfig logConfig = new LogConfig(new Properties());
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
logManager = new LogManagerBuilder().
setLogDirs(Collections.singletonList(logDir)).

2
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java

@ -74,7 +74,7 @@ public class UpdateFollowerFetchStateBenchmark { @@ -74,7 +74,7 @@ public class UpdateFollowerFetchStateBenchmark {
private Option<Uuid> topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
private File logDir = new File(System.getProperty("java.io.tmpdir"), topicPartition.toString());
private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
private LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
private long nextOffset = 0;
private LogManager logManager;

3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java

@ -36,6 +36,7 @@ import org.openjdk.jmh.annotations.TearDown; @@ -36,6 +36,7 @@ import org.openjdk.jmh.annotations.TearDown;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.stream.IntStream;
@ -79,7 +80,7 @@ public abstract class BaseRecordBatchBenchmark { @@ -79,7 +80,7 @@ public abstract class BaseRecordBatchBenchmark {
ByteBuffer[] batchBuffers;
RequestLocal requestLocal;
LogValidator.MetricsRecorder validatorMetricsRecorder = UnifiedLog.newValidatorMetricsRecorder(
new BrokerTopicStats().allTopicsStats());
new BrokerTopicStats(Optional.empty()).allTopicsStats());
@Setup
public void init() {

3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java

@ -56,6 +56,7 @@ import org.openjdk.jmh.annotations.Warmup; @@ -56,6 +56,7 @@ import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -110,7 +111,7 @@ public class CheckpointBench { @@ -110,7 +111,7 @@ public class CheckpointBench {
new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
final MetadataCache metadataCache =
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
this.brokerProperties.interBrokerProtocolVersion(),

3
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java

@ -63,6 +63,7 @@ import java.io.File; @@ -63,6 +63,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -113,7 +114,7 @@ public class PartitionCreationBench { @@ -113,7 +114,7 @@ public class PartitionCreationBench {
this.metrics = new Metrics();
this.time = Time.SYSTEM;
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
final List<File> files =
JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
CleanerConfig cleanerConfig = new CleanerConfig(1,

97
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
import com.yammer.metrics.core.MetricName;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * This class contains the metrics related to tiered storage feature, which is to have a centralized
 * place to store them, so that we can verify all of them easily.
 *
 * @see kafka.api.MetricsTest
 */
public class RemoteStorageMetrics {
    // Name fragments combined below into the full metric names.
    private static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
    private static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
    private static final String TASK_QUEUE_SIZE = "TaskQueueSize";
    private static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
    private static final String REMOTE_COPY_BYTES_PER_SEC = "RemoteCopyBytesPerSec";
    private static final String REMOTE_FETCH_BYTES_PER_SEC = "RemoteFetchBytesPerSec";
    private static final String REMOTE_FETCH_REQUESTS_PER_SEC = "RemoteFetchRequestsPerSec";
    private static final String REMOTE_COPY_REQUESTS_PER_SEC = "RemoteCopyRequestsPerSec";
    private static final String FAILED_REMOTE_FETCH_PER_SEC = "RemoteFetchErrorsPerSec";
    private static final String FAILED_REMOTE_COPY_PER_SEC = "RemoteCopyErrorsPerSec";
    private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
    private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;

    /** Plain metric names of the gauges registered by the remote-storage thread pool. */
    public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT)));

    // Per-topic broker metrics (group kafka.server, type BrokerTopicMetrics).
    // Modifier order normalized to the canonical "public static final" (was "public final static").
    public static final MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
    public static final MetricName REMOTE_FETCH_BYTES_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_BYTES_PER_SEC);
    public static final MetricName REMOTE_FETCH_REQUESTS_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", REMOTE_FETCH_REQUESTS_PER_SEC);
    public static final MetricName REMOTE_COPY_REQUESTS_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", REMOTE_COPY_REQUESTS_PER_SEC);
    public static final MetricName FAILED_REMOTE_FETCH_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_FETCH_PER_SEC);
    public static final MetricName FAILED_REMOTE_COPY_PER_SEC_METRIC = getMetricName(
        "kafka.server", "BrokerTopicMetrics", FAILED_REMOTE_COPY_PER_SEC);

    // RemoteLogManager and RemoteStorageThreadPool gauges.
    public static final MetricName REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC = getMetricName(
        "kafka.log.remote", "RemoteLogManager", REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
    public static final MetricName REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC = getMetricName(
        "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_TASK_QUEUE_SIZE);
    public static final MetricName REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC = getMetricName(
        "org.apache.kafka.storage.internals.log", "RemoteStorageThreadPool", REMOTE_LOG_READER_AVG_IDLE_PERCENT);

    /**
     * Returns every tiered-storage metric: the broker-topic metrics plus the
     * RemoteLogManager and RemoteStorageThreadPool gauges.
     * A fresh mutable set is returned on each call.
     */
    public static Set<MetricName> allMetrics() {
        // Build on brokerTopicStatsMetrics() so a metric added there is never missed here.
        Set<MetricName> metrics = brokerTopicStatsMetrics();
        metrics.add(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
        metrics.add(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC);
        metrics.add(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC);
        return metrics;
    }

    /**
     * Returns only the remote-storage metrics exposed through BrokerTopicStats.
     * A fresh mutable set is returned on each call.
     */
    public static Set<MetricName> brokerTopicStatsMetrics() {
        Set<MetricName> metrics = new HashSet<>();
        metrics.add(REMOTE_COPY_BYTES_PER_SEC_METRIC);
        metrics.add(REMOTE_FETCH_BYTES_PER_SEC_METRIC);
        metrics.add(REMOTE_FETCH_REQUESTS_PER_SEC_METRIC);
        metrics.add(REMOTE_COPY_REQUESTS_PER_SEC_METRIC);
        metrics.add(FAILED_REMOTE_FETCH_PER_SEC_METRIC);
        metrics.add(FAILED_REMOTE_COPY_PER_SEC_METRIC);
        return metrics;
    }

    // Thin indirection over the Yammer metrics registry helper.
    private static MetricName getMetricName(String group, String type, String name) {
        return KafkaYammerMetrics.getMetricName(group, type, name);
    }
}

22
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java

@@ -23,28 +23,23 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
public class RemoteStorageThreadPool extends ThreadPoolExecutor {
public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
public static final Set<String> METRIC_SUFFIXES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE, AVG_IDLE_PERCENT)));
private final Logger logger;
private final String metricsNamePrefix;
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
public RemoteStorageThreadPool(String threadNamePrefix,
int numThreads,
int maxPendingTasks,
String metricsNamePrefix) {
int maxPendingTasks) {
super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
new RemoteStorageThreadFactory(threadNamePrefix));
logger = new LogContext() {
@ -54,14 +49,13 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor { @@ -54,14 +49,13 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
}.logger(RemoteStorageThreadPool.class);
this.metricsNamePrefix = metricsNamePrefix;
metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new Gauge<Integer>() {
metricsGroup.newGauge(REMOTE_LOG_READER_TASK_QUEUE_SIZE_METRIC.getName(), new Gauge<Integer>() {
@Override
public Integer value() {
return RemoteStorageThreadPool.this.getQueue().size();
}
});
metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new Gauge<Double>() {
metricsGroup.newGauge(REMOTE_LOG_READER_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge<Double>() {
@Override
public Double value() {
return 1 - (double) RemoteStorageThreadPool.this.getActiveCount() / (double) RemoteStorageThreadPool.this.getCorePoolSize();
@ -98,6 +92,6 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor { @@ -98,6 +92,6 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
public void removeMetrics() {
METRIC_SUFFIXES.forEach(metric -> metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
REMOTE_STORAGE_THREAD_POOL_METRICS.forEach(metricsGroup::removeMetric);
}
}

Loading…
Cancel
Save