Browse Source

KAFKA-6184; report a metric of the lag between the consumer offset and the start offset of the log

Add `records-lead` and partition-level `{topic}-{partition}.records-lead-min|avg` for fetcher metrics.

junrao  Please kindly review. Thanks.

Author: huxihx <huxi_2b@hotmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #4191 from huxihx/KAFKA-6184
pull/4533/merge
huxihx 7 years ago committed by Jun Rao
parent
commit
7c97b239a5
  1. 47
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  2. 19
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
  3. 11
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  4. 54
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  5. 111
      core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala

47
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -45,6 +45,7 @@ import org.apache.kafka.common.metrics.stats.Avg; @@ -45,6 +45,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
@ -587,6 +588,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -587,6 +588,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (partitionLag != null)
this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);
Long lead = subscriptions.partitionLead(partitionRecords.partition);
if (lead != null) {
this.sensors.recordPartitionLead(partitionRecords.partition, lead);
}
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
@ -871,6 +877,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -871,6 +877,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
subscriptions.updateHighWatermark(tp, partition.highWatermark);
}
if (partition.logStartOffset >= 0) {
log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
}
if (partition.lastStableOffset >= 0) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
@ -945,7 +956,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -945,7 +956,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
/**
 * Subscription listener callback invoked with the new set of assigned partitions.
 * Delegates to the fetch sensors so they can reconcile their per-partition
 * lag and lead sensors with the new assignment.
 *
 * @param assignment the partitions that are now assigned
 */
@Override
public void onAssignment(Set<TopicPartition> assignment) {
    // Only the combined lag-and-lead update is invoked; the former lag-only
    // method name no longer exists after the rename.
    sensors.updatePartitionLagAndLeadSensors(assignment);
}
public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
@ -1261,6 +1272,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -1261,6 +1272,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final Sensor recordsFetched;
private final Sensor fetchLatency;
private final Sensor recordsFetchLag;
private final Sensor recordsFetchLead;
private Set<TopicPartition> assignedPartitions;
@ -1287,6 +1299,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -1287,6 +1299,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
this.recordsFetchLag = metrics.sensor("records-lag");
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max());
this.recordsFetchLead = metrics.sensor("records-lead");
this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min());
}
private void recordTopicFetchMetrics(String topic, int bytes, int records) {
@ -1322,16 +1337,37 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -1322,16 +1337,37 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
recordsFetched.record(records);
}
private void updatePartitionLagSensors(Set<TopicPartition> assignedPartitions) {
private void updatePartitionLagAndLeadSensors(Set<TopicPartition> assignedPartitions) {
if (this.assignedPartitions != null) {
for (TopicPartition tp : this.assignedPartitions) {
if (!assignedPartitions.contains(tp))
if (!assignedPartitions.contains(tp)) {
metrics.removeSensor(partitionLagMetricName(tp));
metrics.removeSensor(partitionLeadMetricName(tp));
}
}
}
this.assignedPartitions = assignedPartitions;
}
/**
 * Records a lead observation for a partition, both on the consumer-wide
 * lead sensor and on the sensor dedicated to that partition. The partition
 * sensor is looked up by name and registered, together with its latest-value,
 * minimum and average metrics, when it does not exist yet.
 *
 * @param tp   the partition the lead belongs to
 * @param lead the lead value to record
 */
private void recordPartitionLead(TopicPartition tp, long lead) {
    this.recordsFetchLead.record(lead);

    final String sensorName = partitionLeadMetricName(tp);
    Sensor partitionSensor = this.metrics.getSensor(sensorName);
    if (partitionSensor == null) {
        // The topic tag carries the topic name with '.' replaced by '_'.
        final Map<String, String> tags = new HashMap<>(2);
        tags.put("topic", tp.topic().replace('.', '_'));
        tags.put("partition", String.valueOf(tp.partition()));

        partitionSensor = this.metrics.sensor(sensorName);
        partitionSensor.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, tags), new Value());
        partitionSensor.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, tags), new Min());
        partitionSensor.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, tags), new Avg());
    }
    partitionSensor.record(lead);
}
private void recordPartitionLag(TopicPartition tp, long lag) {
this.recordsFetchLag.record(lag);
@ -1364,6 +1400,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -1364,6 +1400,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
/**
 * Builds the name of the lag sensor for a partition: the partition's string
 * form followed by {@code ".records-lag"}.
 */
private static String partitionLagMetricName(TopicPartition tp) {
    final String partitionName = String.valueOf(tp);
    return partitionName + ".records-lag";
}
/**
 * Builds the name of the lead sensor for a partition: the partition's string
 * form followed by {@code ".records-lead"}.
 */
private static String partitionLeadMetricName(TopicPartition tp) {
    final String partitionName = String.valueOf(tp);
    return partitionName + ".records-lead";
}
}
@Override

