From 5aaaba7ffe51a3216262811f24e6ccf10488b5e1 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Thu, 18 May 2017 11:31:47 +0100 Subject: [PATCH] KAFKA-5258; Move all partition and replica state transition rules into their states Today PartitionStateMachine and ReplicaStateMachine define and assert the valid state transitions inline for each state. It's cleaner to move the transition rules into ReplicaState/PartitionState and do the assertion at the top of the handleStateChange. Author: Onur Karaman Reviewers: Ismael Juma Closes #3071 from onurkaraman/KAFKA-5258 --- .../controller/PartitionStateMachine.scala | 44 ++++++++----- .../controller/ReplicaStateMachine.scala | 65 +++++++++++++------ 2 files changed, 72 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 5bed329d74f..4cffc132606 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -146,10 +146,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val topicAndPartition = TopicAndPartition(topic, partition) val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) try { + assertValidTransition(topicAndPartition, targetState) targetState match { case NewPartition => - // pre: partition did not exist before this - assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) partitionState.put(topicAndPartition, NewPartition) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s" @@ -157,7 +156,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { assignedReplicas)) // post: partition has been assigned replicas case OnlinePartition => - assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) partitionState(topicAndPartition) match { case NewPartition => // initialize leader and isr path for new partition @@ -174,16 +172,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader)) // post: partition has a leader case OfflinePartition => - // pre: partition should be in New or Online state - assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition) // should be called when the leader for a partition is no longer alive stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) partitionState.put(topicAndPartition, OfflinePartition) // post: partition has no alive leader case NonExistentPartition => - // pre: partition should be in Offline state - assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition) stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState)) partitionState.put(topicAndPartition, NonExistentPartition) @@ -217,11 +211,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } - private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState], - targetState: PartitionState) { - if(!fromStates.contains(partitionState(topicAndPartition))) + private def assertValidTransition(topicAndPartition: TopicAndPartition, targetState: PartitionState): Unit = { + if (!targetState.validPreviousStates.contains(partitionState(topicAndPartition))) throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state" - .format(topicAndPartition, fromStates.mkString(","), targetState) + ". Instead it is in %s state" + .format(topicAndPartition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state" .format(partitionState(topicAndPartition))) } @@ -351,8 +344,27 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } } -sealed trait PartitionState { def state: Byte } -case object NewPartition extends PartitionState { val state: Byte = 0 } -case object OnlinePartition extends PartitionState { val state: Byte = 1 } -case object OfflinePartition extends PartitionState { val state: Byte = 2 } -case object NonExistentPartition extends PartitionState { val state: Byte = 3 } +sealed trait PartitionState { + def state: Byte + def validPreviousStates: Set[PartitionState] +} + +case object NewPartition extends PartitionState { + val state: Byte = 0 + val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition) +} + +case object OnlinePartition extends PartitionState { + val state: Byte = 1 + val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition) +} + +case object OfflinePartition extends PartitionState { + val state: Byte = 2 + val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition) +} + +case object NonExistentPartition extends PartitionState { + val state: Byte = 3 + val validPreviousStates: Set[PartitionState] = Set(OfflinePartition) +} diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index c812f4a863d..0759ed4b109 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -46,6 +46,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val zkUtils = controllerContext.zkUtils private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) + private val stateChangeLogger = KafkaController.stateChangeLogger this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " @@ -136,9 +137,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica) try { val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) + assertValidTransition(partitionAndReplica, targetState) targetState match { case NewReplica => - assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) // start replica as a follower to the current leader for its partition val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) leaderIsrAndControllerEpochOpt match { @@ -156,7 +157,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionStarted => - assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionStarted) // send stop replica command brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, @@ -164,17 +164,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionIneligible => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionIneligible) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case ReplicaDeletionSuccessful => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful) stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case NonExistentReplica => - assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState) // remove this replica from the assigned replicas list for its partition val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) @@ -182,8 +179,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) case OnlineReplica => - assertValidPreviousStates(partitionAndReplica, - List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) replicaState(partitionAndReplica) match { case NewReplica => // add this replica to the assigned replicas list for its partition @@ -208,8 +203,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } replicaState.put(partitionAndReplica, OnlineReplica) case OfflineReplica => - assertValidPreviousStates(partitionAndReplica, - List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) // send stop replica command to the replica so that it stops fetching from the leader brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false) // As an optimization, the controller removes dead replicas from the ISR @@ -273,11 +266,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet } - private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState], - targetState: ReplicaState) { - assert(fromStates.contains(replicaState(partitionAndReplica)), + private def assertValidTransition(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState): Unit = { + assert(targetState.validPreviousStates.contains(replicaState(partitionAndReplica)), "Replica %s should be in the %s states before moving to %s state" - .format(partitionAndReplica, fromStates.mkString(","), targetState) + + .format(partitionAndReplica, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state".format(replicaState(partitionAndReplica))) } @@ -307,11 +299,42 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } } -sealed trait ReplicaState { def state: Byte } -case object NewReplica extends ReplicaState { val state: Byte = 1 } -case object OnlineReplica extends ReplicaState { val state: Byte = 2 } -case object OfflineReplica extends ReplicaState { val state: Byte = 3 } -case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4} -case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5} -case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6} -case object NonExistentReplica extends ReplicaState { val state: Byte = 7 } +sealed trait ReplicaState { + def state: Byte + def validPreviousStates: Set[ReplicaState] +} + +case object NewReplica extends ReplicaState { + val state: Byte = 1 + val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica) +} + +case object OnlineReplica extends ReplicaState { + val state: Byte = 2 + val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible) +} + +case object OfflineReplica extends ReplicaState { + val state: Byte = 3 + val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible) +} + +case object ReplicaDeletionStarted extends ReplicaState { + val state: Byte = 4 + val validPreviousStates: Set[ReplicaState] = Set(OfflineReplica) +} + +case object ReplicaDeletionSuccessful extends ReplicaState { + val state: Byte = 5 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted) +} + +case object ReplicaDeletionIneligible extends ReplicaState { + val state: Byte = 6 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionStarted) +} + +case object NonExistentReplica extends ReplicaState { + val state: Byte = 7 + val validPreviousStates: Set[ReplicaState] = Set(ReplicaDeletionSuccessful) +}