diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 8ac5730110c..64bc92130c7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; import java.io.Closeable; import java.nio.ByteBuffer; @@ -241,13 +242,30 @@ public class Fetcher implements SubscriptionState.Listener, Closeable { for (Map.Entry> entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); - long fetchOffset = data.sessionPartitions().get(partition).fetchOffset; - FetchResponse.PartitionData fetchData = entry.getValue(); - - log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", - isolationLevel, fetchOffset, partition, fetchData); - completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, - resp.requestHeader().apiVersion())); + FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); + if (requestData == null) { + String message; + if (data.metadata().isFull()) { + message = MessageFormatter.arrayFormat( + "Response for missing full request partition: partition={}; metadata={}", + new Object[]{partition, data.metadata()}).getMessage(); + } else { + message = MessageFormatter.arrayFormat( + "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}", + new Object[]{partition, data.metadata(), data.toSend(), data.toForget()}).getMessage(); + } + + // Received fetch response for missing session partition + throw new IllegalStateException(message); + } else { + long fetchOffset = requestData.fetchOffset; + FetchResponse.PartitionData fetchData = entry.getValue(); + + log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", + isolationLevel, fetchOffset, partition, fetchData); + completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, + resp.requestHeader().apiVersion())); + } } sensors.fetchLatency.record(resp.requestLatencyMs());