diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index cb9a5279e84..0e3e4030c47 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -130,18 +130,18 @@ public class FetchResponse extends AbstractResponse { // When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we // set the partition IDs. this.partitionResponse = partitionResponse; - this.preferredReplica = Optional.of(partitionResponse.partitionHeader().preferredReadReplica()) + this.preferredReplica = Optional.of(partitionResponse.preferredReadReplica()) .filter(replicaId -> replicaId != INVALID_PREFERRED_REPLICA_ID); - if (partitionResponse.partitionHeader().abortedTransactions() == null) { + if (partitionResponse.abortedTransactions() == null) { this.abortedTransactions = null; } else { - this.abortedTransactions = partitionResponse.partitionHeader().abortedTransactions().stream() + this.abortedTransactions = partitionResponse.abortedTransactions().stream() .map(AbortedTransaction::fromMessage) .collect(Collectors.toList()); } - this.error = Errors.forCode(partitionResponse.partitionHeader().errorCode()); + this.error = Errors.forCode(partitionResponse.errorCode()); } public PartitionData(Errors error, @@ -154,24 +154,25 @@ public class FetchResponse extends AbstractResponse { this.preferredReplica = preferredReadReplica; this.abortedTransactions = abortedTransactions; this.error = error; - FetchResponseData.PartitionHeader partitionHeader = new FetchResponseData.PartitionHeader(); - partitionHeader.setErrorCode(error.code()) + FetchResponseData.FetchablePartitionResponse partitionResponse = + new FetchResponseData.FetchablePartitionResponse(); + partitionResponse.setErrorCode(error.code()) .setHighWatermark(highWatermark) .setLastStableOffset(lastStableOffset) .setLogStartOffset(logStartOffset); if (abortedTransactions != null) { - partitionHeader.setAbortedTransactions(abortedTransactions.stream().map( + partitionResponse.setAbortedTransactions(abortedTransactions.stream().map( aborted -> new FetchResponseData.AbortedTransaction() .setProducerId(aborted.producerId) .setFirstOffset(aborted.firstOffset)) .collect(Collectors.toList())); } else { - partitionHeader.setAbortedTransactions(null); + partitionResponse.setAbortedTransactions(null); } - partitionHeader.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID)); - this.partitionResponse = new FetchResponseData.FetchablePartitionResponse() - .setPartitionHeader(partitionHeader) - .setRecordSet(records); + partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID)); + partitionResponse.setRecordSet(records); + + this.partitionResponse = partitionResponse; } public PartitionData(Errors error, @@ -216,15 +217,15 @@ public class FetchResponse extends AbstractResponse { } public long highWatermark() { - return partitionResponse.partitionHeader().highWatermark(); + return partitionResponse.highWatermark(); } public long lastStableOffset() { - return partitionResponse.partitionHeader().lastStableOffset(); + return partitionResponse.lastStableOffset(); } public long logStartOffset() { - return partitionResponse.partitionHeader().logStartOffset(); + return partitionResponse.logStartOffset(); } public Optional preferredReadReplica() { @@ -342,8 +343,7 @@ public class FetchResponse extends AbstractResponse { LinkedHashMap> responseMap = new LinkedHashMap<>(); message.responses().forEach(topicResponse -> { topicResponse.partitionResponses().forEach(partitionResponse -> { - FetchResponseData.PartitionHeader partitionHeader = partitionResponse.partitionHeader(); - TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionHeader.partition()); + TopicPartition tp = new TopicPartition(topicResponse.topic(), partitionResponse.partition()); PartitionData partitionData = new PartitionData<>(partitionResponse); responseMap.put(tp, partitionData); }); @@ -366,7 +366,7 @@ public class FetchResponse extends AbstractResponse { List partitionResponses = new ArrayList<>(); partitionDataTopicAndPartitionData.partitions.forEach((partitionId, partitionData) -> { // Since PartitionData alone doesn't know the partition ID, we set it here - partitionData.partitionResponse.partitionHeader().setPartition(partitionId); + partitionData.partitionResponse.setPartition(partitionId); partitionResponses.add(partitionData.partitionResponse); }); topicResponseList.add(new FetchResponseData.FetchableTopicResponse() diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json index 9f64c2428f7..454fafed7d8 100644 --- a/clients/src/main/resources/common/message/FetchResponse.json +++ b/clients/src/main/resources/common/message/FetchResponse.json @@ -53,28 +53,25 @@ "about": "The topic name." }, { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+", "about": "The topic partitions.", "fields": [ - { "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+", - "fields": [ - { "name": "Partition", "type": "int32", "versions": "0+", - "about": "The partition index." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The error code, or 0 if there was no fetch error." }, - { "name": "HighWatermark", "type": "int64", "versions": "0+", - "about": "The current high water mark." }, - { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, - "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, - { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, - "about": "The current log start offset." }, - { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true, - "about": "The aborted transactions.", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", - "about": "The producer id associated with the aborted transaction." }, - { "name": "FirstOffset", "type": "int64", "versions": "4+", - "about": "The first offset in the aborted transaction." } - ]}, - { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, - "about": "The preferred read replica for the consumer to use on its next fetch request"} + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no fetch error." }, + { "name": "HighWatermark", "type": "int64", "versions": "0+", + "about": "The current high water mark." }, + { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, + "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, + { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, + "about": "The current log start offset." }, + { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true, + "about": "The aborted transactions.", "fields": [ + { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", + "about": "The producer id associated with the aborted transaction." }, + { "name": "FirstOffset", "type": "int64", "versions": "4+", + "about": "The first offset in the aborted transaction." } ]}, + { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, + "about": "The preferred read replica for the consumer to use on its next fetch request"}, { "name": "RecordSet", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} ]} ]}