@ -130,18 +130,18 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -130,18 +130,18 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we
// set the partition IDs.
this . partitionResponse = partitionResponse ;
this . preferredReplica = Optional . of ( partitionResponse . partitionHeader ( ) . p referredReadReplica ( ) )
this . preferredReplica = Optional . of ( partitionResponse . preferredReadReplica ( ) )
. filter ( replicaId - > replicaId ! = INVALID_PREFERRED_REPLICA_ID ) ;
if ( partitionResponse . partitionHeader ( ) . abortedTransactions ( ) = = null ) {
if ( partitionResponse . abortedTransactions ( ) = = null ) {
this . abortedTransactions = null ;
} else {
this . abortedTransactions = partitionResponse . partitionHeader ( ) . abortedTransactions ( ) . stream ( )
this . abortedTransactions = partitionResponse . abortedTransactions ( ) . stream ( )
. map ( AbortedTransaction : : fromMessage )
. collect ( Collectors . toList ( ) ) ;
}
this . error = Errors . forCode ( partitionResponse . partitionHeader ( ) . errorCode ( ) ) ;
this . error = Errors . forCode ( partitionResponse . errorCode ( ) ) ;
}
public PartitionData ( Errors error ,
@ -154,24 +154,25 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -154,24 +154,25 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
this . preferredReplica = preferredReadReplica ;
this . abortedTransactions = abortedTransactions ;
this . error = error ;
FetchResponseData . PartitionHeader partitionHeader = new FetchResponseData . PartitionHeader ( ) ;
partitionHeader . setErrorCode ( error . code ( ) )
FetchResponseData . FetchablePartitionResponse partitionResponse =
new FetchResponseData . FetchablePartitionResponse ( ) ;
partitionResponse . setErrorCode ( error . code ( ) )
. setHighWatermark ( highWatermark )
. setLastStableOffset ( lastStableOffset )
. setLogStartOffset ( logStartOffset ) ;
if ( abortedTransactions ! = null ) {
partitionHeader . setAbortedTransactions ( abortedTransactions . stream ( ) . map (
partitionResponse . setAbortedTransactions ( abortedTransactions . stream ( ) . map (
aborted - > new FetchResponseData . AbortedTransaction ( )
. setProducerId ( aborted . producerId )
. setFirstOffset ( aborted . firstOffset ) )
. collect ( Collectors . toList ( ) ) ) ;
} else {
partitionHeader . setAbortedTransactions ( null ) ;
partitionResponse . setAbortedTransactions ( null ) ;
}
partitionHeader . setPreferredReadReplica ( preferredReadReplica . orElse ( INVALID_PREFERRED_REPLICA_ID ) ) ;
this . partitionResponse = new FetchResponseData . FetchablePartitionResponse ( )
. setPartitionHeader ( partitionHeader )
. setRecordSet ( records ) ;
partitionResponse . setPreferredReadReplica ( preferredReadReplica . orElse ( INVALID_PREFERRED_REPLICA_ID ) ) ;
partitionResponse . setRecordSet ( records ) ;
this . partitionResponse = partitionResponse ;
}
public PartitionData ( Errors error ,
@ -216,15 +217,15 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -216,15 +217,15 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
}
public long highWatermark ( ) {
return partitionResponse . partitionHeader ( ) . highWatermark ( ) ;
return partitionResponse . highWatermark ( ) ;
}
public long lastStableOffset ( ) {
return partitionResponse . partitionHeader ( ) . lastStableOffset ( ) ;
return partitionResponse . lastStableOffset ( ) ;
}
public long logStartOffset ( ) {
return partitionResponse . partitionHeader ( ) . logStartOffset ( ) ;
return partitionResponse . logStartOffset ( ) ;
}
public Optional < Integer > preferredReadReplica ( ) {
@ -342,8 +343,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -342,8 +343,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
LinkedHashMap < TopicPartition , PartitionData < T > > responseMap = new LinkedHashMap < > ( ) ;
message . responses ( ) . forEach ( topicResponse - > {
topicResponse . partitionResponses ( ) . forEach ( partitionResponse - > {
FetchResponseData . PartitionHeader partitionHeader = partitionResponse . partitionHeader ( ) ;
TopicPartition tp = new TopicPartition ( topicResponse . topic ( ) , partitionHeader . partition ( ) ) ;
TopicPartition tp = new TopicPartition ( topicResponse . topic ( ) , partitionResponse . partition ( ) ) ;
PartitionData < T > partitionData = new PartitionData < > ( partitionResponse ) ;
responseMap . put ( tp , partitionData ) ;
} ) ;
@ -366,7 +366,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -366,7 +366,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
List < FetchResponseData . FetchablePartitionResponse > partitionResponses = new ArrayList < > ( ) ;
partitionDataTopicAndPartitionData . partitions . forEach ( ( partitionId , partitionData ) - > {
// Since PartitionData alone doesn't know the partition ID, we set it here
partitionData . partitionResponse . partitionHeader ( ) . setPartition ( partitionId ) ;
partitionData . partitionResponse . setPartition ( partitionId ) ;
partitionResponses . add ( partitionData . partitionResponse ) ;
} ) ;
topicResponseList . add ( new FetchResponseData . FetchableTopicResponse ( )