Browse Source

MINOR: Only invoke hw update logic for follower fetches (#7064)

We noticed a lot of messages like the following in the broker logs recently:
```
[2019-07-10 02:01:23,946] WARN [ReplicaManager broker=0] While updating the HW for follower -1 for partition connect-storage-topic-connect-cluster-0, the replica could not be found. (kafka.server.ReplicaManager:70)
```
In the KIP-392 PR, we added logic to track the high watermark of followers, but it is invoked even for consumer fetches. This doesn't cause any harm other than all the log noise. 

This patch just adds the missing follower check.

Reviewers: David Arthur <mumrah@gmail.com>
pull/7059/head
Jason Gustafson 5 years ago committed by GitHub
parent
commit
df6efda1f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      core/src/main/scala/kafka/server/ReplicaManager.scala

13
core/src/main/scala/kafka/server/ReplicaManager.scala

@ -874,25 +874,26 @@ class ReplicaManager(val config: KafkaConfig, @@ -874,25 +874,26 @@ class ReplicaManager(val config: KafkaConfig,
}
// Wrap the given callback function with another function that will update the HW for the remote follower
val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit =
(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => {
def maybeUpdateHwAndSendResponse(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
if (isFromFollower) {
fetchPartitionData.foreach {
case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
}
responseCallback(fetchPartitionData)
}
responseCallback(fetchPartitionData)
}
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
// 5) all the requested partitions need HW update
// 5) any of the requested partitions need HW update
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
}
updateHwAndThenCallback(fetchPartitionData)
maybeUpdateHwAndSendResponse(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@ -905,7 +906,7 @@ class ReplicaManager(val config: KafkaConfig, @@ -905,7 +906,7 @@ class ReplicaManager(val config: KafkaConfig,
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
updateHwAndThenCallback)
maybeUpdateHwAndSendResponse)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

Loading…
Cancel
Save