Browse Source

MINOR: Better messaging for invalid fetch response (#6427)

Users have reported (KAFKA-7565) that when consumer poll wake up is used,
it is possible to receive fetch responses that don't match the copied topic
partitions collection for the session when the fetch request was created.

This commit improves the error handling here by throwing an
IllegalStateException instead of a NullPointerException. And by
generating a message for the exception that includes a bit of more
information.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6440/head
José Armando García Sancio 6 years ago committed by Jason Gustafson
parent
commit
fd79dd0608
  1. 32
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

32
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time; @@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
import java.nio.ByteBuffer;
@ -241,13 +242,30 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -241,13 +242,30 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData<Records> fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
if (requestData == null) {
String message;
if (data.metadata().isFull()) {
message = MessageFormatter.arrayFormat(
"Response for missing full request partition: partition={}; metadata={}",
new Object[]{partition, data.metadata()}).getMessage();
} else {
message = MessageFormatter.arrayFormat(
"Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}",
new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage();
}
// Received fetch response for missing session partition
throw new IllegalStateException(message);
} else {
long fetchOffset = requestData.fetchOffset;
FetchResponse.PartitionData<Records> fetchData = entry.getValue();
log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
isolationLevel, fetchOffset, partition, fetchData);
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion()));
}
}
sensors.fetchLatency.record(resp.requestLatencyMs());

Loading…
Cancel
Save