diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index bfb50b88a7d..c71e29c92ab 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -874,25 +874,26 @@ class ReplicaManager(val config: KafkaConfig, } // Wrap the given callback function with another function that will update the HW for the remote follower - val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit = - (fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => { + def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = { + if (isFromFollower) { fetchPartitionData.foreach { case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark) } - responseCallback(fetchPartitionData) } + responseCallback(fetchPartitionData) + } // respond immediately if 1) fetch request does not want to wait // 2) fetch request does not require any data // 3) has enough data to respond // 4) some error happens while reading data - // 5) all the requested partitions need HW update + // 5) any of the requested partitions need HW update if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) { val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica) } - updateHwAndThenCallback(fetchPartitionData) + maybeUpdateHwAndSendResponse(fetchPartitionData) } else { // construct the fetch results from the read results val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] @@ -905,7 +906,7 @@ class ReplicaManager(val config: KafkaConfig, val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata, - updateHwAndThenCallback) + maybeUpdateHwAndSendResponse) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }