@@ -32,6 +32,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -1361,6 +1363,38 @@ public class FetcherTest {
         assertEquals(3, recordsCountAverage.value(), EPSILON);
     }
 
+    @Test
+    public void testFetcherMetricsTemplates() throws Exception {
+        metrics.close();
+        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
+        metrics = new Metrics(new MetricConfig().tags(clientTags));
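+        // Recreate the fetcher metrics registry with the same tag names so its templates include the client-id tag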
+        metricsRegistry = new FetcherMetricsRegistry(clientTags.keySet(), "consumer" + groupId);
+        fetcher.close();
+        fetcher = createFetcher(subscriptions, metrics);
+
+        // Fetch from topic to generate topic metrics
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+        assertEquals(1, fetcher.sendFetches());
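+        // Return a successful fetch response through the mock client so the fetch completes and topic-level metrics are recorded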
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // Create throttle metrics
+        Fetcher.throttleTimeSensor(metrics, metricsRegistry);
+
+        // Verify that all metrics except metrics-count have registered templates
+        Set<MetricNameTemplate> allMetrics = new HashSet<>();
+        for (MetricName n : metrics.metrics().keySet()) {
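+            // Map the concrete topic-partition in per-partition metric names back to the {topic}-{partition} placeholder used by the templates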
+            String name = n.name().replaceAll(tp0.toString(), "{topic}-{partition}");
+            if (!n.group().equals("kafka-metrics-count"))
+                allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet()));
+        }
+        TestUtils.checkEquals(allMetrics, new HashSet<>(metricsRegistry.getAllTemplates()), "metrics", "templates");
+    }
+
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(
             TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
         return fetchRecords(tp, records, error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, throttleTime);