@ -114,21 +114,24 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
@@ -114,21 +114,24 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
* instance when the broker receives a LeaderAndIsr request from the controller indicating
* that it should be either a leader or follower of a partition .
*/
trait HostedPartition
sealed trait HostedPartition
object HostedPartition {
/* *
* This broker does not have any state for this partition locally .
*/
object None extends HostedPartition
final object None extends HostedPartition
/* *
* This broker hosts the partition and it is online .
*/
final case class Online ( partition : Partition ) extends HostedPartition
/* *
* This broker hosts the partition , but it is in an offline log directory .
*/
object Offline extends HostedPartition
final object Offline extends HostedPartition
}
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
val IsrChangePropagationBlackOut = 5000L
@ -183,8 +186,9 @@ class ReplicaManager(val config: KafkaConfig,
@@ -183,8 +186,9 @@ class ReplicaManager(val config: KafkaConfig,
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch : Int = KafkaController . InitialControllerEpoch
private val localBrokerId = config . brokerId
private val allPartitions = new Pool [ TopicPartition , HostedPartition ] ( valueFactory = Some ( tp =>
Partition ( tp , time , this ) ) )
private val allPartitions = new Pool [ TopicPartition , HostedPartition ] (
valueFactory = Some ( tp => HostedPartition . Online ( Partition ( tp , time , this ) ) )
)
private val replicaStateChangeLock = new Object
val replicaFetcherManager = createReplicaFetcherManager ( metrics , time , threadNamePrefix , quotaManagers . follower )
val replicaAlterLogDirsManager = createReplicaAlterLogDirsManager ( quotaManagers . alterLogDirs , brokerTopicStats )
@ -320,8 +324,8 @@ class ReplicaManager(val config: KafkaConfig,
@@ -320,8 +324,8 @@ class ReplicaManager(val config: KafkaConfig,
private def maybeRemoveTopicMetrics ( topic : String ) : Unit = {
val topicHasOnlinePartition = allPartitions . values . exists {
case partition : Partition => topic == partition . topic
case _ => false
case HostedPartition . Online ( partition ) => topic == partition . topic
case HostedPartition . None | HostedPartition . Offline => false
}
if ( ! topicHasOnlinePartition )
brokerTopicStats . removeMetrics ( topic )
@ -335,8 +339,8 @@ class ReplicaManager(val config: KafkaConfig,
@@ -335,8 +339,8 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition . Offline =>
throw new KafkaStorageException ( s" Partition $topicPartition is on an offline disk " )
case removedPartition : Partition =>
if ( allPartitions . remove ( topicPartition , remov edPartition) ) {
case hostedPartition @ HostedPartition . Online ( removedPartition ) =>
if ( allPartitions . remove ( topicPartition , host edPartition) ) {
maybeRemoveTopicMetrics ( topicPartition . topic )
// this will delete the local log . This call may throw exception if the log is on offline directory
removedPartition . delete ( )
@ -393,14 +397,14 @@ class ReplicaManager(val config: KafkaConfig,
@@ -393,14 +397,14 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
def createPartition ( topicPartition : TopicPartition ) : Partition = {
val partition = Partition ( topicPartition , time , this )
allPartitions . put ( topicPartition , partition )
allPartitions . put ( topicPartition , HostedPartition . Online ( partition ) )
partition
}
def nonOfflinePartition ( topicPartition : TopicPartition ) : Option [ Partition ] = {
getPartition ( topicPartition ) match {
case partition : Partition => Some ( partition )
case _ => None
case HostedPartition . Online ( partition ) => Some ( partition )
case HostedPartition . None | HostedPartition . Offline => None
}
}
@ -408,8 +412,8 @@ class ReplicaManager(val config: KafkaConfig,
@@ -408,8 +412,8 @@ class ReplicaManager(val config: KafkaConfig,
// the iterator has been constructed could still be returned by this iterator .
private def nonOfflinePartitionsIterator : Iterator [ Partition ] = {
allPartitions . values . iterator . flatMap {
case p : Partition => Some ( p )
case _ => None
case HostedPartition . Online ( partition ) => Some ( partition )
case HostedPartition . None | HostedPartition . Offline => None
}
}
@ -419,7 +423,7 @@ class ReplicaManager(val config: KafkaConfig,
@@ -419,7 +423,7 @@ class ReplicaManager(val config: KafkaConfig,
def getPartitionOrException ( topicPartition : TopicPartition , expectLeader : Boolean ) : Partition = {
getPartition ( topicPartition ) match {
case partition : Partition =>
case HostedPartition . Online ( partition ) =>
partition
case HostedPartition . Offline =>
@ -577,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig,
@@ -577,7 +581,7 @@ class ReplicaManager(val config: KafkaConfig,
throw new KafkaStorageException ( s" Log directory $destinationDir is offline " )
getPartition ( topicPartition ) match {
case partition : Partition =>
case HostedPartition . Online ( partition ) =>
// Stop current replica movement if the destinationDir is different from the existing destination log directory
if ( partition . futureReplicaDirChanged ( destinationDir ) ) {
replicaAlterLogDirsManager . removeFetcherForPartitions ( Set ( topicPartition ) )
@ -586,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
@@ -586,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition . Offline =>
throw new KafkaStorageException ( s" Partition $topicPartition is offline " )
case _ => // Do nothing
case HostedPartition . None => // Do nothing
}
// If the log for this partition has not been created yet :
@ -776,8 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
@@ -776,8 +780,8 @@ class ReplicaManager(val config: KafkaConfig,
( topicPartition , LogAppendResult ( LogAppendInfo . UnknownLogAppendInfo , Some ( e ) ) )
case t : Throwable =>
val logStartOffset = getPartition ( topicPartition ) match {
case partition : Partition => partition . logStartOffset
case _ => - 1L
case HostedPartition . Online ( partition ) => partition . logStartOffset
case HostedPartition . None | HostedPartition . Offline => - 1L
}
brokerTopicStats . topicStats ( topicPartition . topic ) . failedProduceRequestRate . mark ( )
brokerTopicStats . allTopicsStats . failedProduceRequestRate . mark ( )
@ -1064,11 +1068,11 @@ class ReplicaManager(val config: KafkaConfig,
@@ -1064,11 +1068,11 @@ class ReplicaManager(val config: KafkaConfig,
responseMap . put ( topicPartition , Errors . KAFKA_STORAGE_ERROR )
None
case partition : Partition => Some ( partition )
case HostedPartition . Online ( partition ) => Some ( partition )
case HostedPartition . None =>
val partition = Partition ( topicPartition , time , this )
allPartitions . putIfNotExists ( topicPartition , partition )
allPartitions . putIfNotExists ( topicPartition , HostedPartition . Online ( partition ) )
newPartitions . add ( partition )
Some ( partition )
}
@ -1534,7 +1538,7 @@ class ReplicaManager(val config: KafkaConfig,
@@ -1534,7 +1538,7 @@ class ReplicaManager(val config: KafkaConfig,
def lastOffsetForLeaderEpoch ( requestedEpochInfo : Map [ TopicPartition , OffsetsForLeaderEpochRequest . PartitionData ] ) : Map [ TopicPartition , EpochEndOffset ] = {
requestedEpochInfo . map { case ( tp , partitionData ) =>
val epochEndOffset = getPartition ( tp ) match {
case partition : Partition =>
case HostedPartition . Online ( partition ) =>
partition . lastOffsetForLeaderEpoch ( partitionData . currentLeaderEpoch , partitionData . leaderEpoch ,
fetchOnlyFromLeader = true )