diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 32782ee53db..0711d15cdb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -587,6 +588,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 if (partitionLag != null)
                     this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
 
+                Long lead = subscriptions.partitionLead(partitionRecords.partition);
+                if (lead != null) {
+                    this.sensors.recordPartitionLead(partitionRecords.partition, lead);
+                }
+
                 return partRecords;
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
@@ -871,6 +877,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 subscriptions.updateHighWatermark(tp, partition.highWatermark);
             }
 
+            if (partition.logStartOffset >= 0) {
+                log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
+                subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+            }
+
             if (partition.lastStableOffset >= 0) {
                 log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                 subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@@ -945,7 +956,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
     @Override
     public void onAssignment(Set<TopicPartition> assignment) {
-        sensors.updatePartitionLagSensors(assignment);
+        sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@@ -1261,6 +1272,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
+        private final Sensor recordsFetchLead;
 
         private Set<TopicPartition> assignedPartitions;
 
@@ -1287,6 +1299,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
             this.recordsFetchLag = metrics.sensor("records-lag");
             this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
+
+            this.recordsFetchLead = metrics.sensor("records-lead");
+            this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
         }
 
         private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@@ -1322,16 +1337,37 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             recordsFetched.record(records);
         }
 
-        private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
+        private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
             if (this.assignedPartitions != null) {
                 for (TopicPartition tp : this.assignedPartitions) {
-                    if (!assignedPartitions.contains(tp))
+                    if (!assignedPartitions.contains(tp)) {
                         metrics.removeSensor(partitionLagMetricName(tp));
+                        metrics.removeSensor(partitionLeadMetricName(tp));
+                    }
                 }
             }
             this.assignedPartitions = assignedPartitions;
         }
 
+        private void recordPartitionLead(TopicPartition tp, long lead) {
+            this.recordsFetchLead.record(lead);
+
+            String name = partitionLeadMetricName(tp);
+            Sensor recordsLead = this.metrics.getSensor(name);
+            if (recordsLead == null) {
+                Map<String, String> metricTags = new HashMap<>(2);
+                metricTags.put("topic", tp.topic().replace('.', '_'));
+                metricTags.put("partition", String.valueOf(tp.partition()));
+
+                recordsLead = this.metrics.sensor(name);
+
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, metricTags), new Value());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, metricTags), new Min());
+                recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, metricTags), new Avg());
+            }
+            recordsLead.record(lead);
+        }
+
         private void recordPartitionLag(TopicPartition tp, long lag) {
             this.recordsFetchLag.record(lag);
 
@@ -1364,6 +1400,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private static String partitionLagMetricName(TopicPartition tp) {
             return tp + ".records-lag";
         }
+
+        private static String partitionLeadMetricName(TopicPartition tp) {
+            return tp + ".records-lead";
+        }
+
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
index 301363a638b..6eb4fa20ff4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
@@ -38,6 +38,7 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate fetchRequestRate;
     public MetricNameTemplate fetchRequestTotal;
     public MetricNameTemplate recordsLagMax;
+    public MetricNameTemplate recordsLeadMin;
     public MetricNameTemplate fetchThrottleTimeAvg;
     public MetricNameTemplate fetchThrottleTimeMax;
     public MetricNameTemplate topicFetchSizeAvg;
@@ -50,6 +51,9 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLag;
     public MetricNameTemplate partitionRecordsLagMax;
     public MetricNameTemplate partitionRecordsLagAvg;
+    public MetricNameTemplate partitionRecordsLead;
+    public MetricNameTemplate partitionRecordsLeadMin;
+    public MetricNameTemplate partitionRecordsLeadAvg;
     // To remove in 2.0
     public MetricNameTemplate partitionRecordsLagDeprecated;
     public MetricNameTemplate partitionRecordsLagMaxDeprecated;
@@ -96,6 +100,8 @@ public class FetcherMetricsRegistry {
 
         this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
                 "The maximum lag in terms of number of records for any partition in this window", tags);
+        this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The minimum lead in terms of number of records for any partition in this window", tags);
 
         this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
                 "The average throttle time in ms", tags);
@@ -138,7 +144,12 @@ public class FetcherMetricsRegistry {
                 "The max lag of the partition", partitionTags);
         this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
                 "The average lag of the partition", partitionTags);
-
+        this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
+                "The latest lead of the partition", partitionTags);
+        this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
+                "The min lead of the partition", partitionTags);
+        this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
+                "The average lead of the partition", partitionTags);
     }
 
@@ -156,6 +167,7 @@ public class FetcherMetricsRegistry {
             fetchRequestRate,
             fetchRequestTotal,
             recordsLagMax,
+            recordsLeadMin,
             fetchThrottleTimeAvg,
             fetchThrottleTimeMax,
             topicFetchSizeAvg,
@@ -170,7 +182,10 @@ public class FetcherMetricsRegistry {
             partitionRecordsLagMaxDeprecated,
             partitionRecordsLag,
             partitionRecordsLagAvg,
-            partitionRecordsLagMax
+            partitionRecordsLagMax,
+            partitionRecordsLead,
+            partitionRecordsLeadMin,
+            partitionRecordsLeadAvg
         );
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index adced58d44f..822870713f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -327,10 +327,19 @@ public class SubscriptionState {
         return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
     }
 
+    public Long partitionLead(TopicPartition tp) {
+        TopicPartitionState topicPartitionState = assignedState(tp);
+        return topicPartitionState.logStartOffset == null ? null : topicPartitionState.position - topicPartitionState.logStartOffset;
+    }
+
     public void updateHighWatermark(TopicPartition tp, long highWatermark) {
         assignedState(tp).highWatermark = highWatermark;
     }
 
+    public void updateLogStartOffset(TopicPartition tp, long logStartOffset) {
+        assignedState(tp).logStartOffset = logStartOffset;
+    }
+
     public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
         assignedState(tp).lastStableOffset = lastStableOffset;
     }
@@ -435,6 +444,7 @@ public class SubscriptionState {
     private static class TopicPartitionState {
         private Long position; // last consumed position
         private Long highWatermark; // the high watermark from last fetch
+        private Long logStartOffset; // the log start offset
         private Long lastStableOffset;
         private OffsetAndMetadata committed;  // last committed position
         private boolean paused;  // whether this partition has been paused by the user
@@ -444,6 +454,7 @@ public class SubscriptionState {
             this.paused = false;
             this.position = null;
             this.highWatermark = null;
+            this.logStartOffset = null;
             this.lastStableOffset = null;
             this.committed = null;
             this.resetStrategy = null;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a0205e7f19e..e81e3d1fc3a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1210,6 +1210,45 @@ public class FetcherTest {
         assertFalse(allMetrics.containsKey(partitionLagMetric));
     }
 
+    @Test
+    public void testFetcherLeadMetric() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        MetricName minLeadMetric = metrics.metricInstance(metricsRegistry.recordsLeadMin);
+        Map<String, String> tags = new HashMap<>(2);
+        tags.put("topic", tp0.topic());
+        tags.put("partition", String.valueOf(tp0.partition()));
+        MetricName partitionLeadMetric = metrics.metricName("records-lead", metricGroup, "", tags);
+
+        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+        KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
+
+        // recordsFetchLeadMin should be initialized to MAX_VALUE
+        assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
+        fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+
+        KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
+        assertEquals(0L, partitionLead.value(), EPSILON);
+
+        // recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
+        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
+                TimestampType.CREATE_TIME, 0L);
+        for (int v = 0; v < 3; v++) {
+            builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
+        }
+        fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
+        assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
+        assertEquals(3L, partitionLead.value(), EPSILON);
+
+        // verify de-registration of the partition lead metric
+        subscriptions.unsubscribe();
+        assertFalse(allMetrics.containsKey(partitionLeadMetric));
+    }
+
     @Test
     public void testReadCommittedLagMetric() {
         Metrics metrics = new Metrics();
@@ -1430,6 +1469,14 @@ public class FetcherTest {
         return fetcher.fetchedRecords();
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
+            TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
+        consumerClient.poll(0);
+        return fetcher.fetchedRecords();
+    }
+
     @Test
     public void testGetOffsetsForTimesTimeout() {
         try {
@@ -2132,6 +2179,13 @@ public class FetcherTest {
         return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
+    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                        long lastStableOffset, long logStartOffset, int throttleTime) {
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
+        return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+    }
+
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
         List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
         if (error == Errors.NONE) {
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 103a28ae67b..a06e9e36528 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(500, consumer0.committed(tp2).offset)
   }
 
+  @Test
+  def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
+    val numMessages = 1000
+    val topic2 = "topic2"
+    createTopic(topic2, 2, serverCount)
+    // send some messages.
+    sendRecords(numMessages, tp)
+    // Test subscribe
+    // Create a consumer and consume some messages.
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      val listener0 = new TestConsumerReassignmentListener
+      consumer.subscribe(List(topic, topic2).asJava, listener0)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      assertEquals("should be assigned once", 1, listener0.callsToAssigned)
+      // Verify the metric exists.
+      val tags1 = new util.HashMap[String, String]()
+      tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags1.put("topic", tp.topic())
+      tags1.put("partition", String.valueOf(tp.partition()))
+
+      val tags2 = new util.HashMap[String, String]()
+      tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
+      tags2.put("topic", tp2.topic())
+      tags2.put("partition", String.valueOf(tp2.partition()))
+      val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
+      assertNotNull(fetchLead0)
+      assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
+
+      // Remove topic from subscription
+      consumer.subscribe(List(topic2).asJava, listener0)
+      TestUtils.waitUntilTrue(() => {
+        consumer.poll(100)
+        listener0.callsToAssigned >= 2
+      }, "Expected rebalance did not occur.")
+      // Verify the metric has gone
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
+    } finally {
+      consumer.close()
+    }
+  }
 
   @Test
   def testPerPartitionLagMetricsCleanUpWithSubscribe() {
@@ -1426,6 +1474,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testPerPartitionLeadMetricsCleanUpWithAssign() {
+    val numMessages = 1000
+    // Test assign
+    // send some messages.
+    sendRecords(numMessages, tp)
+    sendRecords(numMessages, tp2)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    try {
+      consumer.assign(List(tp).asJava)
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.records(tp).isEmpty
+      }, "Consumer did not consume any message before timeout.")
+      // Verify the metric exists.
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertNotNull(fetchLead)
+
+      assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
+
+      consumer.assign(List(tp2).asJava)
+      TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
+      assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
+    } finally {
+      consumer.close()
+    }
+  }
+
   @Test
   def testPerPartitionLagMetricsCleanUpWithAssign() {
     val numMessages = 1000
@@ -1467,6 +1550,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     }
   }
 
+  @Test
+  def testPerPartitionLeadWithMaxPollRecords() {
+    val numMessages = 1000
+    val maxPollRecords = 10
+    sendRecords(numMessages, tp)
+    consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
+    consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer.assign(List(tp).asJava)
+    try {
+      var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
+      TestUtils.waitUntilTrue(() => {
+        records = consumer.poll(100)
+        !records.isEmpty
+      }, "Consumer did not consume any message before timeout.")
+
+      val tags = new util.HashMap[String, String]()
+      tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
+      tags.put("topic", tp.topic())
+      tags.put("partition", String.valueOf(tp.partition()))
+      val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
+      assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
+    } finally {
+      consumer.close()
+    }
+  }
+
   @Test
   def testPerPartitionLagWithMaxPollRecords() {
     val numMessages = 1000
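
Reviewer note (not part of the patch): lead is the mirror of lag — SubscriptionState.partitionLead computes position - logStartOffset, while partitionLag computes highWatermark - position — so a shrinking lead warns that the consumer is approaching the log start offset and may lose unread records to retention-based deletion. Below is a minimal sketch of how an application might read the new metrics once this patch is in; the class name, bootstrap address, group id, and topic are illustrative placeholders, not part of this change.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class RecordsLeadMetricDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "records-lead-demo");       // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            consumer.poll(1000);

            // The fetcher registers the lead metrics under the
            // "consumer-fetch-manager-metrics" group: a client-level
            // records-lead-min plus per-partition records-lead,
            // records-lead-min and records-lead-avg (tagged with topic/partition).
            for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
                MetricName name = e.getKey();
                if ("consumer-fetch-manager-metrics".equals(name.group())
                        && name.name().startsWith("records-lead"))
                    System.out.println(name.name() + " " + name.tags() + " = " + e.getValue().metricValue());
            }
        }
    }
}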