|
|
|
@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
@@ -111,9 +111,9 @@ class ReplicaFetcherThread(name:String,
|
|
|
|
|
* Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. |
|
|
|
|
*/ |
|
|
|
|
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) |
|
|
|
|
replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) |
|
|
|
|
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" |
|
|
|
|
.format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) |
|
|
|
|
replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) |
|
|
|
|
leaderStartOffset |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|