@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException ;
import org.apache.kafka.common.errors.RecordTooLargeException ;
import org.apache.kafka.common.errors.RetriableException ;
import org.apache.kafka.common.errors.SerializationException ;
import org.apache.kafka.common.errors.TimeoutException ;
import org.apache.kafka.common.errors.TopicAuthorizationException ;
import org.apache.kafka.common.metrics.Metrics ;
@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
@@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate ;
import org.apache.kafka.common.protocol.ApiKeys ;
import org.apache.kafka.common.protocol.Errors ;
import org.apache.kafka.common.record.InvalidRecordException ;
import org.apache.kafka.common.record.LogEntry ;
import org.apache.kafka.common.record.MemoryRecords ;
import org.apache.kafka.common.record.Record ;
import org.apache.kafka.common.record.TimestampType ;
import org.apache.kafka.common.requests.FetchRequest ;
import org.apache.kafka.common.requests.FetchResponse ;
@ -59,7 +62,6 @@ import java.util.Collections;
@@ -59,7 +62,6 @@ import java.util.Collections;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.Iterator ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.Locale ;
import java.util.Map ;
@ -83,13 +85,11 @@ public class Fetcher<K, V> {
@@ -83,13 +85,11 @@ public class Fetcher<K, V> {
private final Metadata metadata ;
private final FetchManagerMetrics sensors ;
private final SubscriptionState subscriptions ;
private final List < PartitionRecords < K , V > > record s;
private final List < CompletedFetch > completedFetche s;
private final Deserializer < K > keyDeserializer ;
private final Deserializer < V > valueDeserializer ;
private final Map < TopicPartition , Long > offsetOutOfRangePartitions ;
private final Set < String > unauthorizedTopics ;
private final Map < TopicPartition , Long > recordTooLargePartitions ;
private PartitionRecords < K , V > nextInLineRecords = null ;
public Fetcher ( ConsumerNetworkClient client ,
int minBytes ,
@ -105,7 +105,6 @@ public class Fetcher<K, V> {
@@ -105,7 +105,6 @@ public class Fetcher<K, V> {
String metricGrpPrefix ,
Time time ,
long retryBackoffMs ) {
this . time = time ;
this . client = client ;
this . metadata = metadata ;
@ -115,31 +114,37 @@ public class Fetcher<K, V> {
@@ -115,31 +114,37 @@ public class Fetcher<K, V> {
this . fetchSize = fetchSize ;
this . maxPollRecords = maxPollRecords ;
this . checkCrcs = checkCrcs ;
this . keyDeserializer = keyDeserializer ;
this . valueDeserializer = valueDeserializer ;
this . records = new LinkedList < > ( ) ;
this . offsetOutOfRangePartitions = new HashMap < > ( ) ;
this . unauthorizedTopics = new HashSet < > ( ) ;
this . recordTooLargePartitions = new HashMap < > ( ) ;
this . completedFetches = new ArrayList < > ( ) ;
this . sensors = new FetchManagerMetrics ( metrics , metricGrpPrefix ) ;
this . retryBackoffMs = retryBackoffMs ;
}
/ * *
* Set - up a fetch request for any node that we have assigned partitions for which doesn ' t have one .
*
* Set - up a fetch request for any node that we have assigned partitions for which doesn ' t already have
* an in - flight fetch or pending fetch data .
* /
public void sendFetches ( ) {
for ( Map . Entry < Node , FetchRequest > fetchEntry : createFetchRequests ( ) . entrySet ( ) ) {
final FetchRequest fetch = fetchEntry . getValue ( ) ;
client . send ( fetchEntry . getKey ( ) , ApiKeys . FETCH , fetch )
final FetchRequest request = fetchEntry . getValue ( ) ;
client . send ( fetchEntry . getKey ( ) , ApiKeys . FETCH , request )
. addListener ( new RequestFutureListener < ClientResponse > ( ) {
@Override
public void onSuccess ( ClientResponse response ) {
handleFetchResponse ( response , fetch ) ;
public void onSuccess ( ClientResponse resp ) {
FetchResponse response = new FetchResponse ( resp . responseBody ( ) ) ;
Set < TopicPartition > partitions = new HashSet < > ( response . responseData ( ) . keySet ( ) ) ;
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator ( sensors , partitions ) ;
for ( Map . Entry < TopicPartition , FetchResponse . PartitionData > entry : response . responseData ( ) . entrySet ( ) ) {
TopicPartition partition = entry . getKey ( ) ;
long fetchOffset = request . fetchData ( ) . get ( partition ) . offset ;
FetchResponse . PartitionData fetchData = entry . getValue ( ) ;
completedFetches . add ( new CompletedFetch ( partition , fetchOffset , fetchData , metricAggregator ) ) ;
}
sensors . fetchLatency . record ( resp . requestLatencyMs ( ) ) ;
sensors . fetchThrottleTimeSensor . record ( response . getThrottleTime ( ) ) ;
}
@Override
@ -152,7 +157,7 @@ public class Fetcher<K, V> {
@@ -152,7 +157,7 @@ public class Fetcher<K, V> {
/ * *
* Update the fetch positions for the provided partitions .
* @param partitions
* @param partitions the partitions to update positions for
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
* /
public void updateFetchPositions ( Set < TopicPartition > partitions ) {
@ -323,62 +328,6 @@ public class Fetcher<K, V> {
@@ -323,62 +328,6 @@ public class Fetcher<K, V> {
}
}
/ * *
* If any partition from previous fetchResponse contains OffsetOutOfRange error and
* the defaultResetPolicy is NONE , throw OffsetOutOfRangeException
*
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
* /
private void throwIfOffsetOutOfRange ( ) throws OffsetOutOfRangeException {
Map < TopicPartition , Long > currentOutOfRangePartitions = new HashMap < > ( ) ;
// filter offsetOutOfRangePartitions to retain only the fetchable partitions
for ( Map . Entry < TopicPartition , Long > entry : this . offsetOutOfRangePartitions . entrySet ( ) ) {
if ( ! subscriptions . isFetchable ( entry . getKey ( ) ) ) {
log . debug ( "Ignoring fetched records for {} since it is no longer fetchable" , entry . getKey ( ) ) ;
continue ;
}
Long position = subscriptions . position ( entry . getKey ( ) ) ;
// ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
if ( position ! = null & & entry . getValue ( ) . equals ( position ) )
currentOutOfRangePartitions . put ( entry . getKey ( ) , entry . getValue ( ) ) ;
}
this . offsetOutOfRangePartitions . clear ( ) ;
if ( ! currentOutOfRangePartitions . isEmpty ( ) )
throw new OffsetOutOfRangeException ( currentOutOfRangePartitions ) ;
}
/ * *
* If any topic from previous fetchResponse contains an Authorization error , raise an exception
* @throws TopicAuthorizationException
* /
private void throwIfUnauthorizedTopics ( ) throws TopicAuthorizationException {
if ( ! unauthorizedTopics . isEmpty ( ) ) {
Set < String > topics = new HashSet < > ( unauthorizedTopics ) ;
unauthorizedTopics . clear ( ) ;
throw new TopicAuthorizationException ( topics ) ;
}
}
/ * *
* If any partition from previous fetchResponse gets a RecordTooLarge error , throw RecordTooLargeException
*
* @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
* /
private void throwIfRecordTooLarge ( ) throws RecordTooLargeException {
Map < TopicPartition , Long > copiedRecordTooLargePartitions = new HashMap < > ( this . recordTooLargePartitions ) ;
this . recordTooLargePartitions . clear ( ) ;
if ( ! copiedRecordTooLargePartitions . isEmpty ( ) )
throw new RecordTooLargeException ( "There are some messages at [Partition=Offset]: "
+ copiedRecordTooLargePartitions
+ " whose size is larger than the fetch size "
+ this . fetchSize
+ " and hence cannot be ever returned."
+ " Increase the fetch size, or decrease the maximum message size the broker will allow." ,
copiedRecordTooLargePartitions ) ;
}
/ * *
* Return the fetched records , empty the record buffer and update the consumed position .
*
@ -393,60 +342,68 @@ public class Fetcher<K, V> {
@@ -393,60 +342,68 @@ public class Fetcher<K, V> {
return Collections . emptyMap ( ) ;
} else {
Map < TopicPartition , List < ConsumerRecord < K , V > > > drained = new HashMap < > ( ) ;
throwIfOffsetOutOfRange ( ) ;
throwIfUnauthorizedTopics ( ) ;
throwIfRecordTooLarge ( ) ;
int maxRecords = maxPollRecords ;
Iterator < PartitionRecords < K , V > > iterator = records . iterator ( ) ;
while ( iterator . hasNext ( ) & & maxRecords > 0 ) {
PartitionRecords < K , V > part = iterator . next ( ) ;
maxRecords - = append ( drained , part , maxRecords ) ;
if ( part . isConsumed ( ) )
iterator . remove ( ) ;
int recordsRemaining = maxPollRecords ;
Iterator < CompletedFetch > completedFetchesIterator = completedFetches . iterator ( ) ;
while ( recordsRemaining > 0 ) {
if ( nextInLineRecords = = null | | nextInLineRecords . isEmpty ( ) ) {
if ( ! completedFetchesIterator . hasNext ( ) )
break ;
CompletedFetch completion = completedFetchesIterator . next ( ) ;
completedFetchesIterator . remove ( ) ;
nextInLineRecords = parseFetchedData ( completion ) ;
} else {
recordsRemaining - = append ( drained , nextInLineRecords , recordsRemaining ) ;
}
}
return drained ;
}
}
private int append ( Map < TopicPartition , List < ConsumerRecord < K , V > > > drained ,
PartitionRecords < K , V > part ,
PartitionRecords < K , V > partitionRecords ,
int maxRecords ) {
if ( ! subscriptions . isAssigned ( part . partition ) ) {
if ( partitionRecords . isEmpty ( ) )
return 0 ;
if ( ! subscriptions . isAssigned ( partitionRecords . partition ) ) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log . debug ( "Not returning fetched records for partition {} since it is no longer assigned" , part . partition ) ;
log . debug ( "Not returning fetched records for partition {} since it is no longer assigned" , partitionRecords . partition ) ;
} else {
// note that the consumed position should always be available as long as the partition is still assigned
long position = subscriptions . position ( part . partition ) ;
if ( ! subscriptions . isFetchable ( part . partition ) ) {
long position = subscriptions . position ( partitionRecords . partition ) ;
if ( ! subscriptions . isFetchable ( partitionRecords . partition ) ) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log . debug ( "Not returning fetched records for assigned partition {} since it is no longer fetchable" , part . partition ) ;
} else if ( part . fetchOffset = = position ) {
List < ConsumerRecord < K , V > > partRecords = part . take ( maxRecords ) ;
log . debug ( "Not returning fetched records for assigned partition {} since it is no longer fetchable" , partitionRecords . partition ) ;
} else if ( partitionRecords . fetchOffset = = position ) {
// we are ensured to have at least one record since we already checked for emptiness
List < ConsumerRecord < K , V > > partRecords = partitionRecords . take ( maxRecords ) ;
long nextOffset = partRecords . get ( partRecords . size ( ) - 1 ) . offset ( ) + 1 ;
log . trace ( "Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}" , position , part . partition , nextOffset ) ;
"position to {}" , position , partitionRecords . partition , nextOffset ) ;
List < ConsumerRecord < K , V > > records = drained . get ( part . partition ) ;
List < ConsumerRecord < K , V > > records = drained . get ( partitionRecords . partition ) ;
if ( records = = null ) {
records = partRecords ;
drained . put ( part . partition , records ) ;
drained . put ( partitionRecords . partition , records ) ;
} else {
records . addAll ( partRecords ) ;
}
subscriptions . position ( part . partition , nextOffset ) ;
subscriptions . position ( partitionRecords . partition , nextOffset ) ;
return partRecords . size ( ) ;
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log . debug ( "Ignoring fetched records for {} at offset {} since the current position is {}" ,
part . partition , part . fetchOffset , position ) ;
partitionRecords . partition , partitionRecords . fetchOffset , position ) ;
}
}
part . discard ( ) ;
partitionRecords . discard ( ) ;
return 0 ;
}
@ -513,10 +470,10 @@ public class Fetcher<K, V> {
@@ -513,10 +470,10 @@ public class Fetcher<K, V> {
private Set < TopicPartition > fetchablePartitions ( ) {
Set < TopicPartition > fetchable = subscriptions . fetchablePartitions ( ) ;
if ( records . isEmpty ( ) )
return fetchable ;
for ( PartitionRecords < K , V > partitionRecords : record s)
fetchable . remove ( partitionRecords . partition ) ;
if ( nextInLineReco rds ! = null & & ! nextInLineR ecords. isEmpty ( ) )
fetchable . remove ( nextInLineRecords . partition ) ;
for ( CompletedFetch completedFetch : completedFetche s)
fetchable . remove ( completedFetch . partition ) ;
return fetchable ;
}
@ -559,30 +516,29 @@ public class Fetcher<K, V> {
@@ -559,30 +516,29 @@ public class Fetcher<K, V> {
/ * *
* The callback for fetch completion
* /
private void handleFetchResponse ( ClientResponse resp , FetchRequest request ) {
int totalBytes = 0 ;
int totalCount = 0 ;
FetchResponse response = new FetchResponse ( resp . responseBody ( ) ) ;
for ( Map . Entry < TopicPartition , FetchResponse . PartitionData > entry : response . responseData ( ) . entrySet ( ) ) {
TopicPartition tp = entry . getKey ( ) ;
FetchResponse . PartitionData partition = entry . getValue ( ) ;
private PartitionRecords < K , V > parseFetchedData ( CompletedFetch completedFetch ) {
TopicPartition tp = completedFetch . partition ;
FetchResponse . PartitionData partition = completedFetch . partitionData ;
long fetchOffset = completedFetch . fetchedOffset ;
int bytes = 0 ;
int recordsCount = 0 ;
PartitionRecords < K , V > parsedRecords = null ;
try {
if ( ! subscriptions . isFetchable ( tp ) ) {
// this can happen when a rebalance happened or a partition consumption paused
// while fetch is still in-flight
log . debug ( "Ignoring fetched records for partition {} since it is no longer fetchable" , tp ) ;
} else if ( partition . errorCode = = Errors . NONE . code ( ) ) {
long fetchOffset = request . fetchData ( ) . get ( tp ) . offset ;
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long position = subscriptions . position ( tp ) ;
if ( position = = null | | position ! = fetchOffset ) {
log . debug ( "Discarding fetch response for partition {} since its offset {} does not match " +
log . debug ( "Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}" , tp , fetchOffset , position ) ;
continue ;
return null ;
}
int bytes = 0 ;
ByteBuffer buffer = partition . recordSet ;
MemoryRecords records = MemoryRecords . readableRecords ( buffer ) ;
List < ConsumerRecord < K , V > > parsed = new ArrayList < > ( ) ;
@ -597,79 +553,95 @@ public class Fetcher<K, V> {
@@ -597,79 +553,95 @@ public class Fetcher<K, V> {
}
}
recordsCount = parsed . size ( ) ;
this . sensors . recordTopicFetchMetrics ( tp . topic ( ) , bytes , recordsCount ) ;
if ( ! parsed . isEmpty ( ) ) {
log . trace ( "Adding fetched record for partition {} with offset {} to buffered record list" , tp , position ) ;
parsedRecords = new PartitionRecords < > ( fetchOffset , tp , parsed ) ;
ConsumerRecord < K , V > record = parsed . get ( parsed . size ( ) - 1 ) ;
this . records . add ( new PartitionRecords < > ( fetchOffset , tp , parsed ) ) ;
this . sensors . recordsFetchLag . record ( partition . highWatermark - record . offset ( ) ) ;
} else if ( buffer . limit ( ) > 0 & & ! skippedRecords ) {
// we did not read a single message from a non-empty buffer
// because that message's size is larger than fetch size, in this case
// record this exception
this . recordTooLargePartitions . put ( tp , fetchOffset ) ;
Map < TopicPartition , Long > recordTooLargePartitions = Collections . singletonMap ( tp , fetchOffset ) ;
throw new RecordTooLargeException ( "There are some messages at [Partition=Offset]: "
+ recordTooLargePartitions
+ " whose size is larger than the fetch size "
+ this . fetchSize
+ " and hence cannot be ever returned."
+ " Increase the fetch size on the client (using max.partition.fetch.bytes),"
+ " or decrease the maximum message size the broker will allow (using message.max.bytes)." ,
recordTooLargePartitions ) ;
}
this . sensors . recordTopicFetchMetrics ( tp . topic ( ) , bytes , parsed . size ( ) ) ;
totalBytes + = bytes ;
totalCount + = parsed . size ( ) ;
} else if ( partition . errorCode = = Errors . NOT_LEADER_FOR_PARTITION . code ( )
| | partition . errorCode = = Errors . UNKNOWN_TOPIC_OR_PARTITION . code ( ) ) {
| | partition . errorCode = = Errors . UNKNOWN_TOPIC_OR_PARTITION . code ( ) ) {
this . metadata . requestUpdate ( ) ;
} else if ( partition . errorCode = = Errors . OFFSET_OUT_OF_RANGE . code ( ) ) {
long fetchOffset = request . fetchData ( ) . get ( tp ) . offset ;
if ( subscriptions . hasDefaultOffsetResetPolicy ( ) )
if ( fetchOffset ! = subscriptions . position ( tp ) ) {
log . debug ( "Discarding stale fetch response for partition {} since the fetched offset {}" +
"does not match the current offset {}" , tp , fetchOffset , subscriptions . position ( tp ) ) ;
} else if ( subscriptions . hasDefaultOffsetResetPolicy ( ) ) {
log . info ( "Fetch offset {} is out of range for partition {}, resetting offset" , fetchOffset , tp ) ;
subscriptions . needOffsetReset ( tp ) ;
else
this . offsetOutOfRangePartitions . put ( tp , fetchOffset ) ;
log . info ( "Fetch offset { } is out of range, resetting offset" , fetchOffset ) ;
} else {
throw new OffsetOutOfRangeException ( Collections . singletonMap ( tp , fetchOffset ) ) ;
}
} else if ( partition . errorCode = = Errors . TOPIC_AUTHORIZATION_FAILED . code ( ) ) {
log . warn ( "Not authorized to read from topic {}." , tp . topic ( ) ) ;
unauthorizedTopics . add ( tp . topic ( ) ) ;
throw new TopicAuthorizationException ( Collections . singleton ( tp . topic ( ) ) ) ;
} else if ( partition . errorCode = = Errors . UNKNOWN . code ( ) ) {
log . warn ( "Unknown error fetching data for topic-partition {}" , tp ) ;
} else {
throw new IllegalStateException ( "Unexpected error code " + partition . errorCode + " while fetching data" ) ;
}
} finally {
completedFetch . metricAggregator . record ( tp , bytes , recordsCount ) ;
}
this . sensors . bytesFetched . record ( totalBytes ) ;
this . sensors . recordsFetched . record ( totalCount ) ;
this . sensors . fetchThrottleTimeSensor . record ( response . getThrottleTime ( ) ) ;
this . sensors . fetchLatency . record ( resp . requestLatencyMs ( ) ) ;
return parsedRecords ;
}
/ * *
* Parse the record entry , deserializing the key / value fields if necessary
* /
private ConsumerRecord < K , V > parseRecord ( TopicPartition partition , LogEntry logEntry ) {
Record record = logEntry . record ( ) ;
if ( this . checkCrcs & & ! record . isValid ( ) )
throw new InvalidRecordException ( "Record for partition " + partition + " at offset "
+ logEntry . offset ( ) + " is corrupt (stored crc = " + record . checksum ( )
+ ", computed crc = "
+ record . computeChecksum ( )
+ ")" ) ;
try {
if ( this . checkCrcs )
logEntry . record ( ) . ensureValid ( ) ;
long offset = logEntry . offset ( ) ;
long timestamp = logEntry . record ( ) . timestamp ( ) ;
TimestampType timestampType = logEntry . record ( ) . timestampType ( ) ;
ByteBuffer keyBytes = logEntry . record ( ) . key ( ) ;
long timestamp = record . timestamp ( ) ;
TimestampType timestampType = record . timestampType ( ) ;
ByteBuffer keyBytes = record . key ( ) ;
byte [ ] keyByteArray = keyBytes = = null ? null : Utils . toArray ( keyBytes ) ;
K key = keyBytes = = null ? null : this . keyDeserializer . deserialize ( partition . topic ( ) , keyByteArray ) ;
ByteBuffer valueBytes = logEntry . record ( ) . value ( ) ;
ByteBuffer valueBytes = record . value ( ) ;
byte [ ] valueByteArray = valueBytes = = null ? null : Utils . toArray ( valueBytes ) ;
V value = valueBytes = = null ? null : this . valueDeserializer . deserialize ( partition . topic ( ) , valueByteArray ) ;
return new ConsumerRecord < > ( partition . topic ( ) , partition . partition ( ) , offset ,
timestamp , timestampType , logEntry . record ( ) . checksum ( ) ,
timestamp , timestampType , record . checksum ( ) ,
keyByteArray = = null ? ConsumerRecord . NULL_SIZE : keyByteArray . length ,
valueByteArray = = null ? ConsumerRecord . NULL_SIZE : valueByteArray . length ,
key , value ) ;
} catch ( KafkaException e ) {
throw e ;
} catch ( RuntimeException e ) {
throw new KafkaException ( "Error deserializing key/value for partition " + partition + " at offset " + logEntry . offset ( ) , e ) ;
throw new SerializationException ( "Error deserializing key/value for partition " + partition +
" at offset " + logEntry . offset ( ) , e ) ;
}
}
private static class PartitionRecords < K , V > {
public long fetchOffset ;
public TopicPartition partition ;
public List < ConsumerRecord < K , V > > records ;
private long fetchOffset ;
private TopicPartition partition ;
private List < ConsumerRecord < K , V > > records ;
public PartitionRecords ( long fetchOffset , TopicPartition partition , List < ConsumerRecord < K , V > > records ) {
this . fetchOffset = fetchOffset ;
@ -677,7 +649,7 @@ public class Fetcher<K, V> {
@@ -677,7 +649,7 @@ public class Fetcher<K, V> {
this . records = records ;
}
private boolean isConsumed ( ) {
private boolean isEmpty ( ) {
return records = = null | | records . isEmpty ( ) ;
}
@ -687,7 +659,7 @@ public class Fetcher<K, V> {
@@ -687,7 +659,7 @@ public class Fetcher<K, V> {
private List < ConsumerRecord < K , V > > take ( int n ) {
if ( records = = null )
return Collections . emptyList ( ) ;
return new ArrayList < > ( ) ;
if ( n > = records . size ( ) ) {
List < ConsumerRecord < K , V > > res = this . records ;
@ -709,7 +681,59 @@ public class Fetcher<K, V> {
@@ -709,7 +681,59 @@ public class Fetcher<K, V> {
}
}
private class FetchManagerMetrics {
private static class CompletedFetch {
private final TopicPartition partition ;
private final long fetchedOffset ;
private final FetchResponse . PartitionData partitionData ;
private final FetchResponseMetricAggregator metricAggregator ;
public CompletedFetch ( TopicPartition partition ,
long fetchedOffset ,
FetchResponse . PartitionData partitionData ,
FetchResponseMetricAggregator metricAggregator ) {
this . partition = partition ;
this . fetchedOffset = fetchedOffset ;
this . partitionData = partitionData ;
this . metricAggregator = metricAggregator ;
}
}
/ * *
* Since we parse the message data for each partition from each fetch response lazily , fetch - level
* metrics need to be aggregated as the messages from each partition are parsed . This class is used
* to facilitate this incremental aggregation .
* /
private static class FetchResponseMetricAggregator {
private final FetchManagerMetrics sensors ;
private final Set < TopicPartition > unrecordedPartitions ;
private int totalBytes ;
private int totalRecords ;
public FetchResponseMetricAggregator ( FetchManagerMetrics sensors ,
Set < TopicPartition > partitions ) {
this . sensors = sensors ;
this . unrecordedPartitions = partitions ;
}
/ * *
* After each partition is parsed , we update the current metric totals with the total bytes
* and number of records parsed . After all partitions have reported , we write the metric .
* /
public void record ( TopicPartition partition , int bytes , int records ) {
unrecordedPartitions . remove ( partition ) ;
totalBytes + = bytes ;
totalRecords + = records ;
if ( unrecordedPartitions . isEmpty ( ) ) {
// once all expected partitions from the fetch have reported in, record the metrics
sensors . bytesFetched . record ( totalBytes ) ;
sensors . recordsFetched . record ( totalRecords ) ;
}
}
}
private static class FetchManagerMetrics {
public final Metrics metrics ;
public final String metricGrpName ;
@ -719,7 +743,6 @@ public class Fetcher<K, V> {
@@ -719,7 +743,6 @@ public class Fetcher<K, V> {
public final Sensor recordsFetchLag ;
public final Sensor fetchThrottleTimeSensor ;
public FetchManagerMetrics ( Metrics metrics , String metricGrpPrefix ) {
this . metrics = metrics ;
this . metricGrpName = metricGrpPrefix + "-fetch-manager-metrics" ;