@ -46,6 +46,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val zkUtils = controllerContext . zkUtils
private val zkUtils = controllerContext . zkUtils
private val replicaState : mutable.Map [ PartitionAndReplica , ReplicaState ] = mutable . Map . empty
private val replicaState : mutable.Map [ PartitionAndReplica , ReplicaState ] = mutable . Map . empty
private val brokerRequestBatch = new ControllerBrokerRequestBatch ( controller )
private val brokerRequestBatch = new ControllerBrokerRequestBatch ( controller )
private val stateChangeLogger = KafkaController . stateChangeLogger
private val stateChangeLogger = KafkaController . stateChangeLogger
this . logIdent = "[Replica state machine on controller " + controller . config . brokerId + "]: "
this . logIdent = "[Replica state machine on controller " + controller . config . brokerId + "]: "
@ -136,9 +137,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val currState = replicaState . getOrElseUpdate ( partitionAndReplica , NonExistentReplica )
val currState = replicaState . getOrElseUpdate ( partitionAndReplica , NonExistentReplica )
try {
try {
val replicaAssignment = controllerContext . partitionReplicaAssignment ( topicAndPartition )
val replicaAssignment = controllerContext . partitionReplicaAssignment ( topicAndPartition )
assertValidTransition ( partitionAndReplica , targetState )
targetState match {
targetState match {
case NewReplica =>
case NewReplica =>
assertValidPreviousStates ( partitionAndReplica , List ( NonExistentReplica ) , targetState )
// start replica as a follower to the current leader for its partition
// start replica as a follower to the current leader for its partition
val leaderIsrAndControllerEpochOpt = ReplicationUtils . getLeaderIsrAndEpochForPartition ( zkUtils , topic , partition )
val leaderIsrAndControllerEpochOpt = ReplicationUtils . getLeaderIsrAndEpochForPartition ( zkUtils , topic , partition )
leaderIsrAndControllerEpochOpt match {
leaderIsrAndControllerEpochOpt match {
@ -156,7 +157,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState ,
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState ,
targetState ) )
targetState ) )
case ReplicaDeletionStarted =>
case ReplicaDeletionStarted =>
assertValidPreviousStates ( partitionAndReplica , List ( OfflineReplica ) , targetState )
replicaState . put ( partitionAndReplica , ReplicaDeletionStarted )
replicaState . put ( partitionAndReplica , ReplicaDeletionStarted )
// send stop replica command
// send stop replica command
brokerRequestBatch . addStopReplicaRequestForBrokers ( List ( replicaId ) , topic , partition , deletePartition = true ,
brokerRequestBatch . addStopReplicaRequestForBrokers ( List ( replicaId ) , topic , partition , deletePartition = true ,
@ -164,17 +164,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
case ReplicaDeletionIneligible =>
case ReplicaDeletionIneligible =>
assertValidPreviousStates ( partitionAndReplica , List ( ReplicaDeletionStarted ) , targetState )
replicaState . put ( partitionAndReplica , ReplicaDeletionIneligible )
replicaState . put ( partitionAndReplica , ReplicaDeletionIneligible )
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
case ReplicaDeletionSuccessful =>
case ReplicaDeletionSuccessful =>
assertValidPreviousStates ( partitionAndReplica , List ( ReplicaDeletionStarted ) , targetState )
replicaState . put ( partitionAndReplica , ReplicaDeletionSuccessful )
replicaState . put ( partitionAndReplica , ReplicaDeletionSuccessful )
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
case NonExistentReplica =>
case NonExistentReplica =>
assertValidPreviousStates ( partitionAndReplica , List ( ReplicaDeletionSuccessful ) , targetState )
// remove this replica from the assigned replicas list for its partition
// remove this replica from the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext . partitionReplicaAssignment ( topicAndPartition )
val currentAssignedReplicas = controllerContext . partitionReplicaAssignment ( topicAndPartition )
controllerContext . partitionReplicaAssignment . put ( topicAndPartition , currentAssignedReplicas . filterNot ( _ == replicaId ) )
controllerContext . partitionReplicaAssignment . put ( topicAndPartition , currentAssignedReplicas . filterNot ( _ == replicaId ) )
@ -182,8 +179,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
stateChangeLogger . trace ( "Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
. format ( controllerId , controller . epoch , replicaId , topicAndPartition , currState , targetState ) )
case OnlineReplica =>
case OnlineReplica =>
assertValidPreviousStates ( partitionAndReplica ,
List ( NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible ) , targetState )
replicaState ( partitionAndReplica ) match {
replicaState ( partitionAndReplica ) match {
case NewReplica =>
case NewReplica =>
// add this replica to the assigned replicas list for its partition
// add this replica to the assigned replicas list for its partition
@ -208,8 +203,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
replicaState . put ( partitionAndReplica , OnlineReplica )
replicaState . put ( partitionAndReplica , OnlineReplica )
case OfflineReplica =>
case OfflineReplica =>
assertValidPreviousStates ( partitionAndReplica ,
List ( NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible ) , targetState )
// send stop replica command to the replica so that it stops fetching from the leader
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch . addStopReplicaRequestForBrokers ( List ( replicaId ) , topic , partition , deletePartition = false )
brokerRequestBatch . addStopReplicaRequestForBrokers ( List ( replicaId ) , topic , partition , deletePartition = false )
// As an optimization , the controller removes dead replicas from the ISR
// As an optimization , the controller removes dead replicas from the ISR
@ -273,11 +266,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
replicaState . filter ( r => r . _1 . topic . equals ( topic ) && deletionStates . contains ( r . _2 ) ) . keySet
replicaState . filter ( r => r . _1 . topic . equals ( topic ) && deletionStates . contains ( r . _2 ) ) . keySet
}
}
private def assertValidPreviousStates ( partitionAndReplica : PartitionAndReplica , fromStates : Seq [ ReplicaState ] ,
private def assertValidTransition ( partitionAndReplica : PartitionAndReplica , targetState : ReplicaState ) : Unit = {
targetState : ReplicaState ) {
assert ( targetState . validPreviousStates . contains ( replicaState ( partitionAndReplica ) ) ,
assert ( fromStates . contains ( replicaState ( partitionAndReplica ) ) ,
"Replica %s should be in the %s states before moving to %s state"
"Replica %s should be in the %s states before moving to %s state"
. format ( partitionAndReplica , from States. mkString ( "," ) , targetState ) +
. format ( partitionAndReplica , targetState . validPrevious States. mkString ( "," ) , targetState ) +
". Instead it is in %s state" . format ( replicaState ( partitionAndReplica ) ) )
". Instead it is in %s state" . format ( replicaState ( partitionAndReplica ) ) )
}
}
@ -307,11 +299,42 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
}
}
sealed trait ReplicaState { def state : Byte }
sealed trait ReplicaState {
case object NewReplica extends ReplicaState { val state : Byte = 1 }
def state : Byte
case object OnlineReplica extends ReplicaState { val state : Byte = 2 }
def validPreviousStates : Set [ ReplicaState ]
case object OfflineReplica extends ReplicaState { val state : Byte = 3 }
}
case object ReplicaDeletionStarted extends ReplicaState { val state : Byte = 4 }
case object ReplicaDeletionSuccessful extends ReplicaState { val state : Byte = 5 }
case object NewReplica extends ReplicaState {
case object ReplicaDeletionIneligible extends ReplicaState { val state : Byte = 6 }
val state : Byte = 1
case object NonExistentReplica extends ReplicaState { val state : Byte = 7 }
val validPreviousStates : Set [ ReplicaState ] = Set ( NonExistentReplica )
}
case object OnlineReplica extends ReplicaState {
val state : Byte = 2
val validPreviousStates : Set [ ReplicaState ] = Set ( NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible )
}
case object OfflineReplica extends ReplicaState {
val state : Byte = 3
val validPreviousStates : Set [ ReplicaState ] = Set ( NewReplica , OnlineReplica , OfflineReplica , ReplicaDeletionIneligible )
}
case object ReplicaDeletionStarted extends ReplicaState {
val state : Byte = 4
val validPreviousStates : Set [ ReplicaState ] = Set ( OfflineReplica )
}
case object ReplicaDeletionSuccessful extends ReplicaState {
val state : Byte = 5
val validPreviousStates : Set [ ReplicaState ] = Set ( ReplicaDeletionStarted )
}
case object ReplicaDeletionIneligible extends ReplicaState {
val state : Byte = 6
val validPreviousStates : Set [ ReplicaState ] = Set ( ReplicaDeletionStarted )
}
case object NonExistentReplica extends ReplicaState {
val state : Byte = 7
val validPreviousStates : Set [ ReplicaState ] = Set ( ReplicaDeletionSuccessful )
}