19
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java

@ -38,6 +38,7 @@ public class FetcherMetricsRegistry { @@ -38,6 +38,7 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate fetchRequestRate;
public MetricNameTemplate fetchRequestTotal;
public MetricNameTemplate recordsLagMax;
public MetricNameTemplate recordsLeadMin;
public MetricNameTemplate fetchThrottleTimeAvg;
public MetricNameTemplate fetchThrottleTimeMax;
public MetricNameTemplate topicFetchSizeAvg;
@ -50,6 +51,9 @@ public class FetcherMetricsRegistry { @@ -50,6 +51,9 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLag;
public MetricNameTemplate partitionRecordsLagMax;
public MetricNameTemplate partitionRecordsLagAvg;
public MetricNameTemplate partitionRecordsLead;
public MetricNameTemplate partitionRecordsLeadMin;
public MetricNameTemplate partitionRecordsLeadAvg;
// To remove in 2.0
public MetricNameTemplate partitionRecordsLagDeprecated;
public MetricNameTemplate partitionRecordsLagMaxDeprecated;
@ -96,6 +100,8 @@ public class FetcherMetricsRegistry { @@ -96,6 +100,8 @@ public class FetcherMetricsRegistry {
this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName,
"The maximum lag in terms of number of records for any partition in this window", tags);
this.recordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
"The minimum lead in terms of number of records for any partition in this window", tags);
this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName,
"The average throttle time in ms", tags);
@ -138,7 +144,12 @@ public class FetcherMetricsRegistry { @@ -138,7 +144,12 @@ public class FetcherMetricsRegistry {
"The max lag of the partition", partitionTags);
this.partitionRecordsLagAvg = new MetricNameTemplate("records-lag-avg", groupName,
"The average lag of the partition", partitionTags);
this.partitionRecordsLead = new MetricNameTemplate("records-lead", groupName,
"The latest lead of the partition", partitionTags);
this.partitionRecordsLeadMin = new MetricNameTemplate("records-lead-min", groupName,
"The min lead of the partition", partitionTags);
this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
"The average lead of the partition", partitionTags);
}
@ -156,6 +167,7 @@ public class FetcherMetricsRegistry { @@ -156,6 +167,7 @@ public class FetcherMetricsRegistry {
fetchRequestRate,
fetchRequestTotal,
recordsLagMax,
recordsLeadMin,
fetchThrottleTimeAvg,
fetchThrottleTimeMax,
topicFetchSizeAvg,
@ -170,7 +182,10 @@ public class FetcherMetricsRegistry { @@ -170,7 +182,10 @@ public class FetcherMetricsRegistry {
partitionRecordsLagMaxDeprecated,
partitionRecordsLag,
partitionRecordsLagAvg,
partitionRecordsLagMax
partitionRecordsLagMax,
partitionRecordsLead,
partitionRecordsLeadMin,
partitionRecordsLeadAvg
);
}

11
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

@ -327,10 +327,19 @@ public class SubscriptionState { @@ -327,10 +327,19 @@ public class SubscriptionState {
return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark - topicPartitionState.position;
}
/**
 * Computes the lead of an assigned partition, i.e. the distance between the
 * current position and the log start offset.
 *
 * @param tp the partition to inspect
 * @return {@code position - logStartOffset}, or {@code null} when either the
 *         log start offset or the position is not known yet
 */
public Long partitionLead(TopicPartition tp) {
    TopicPartitionState state = assignedState(tp);
    // Both fields are boxed Longs that start out as null; subtracting with a
    // null operand would fail on unboxing, so report "unknown" instead.
    if (state.logStartOffset == null || state.position == null) {
        return null;
    }
    return state.position - state.logStartOffset;
}
/**
 * Stores the given high watermark on the state of an assigned partition.
 *
 * @param tp            the assigned partition to update
 * @param highWatermark the new high watermark
 */
public void updateHighWatermark(TopicPartition tp, long highWatermark) {
    TopicPartitionState state = assignedState(tp);
    state.highWatermark = highWatermark;
}
/**
 * Stores the given log start offset on the state of an assigned partition.
 *
 * @param tp             the assigned partition to update
 * @param logStartOffset the new log start offset
 */
public void updateLogStartOffset(TopicPartition tp, long logStartOffset) {
    TopicPartitionState state = assignedState(tp);
    state.logStartOffset = logStartOffset;
}
/**
 * Stores the given last stable offset on the state of an assigned partition.
 *
 * @param tp               the assigned partition to update
 * @param lastStableOffset the new last stable offset
 */
public void updateLastStableOffset(TopicPartition tp, long lastStableOffset) {
    TopicPartitionState state = assignedState(tp);
    state.lastStableOffset = lastStableOffset;
}
@ -435,6 +444,7 @@ public class SubscriptionState { @@ -435,6 +444,7 @@ public class SubscriptionState {
private static class TopicPartitionState {
private Long position; // last consumed position
private Long highWatermark; // the high watermark from last fetch
private Long logStartOffset; // the log start offset
private Long lastStableOffset;
private OffsetAndMetadata committed; // last committed position
private boolean paused; // whether this partition has been paused by the user
@ -444,6 +454,7 @@ public class SubscriptionState { @@ -444,6 +454,7 @@ public class SubscriptionState {
this.paused = false;
this.position = null;
this.highWatermark = null;
this.logStartOffset = null;
this.lastStableOffset = null;
this.committed = null;
this.resetStrategy = null;

54
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -1210,6 +1210,45 @@ public class FetcherTest { @@ -1210,6 +1210,45 @@ public class FetcherTest {
assertFalse(allMetrics.containsKey(partitionLagMetric));
}
/**
 * Checks the lead metrics: the consumer-wide records-lead-min metric and the
 * per-partition records-lead metric should both reflect position - logStartOffset
 * after each fetch, and the per-partition metric should disappear once the
 * subscription is dropped.
 */
@Test
public void testFetcherLeadMetric() {
subscriptions.assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
MetricName minLeadMetric = metrics.metricInstance(metricsRegistry.recordsLeadMin);
// the per-partition metric is identified by its topic and partition tags
Map<String, String> tags = new HashMap<>(2);
tags.put("topic", tp0.topic());
tags.put("partition", String.valueOf(tp0.partition()));
MetricName partitionLeadMetric = metrics.metricName("records-lead", metricGroup, "", tags);
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
// recordsFetchLeadMin should be initialized to MAX_VALUE
assertEquals(Double.MAX_VALUE, recordsFetchLeadMin.value(), EPSILON);
// recordsFetchLeadMin should be position - logStartOffset after receiving an empty FetchResponse
// (position 0, log start offset 0, so the expected lead is 0)
fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
// the per-partition metric is only looked up after the first fetch has been processed
KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
assertEquals(0L, partitionLead.value(), EPSILON);
// recordsFetchLeadMin should be position - logStartOffset after receiving a non-empty FetchResponse
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0L);
for (int v = 0; v < 3; v++) {
builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
}
fetchRecords(tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
// the minimum stays at 0 while the latest per-partition lead becomes 3 after three records
assertEquals(0L, recordsFetchLeadMin.value(), EPSILON);
assertEquals(3L, partitionLead.value(), EPSILON);
// verify de-registration of partition lead
subscriptions.unsubscribe();
assertFalse(allMetrics.containsKey(partitionLeadMetric));
}
@Test
public void testReadCommittedLagMetric() {
Metrics metrics = new Metrics();
@ -1430,6 +1469,14 @@ public class FetcherTest { @@ -1430,6 +1469,14 @@ public class FetcherTest {
return fetcher.fetchedRecords();
}
/**
 * Test helper: issues exactly one fetch request, answers it with a
 * single-partition response built from the given arguments, polls the
 * consumer client once and returns whatever records the fetcher then holds.
 */
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
        TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
    final int sentFetches = fetcher.sendFetches();
    assertEquals(1, sentFetches);
    // The response is built and queued only after the request has been sent.
    client.prepareResponse(fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime));
    consumerClient.poll(0);
    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = fetcher.fetchedRecords();
    return fetched;
}
@Test
public void testGetOffsetsForTimesTimeout() {
try {
@ -2132,6 +2179,13 @@ public class FetcherTest { @@ -2132,6 +2179,13 @@ public class FetcherTest {
return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
/**
 * Test helper: builds a fetch response that carries data for exactly one partition.
 *
 * @param tp               the partition the response is for
 * @param records          the records returned for the partition
 * @param error            the partition-level error
 * @param hw               the high watermark reported for the partition
 * @param lastStableOffset the last stable offset reported for the partition
 * @param logStartOffset   the log start offset reported for the partition
 * @param throttleTime     the throttle time to report
 * @return the fetch response wrapping the single partition
 */
private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
                                    long lastStableOffset, long logStartOffset, int throttleTime) {
    Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
            new FetchResponse.PartitionData(error, hw, lastStableOffset, logStartOffset, null, records));
    // Use the same constructor form as the sibling fetchResponse helper in this class:
    // top-level error NONE and the invalid session id.
    return new FetchResponse(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
}
private MetadataResponse newMetadataResponse(String topic, Errors error) {
List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
if (error == Errors.NONE) {

111
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala

@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest { @@ -1375,6 +1375,54 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(500, consumer0.committed(tp2).offset)
}
// Checks that the per-partition records-lead metric exists while a topic is subscribed
// and is no longer reported once that topic is removed from the subscription.
@Test
def testPerPartitionLeadMetricsCleanUpWithSubscribe() {
val numMessages = 1000
val topic2 = "topic2"
createTopic(topic2, 2, serverCount)
// send some messages.
sendRecords(numMessages, tp)
// Test subscribe
// Create a consumer and consume some messages.
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe")
val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
try {
val listener0 = new TestConsumerReassignmentListener
consumer.subscribe(List(topic, topic2).asJava, listener0)
var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
// keep polling until at least one record of tp has been returned
TestUtils.waitUntilTrue(() => {
records = consumer.poll(100)
!records.records(tp).isEmpty
}, "Consumer did not consume any message before timeout.")
assertEquals("should be assigned once", 1, listener0.callsToAssigned)
// Verify the metric exists.
val tags1 = new util.HashMap[String, String]()
tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
tags1.put("topic", tp.topic())
tags1.put("partition", String.valueOf(tp.partition()))
val tags2 = new util.HashMap[String, String]()
tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe")
tags2.put("topic", tp2.topic())
tags2.put("partition", String.valueOf(tp2.partition()))
val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))
assertNotNull(fetchLead0)
// the lead is expected to equal the number of records returned by the last poll
assertTrue(s"The lead should be ${records.count}", fetchLead0.metricValue() == records.count)
// Remove topic from subscription
consumer.subscribe(List(topic2).asJava, listener0)
// poll until the listener has seen a second assignment, i.e. the rebalance happened
TestUtils.waitUntilTrue(() => {
consumer.poll(100)
listener0.callsToAssigned >= 2
}, "Expected rebalance did not occur.")
// Verify the metric has gone
assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)))
assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2)))
} finally {
consumer.close()
}
}
@Test
def testPerPartitionLagMetricsCleanUpWithSubscribe() {
@ -1426,6 +1474,41 @@ class PlaintextConsumerTest extends BaseConsumerTest { @@ -1426,6 +1474,41 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
// Checks that the per-partition records-lead metric exists while a partition is manually
// assigned and is no longer reported after the consumer is re-assigned to another partition.
@Test
def testPerPartitionLeadMetricsCleanUpWithAssign() {
val numMessages = 1000
// Test assign
// send some messages.
sendRecords(numMessages, tp)
sendRecords(numMessages, tp2)
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign")
val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
try {
consumer.assign(List(tp).asJava)
var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
// keep polling until at least one record of tp has been returned
TestUtils.waitUntilTrue(() => {
records = consumer.poll(100)
!records.records(tp).isEmpty
}, "Consumer did not consume any message before timeout.")
// Verify the metric exists.
val tags = new util.HashMap[String, String]()
tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign")
tags.put("topic", tp.topic())
tags.put("partition", String.valueOf(tp.partition()))
val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
assertNotNull(fetchLead)
// the lead is expected to equal the number of records returned by the last poll
assertTrue(s"The lead should be ${records.count}", records.count == fetchLead.metricValue())
// switch the assignment to tp2 and consume from it before checking that tp's metric is gone
consumer.assign(List(tp2).asJava)
TestUtils.waitUntilTrue(() => !consumer.poll(100).isEmpty, "Consumer did not consume any message before timeout.")
assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)))
} finally {
consumer.close()
}
}
@Test
def testPerPartitionLagMetricsCleanUpWithAssign() {
val numMessages = 1000
@ -1467,6 +1550,34 @@ class PlaintextConsumerTest extends BaseConsumerTest { @@ -1467,6 +1550,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
// Checks that, with max.poll.records limiting each poll, the per-partition records-lead
// metric equals the number of records handed out by the first non-empty poll.
@Test
def testPerPartitionLeadWithMaxPollRecords() {
  val numMessages = 1000
  val maxPollRecords = 10
  sendRecords(numMessages, tp)
  consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
  consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadWithMaxPollRecords")
  consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
  val consumer = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
  try {
    // assign inside the try block so the consumer is closed even if the assignment fails
    consumer.assign(List(tp).asJava)
    var records: ConsumerRecords[Array[Byte], Array[Byte]] = ConsumerRecords.empty()
    // keep polling until a poll returns at least one record
    TestUtils.waitUntilTrue(() => {
      records = consumer.poll(100)
      !records.isEmpty
    }, "Consumer did not consume any message before timeout.")
    val tags = new util.HashMap[String, String]()
    tags.put("client-id", "testPerPartitionLeadWithMaxPollRecords")
    tags.put("topic", tp.topic())
    tags.put("partition", String.valueOf(tp.partition()))
    val lead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))
    // fail with a clear assertion, rather than a NullPointerException, if the metric is missing
    assertNotNull(lead)
    assertTrue(s"The lead should be $maxPollRecords", lead.metricValue() == maxPollRecords)
  } finally {
    consumer.close()
  }
}
@Test
def testPerPartitionLagWithMaxPollRecords() {
val numMessages = 1000

Loading…
Cancel
Save