diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5bed329d74f..4cffc132606 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -146,10 +146,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val topicAndPartition = TopicAndPartition(topic, partition) val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) try { + assertValidTransition(topicAndPartition, targetState) targetState match { case NewPartition => - // pre: partition did not exist before this - assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) partitionState.put(topicAndPartition, NewPartition) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s" @@ -157,7 +156,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { assignedReplicas)) // post: partition has been assigned replicas case OnlinePartition => - assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) partitionState(topicAndPartition) match { case NewPartition => // initialize leader and isr path for new partition @@ -174,16 +172,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader)) // post: partition has a leader case OfflinePartition => - // pre: partition should be in New or Online state - assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition) // should be called when the leader for a partition is no longer alive stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) partitionState.put(topicAndPartition, OfflinePartition) // post: partition has no alive leader case NonExistentPartition => - // pre: partition should be in Offline state - assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition) stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) partitionState.put(topicAndPartition, NonExistentPartition) @@ -217,11 +211,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState], - targetState: PartitionState) { - if(!fromStates.contains(partitionState(topicAndPartition))) + private def assertValidTransition(topicAndPartition: TopicAndPartition, targetState: PartitionState): Unit = { + if (!targetState.validPreviousStates.contains(partitionState(topicAndPartition))) throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state" - .format(topicAndPartition, fromStates.mkString(","), targetState) + ". Instead it is in %s state" + .format(topicAndPartition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state" .format(partitionState(topicAndPartition))) } @@ -351,8 +344,27 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } -sealed trait PartitionState { def state: Byte } -case object NewPartition extends PartitionState { val state: Byte = 0 } -case object OnlinePartition extends PartitionState { val state: Byte = 1 } -case object OfflinePartition extends PartitionState { val state: Byte = 2 } -case object NonExistentPartition extends PartitionState { val state: Byte = 3 } +sealed trait PartitionState { + def state: Byte + def validPreviousStates: Set[PartitionState] +} + +case object NewPartition extends PartitionState { + val state: Byte = 0 + val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition) +} + +case object OnlinePartition extends PartitionState { + val state: Byte = 1 + val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition) +} + +case object OfflinePartition extends PartitionState { + val state: Byte = 2 + val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition) +} + +case object NonExistentPartition extends PartitionState { + val state: Byte = 3 + val validPreviousStates: Set[PartitionState] = Set(OfflinePartition) +} diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index c812f4a863d..0759ed4b109 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -46,6 +46,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val zkUtils = controllerContext.zkUtils private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) + private val stateChangeLogger = KafkaController.stateChangeLogger this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " @@ -136,9 +137,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica) try { val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) + assertValidTransition(partitionAndReplica, targetState) targetState match { case NewReplica => - assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) // start replica as a follower to the current leader for its partition val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) leaderIsrAndControllerEpochOpt match { @@ -156,7 +157,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionStarted => - assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionStarted) // send stop replica command brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, @@ -164,17 +164,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionIneligible => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionIneligible) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionSuccessful => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case NonExistentReplica => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState) // remove this replica from the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) @@ -182,8 +179,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case OnlineReplica => - assertValidPreviousStates(partitionAndReplica, - List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) replicaState(partitionAndReplica) match { case NewReplica => // add this replica to the assigned replicas list for its partition @@ -208,8 +203,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } replicaState.put(partitionAndReplica, OnlineReplica) case OfflineReplica => - assertValidPreviousStates(partitionAndReplica, - List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) // send stop replica command to the replica so that it stops fetching from the leader brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false) // As an optimization, the controller removes dead replicas from the ISR @@ -273,11 +266,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet } - private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState], - targetState: ReplicaState) { - assert(fromStates.contains(replicaState(partitionAndReplica)), + private def assertValidTransition(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState): Unit = { + assert(targetState.validPreviousStates.contains(replicaState(partitionAndReplica)), "Replica %s should be in the %s states before moving to %s state" - .format(partitionAndReplica, fromStates.mkString(","), targetState) + + .format(partitionAndReplica, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state".format(replicaState(partitionAndReplica))) } @@ -307,11 +299,42 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } -sealed trait ReplicaState { def state: Byte } -case object NewReplica extends ReplicaState { val state: Byte = 1 } -case object OnlineReplica extends ReplicaState { val state: Byte = 2 } -case object OfflineReplica extends ReplicaState { val state: Byte = 3 } -case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4} -case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5} -case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6} -case object NonExistentReplica extends ReplicaState { val state: Byte = 7 } +sealed trait ReplicaState { + def state: Byte + def validPreviousStates: Set[ReplicaState] +} + +case object NewReplica extends ReplicaState { + val state: Byte = 1 + val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica) +} + +case object OnlineReplica extends ReplicaState { + val state: Byte = 2 + val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible) +} + +case object OfflineReplica extends ReplicaState { + val state: Byte = 3 + val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible) +} + +case object ReplicaDeletionStarted extends ReplicaState { + val state: Byte = 4 + val validPreviousStates: Set[ReplicaState] = Set(OfflineReplica) +} + +case object ReplicaDeletionSuccessful extends ReplicaState { + val state: Byte = 5 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted) +} + +case object ReplicaDeletionIneligible extends ReplicaState { + val state: Byte = 6 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted) +} + +case object NonExistentReplica extends ReplicaState { + val state: Byte = 7 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionSuccessful) +}