From a339a387e0669226bc1537783b3d4c6ea5b9b4c0 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Mon, 18 Sep 2017 10:13:28 +0100 Subject: [PATCH] MINOR: Add metric templates for sender/fetcher rate totals Author: Rajini Sivaram Reviewers: James Cheng , Ismael Juma Closes #3882 from rajinisivaram/MINOR-KAFKA-5738-metricstemplates --- .../internals/SenderMetricsRegistry.java | 10 +++++- .../kafka/common/MetricNameTemplate.java | 23 ++++++++++++- .../consumer/internals/FetcherTest.java | 34 +++++++++++++++++++ .../producer/internals/SenderTest.java | 30 ++++++++++++++++ .../java/org/apache/kafka/test/TestUtils.java | 12 ++++++- 5 files changed, 106 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java index 9e014a726a6..21466e1fedf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java @@ -117,24 +117,32 @@ public class SenderMetricsRegistry { this.requestLatencyAvg, this.requestLatencyMax, this.recordSendRate, + this.recordSendTotal, this.recordsPerRequestAvg, this.recordRetryRate, + this.recordRetryTotal, this.recordErrorRate, + this.recordErrorTotal, this.recordSizeMax, this.recordSizeAvg, this.requestsInFlight, this.metadataAge, this.batchSplitRate, + this.batchSplitTotal, this.produceThrottleTimeAvg, this.produceThrottleTimeMax, // per-topic metrics this.topicRecordSendRate, + this.topicRecordSendTotal, this.topicByteRate, + this.topicByteTotal, this.topicCompressionRate, this.topicRecordRetryRate, - this.topicRecordErrorRate + this.topicRecordRetryTotal, + this.topicRecordErrorRate, + this.topicRecordErrorTotal ); } diff --git a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java index d768f228533..e3ea9950ef1 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricNameTemplate.java @@ -17,6 +17,7 @@ package org.apache.kafka.common; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import org.apache.kafka.common.utils.Utils; @@ -65,8 +66,28 @@ public class MetricNameTemplate { return this.description; } - public Set tags() { return tags; } + + @Override + public int hashCode() { + return Objects.hash(name, group, tags); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + MetricNameTemplate other = (MetricNameTemplate) o; + return Objects.equals(name, other.name) && Objects.equals(group, other.group) && + Objects.equals(tags, other.tags); + } + + @Override + public String toString() { + return String.format("name=%s, group=%s, tags=%s", name, group, tags); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index b2ede15ac3b..5263d3bd74b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -43,6 +44,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.NetworkReceive; @@ -1361,6 +1363,38 @@ public class FetcherTest { assertEquals(3, recordsCountAverage.value(), EPSILON); } + @Test + public void testFetcherMetricsTemplates() throws Exception { + metrics.close(); + Map clientTags = Collections.singletonMap("client-id", "clientA"); + metrics = new Metrics(new MetricConfig().tags(clientTags)); + metricsRegistry = new FetcherMetricsRegistry(clientTags.keySet(), "consumer" + groupId); + fetcher.close(); + fetcher = createFetcher(subscriptions, metrics); + + // Fetch from topic to generate topic metrics + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + consumerClient.poll(0); + assertTrue(fetcher.hasCompletedFetches()); + Map>> partitionRecords = fetcher.fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp0)); + + // Create throttle metrics + Fetcher.throttleTimeSensor(metrics, metricsRegistry); + + // Verify that all metrics except metrics-count have registered templates + Set allMetrics = new HashSet<>(); + for (MetricName n : metrics.metrics().keySet()) { + String name = n.name().replaceAll(tp0.toString(), "{topic}-{partition}"); + if (!n.group().equals("kafka-metrics-count")) + allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet())); + } + TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates"); + } + private Map>> fetchRecords( TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) { return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 26e3e6c49e8..a64bc56d765 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -70,10 +71,12 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; @@ -274,6 +277,33 @@ public class SenderTest { client.close(); } + @Test + public void testSenderMetricsTemplates() throws Exception { + metrics.close(); + Map clientTags = Collections.singletonMap("client-id", "clientA"); + metrics = new Metrics(new MetricConfig().tags(clientTags)); + SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(clientTags.keySet()); + Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + 1, metrics, metricsRegistry, time, REQUEST_TIMEOUT, 50, null, apiVersions); + + // Append a message so that topic metrics are created + accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp0, 0, Errors.NONE, 0)); + sender.run(time.milliseconds()); + // Create throttle time metrics + Sender.throttleTimeSensor(metrics, metricsRegistry); + + // Verify that all metrics except metrics-count have registered templates + Set allMetrics = new HashSet<>(); + for (MetricName n : metrics.metrics().keySet()) { + if (!n.group().equals("kafka-metrics-count")) + allMetrics.add(new MetricNameTemplate(n.name(), n.group(), "", n.tags().keySet())); + } + TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates"); + } + @Test public void testRetries() throws Exception { // create a sender with retries = 1 diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 47ca82308a8..958ab2cc636 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -35,6 +35,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -317,6 +318,16 @@ public class TestUtils { assertEquals(Utils.toList(it1), Utils.toList(it2)); } + public static void checkEquals(Set c1, Set c2, String firstDesc, String secondDesc) { + if (!c1.equals(c2)) { + Set missing1 = new HashSet<>(c2); + missing1.removeAll(c1); + Set missing2 = new HashSet<>(c1); + missing2.removeAll(c2); + fail(String.format("Sets not equal, missing %s=%s, missing %s=%s", firstDesc, missing1, secondDesc, missing2)); + } + } + public static List toList(Iterable iterable) { List list = new ArrayList<>(); for (T item : iterable) @@ -330,5 +341,4 @@ public class TestUtils { buffer.rewind(); return buffer; } - }