diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 47cb553dad1..f7a6cdd19a1 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -221,7 +221,9 @@ class ControllerContext { def replicasForTopic(topic: String): Set[PartitionAndReplica] = { partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap { - case (partition, assignment) => assignment.replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r)) + case (partition, assignment) => assignment.replicas.map { r => + PartitionAndReplica(new TopicPartition(topic, partition), r) + } }.toSet } @@ -231,12 +233,6 @@ class ControllerContext { }.toSet } - def allLiveReplicas(): Set[PartitionAndReplica] = { - replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica => - isReplicaOnline(partitionAndReplica.replica, partitionAndReplica.topicPartition) - } - } - /** * Get all online and offline replicas. 
* diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index d032b3b499d..64f9ff052d1 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import scala.collection.Set +import scala.collection.mutable trait DeletionClient { def deleteTopic(topic: String, epochZkVersion: Int): Unit @@ -226,12 +227,12 @@ class TopicDeletionManager(config: KafkaConfig, /** * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state - *@param topic Topic for which deletion should be retried + * @param topics Topics for which deletion should be retried */ - private def retryDeletionForIneligibleReplicas(topic: String): Unit = { + private def retryDeletionForIneligibleReplicas(topics: Set[String]): Unit = { // reset replica states from ReplicaDeletionIneligible to OfflineReplica - val failedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionIneligible) - info(s"Retrying deletion of topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted") + val failedReplicas = topics.flatMap(controllerContext.replicasInState(_, ReplicaDeletionIneligible)) + debug(s"Retrying deletion of topics ${topics.mkString(",")} since replicas ${failedReplicas.mkString(",")} were not successfully deleted") replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica) } @@ -256,9 +257,6 @@ class TopicDeletionManager(config: KafkaConfig, * removed from their caches. 
*/ private def onTopicDeletion(topics: Set[String]): Unit = { - info(s"Topic deletion callback for ${topics.mkString(",")}") - // send update metadata so that brokers stop serving data for topics to be deleted - val partitions = topics.flatMap(controllerContext.partitionsForTopic) val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted if (unseenTopicsForDeletion.nonEmpty) { val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic) @@ -269,66 +267,61 @@ class TopicDeletionManager(config: KafkaConfig, controllerContext.beginTopicDeletion(unseenTopicsForDeletion) } - client.sendMetadataUpdate(partitions) - topics.foreach { topic => - onPartitionDeletion(controllerContext.partitionsForTopic(topic)) - } - } + // send update metadata so that brokers stop serving data for topics to be deleted + client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic)) - /** - * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending - * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion, - * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic - * is never retried. A topic is removed from the in progress list when - * 1. Either the topic is successfully deleted OR - * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state - * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic - * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends - * the replicas a StopReplicaRequest (delete=true) - * This method does the following things - - * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. 
Also mark the respective topics ineligible - * for deletion if some replicas are dead since it won't complete successfully anyway - * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully - * @param replicasForTopicsToBeDeleted - */ - private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]): Unit = { - replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic => - val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic) - val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic - val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) - val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas - // move dead replicas directly to failed state - replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible) - // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader - replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica) - debug(s"Deletion started for replicas ${replicasForDeletionRetry.mkString(",")}") - replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted) - if (deadReplicasForTopic.nonEmpty) { - debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic") - markTopicIneligibleForDeletion(Set(topic), reason = "offline replicas") - } - } + onPartitionDeletion(topics) } /** * Invoked by onTopicDeletion with the list of partitions for topics to be deleted * It does the following - - * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being - * deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException + * 1. 
Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible + * for deletion if some replicas are dead since it won't complete successfully anyway * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas * and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state, * it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1 * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And * will delete all persistent data from all replicas of the respective partitions */ - private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]): Unit = { - info(s"Partition deletion callback for ${partitionsToBeDeleted.mkString(",")}") - val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted) - startReplicaDeletion(replicasPerPartition) + private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = { + val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica] + val allReplicasForDeletionRetry = mutable.ListBuffer.empty[PartitionAndReplica] + val allTopicsIneligibleForDeletion = mutable.Set.empty[String] + + topicsToBeDeleted.foreach { topic => + val (aliveReplicas, deadReplicas) = controllerContext.replicasForTopic(topic).partition { r => + controllerContext.isReplicaOnline(r.replica, r.topicPartition) + } + + val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) + val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas + + allDeadReplicas ++= deadReplicas + allReplicasForDeletionRetry ++= replicasForDeletionRetry + + if (deadReplicas.nonEmpty) { + debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic $topic") + allTopicsIneligibleForDeletion += topic + } + } + + // move dead 
replicas directly to failed state + replicaStateMachine.handleStateChanges(allDeadReplicas, ReplicaDeletionIneligible) + // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader + replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, OfflineReplica) + replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, ReplicaDeletionStarted) + + if (allTopicsIneligibleForDeletion.nonEmpty) { + markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = "offline replicas") + } } private def resumeDeletions(): Unit = { val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted + val topicsEligibleForRetry = mutable.Set.empty[String] + val topicsEligibleForDeletion = mutable.Set.empty[String] + if (topicsQueuedForDeletion.nonEmpty) info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}") @@ -343,16 +336,25 @@ class TopicDeletionManager(config: KafkaConfig, // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion // or there is at least one failed replica (which means topic deletion should be retried). if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { - retryDeletionForIneligibleReplicas(topic) + topicsEligibleForRetry += topic } } - // Try delete topic if it is eligible for deletion. + // Add topic to the eligible set if it is eligible for deletion. 
if (isTopicEligibleForDeletion(topic)) { info(s"Deletion of topic $topic (re)started") - // topic deletion will be kicked off - onTopicDeletion(Set(topic)) + topicsEligibleForDeletion += topic } } + + // topic deletion retry will be kicked off + if (topicsEligibleForRetry.nonEmpty) { + retryDeletionForIneligibleReplicas(topicsEligibleForRetry) + } + + // topic deletion will be kicked off + if (topicsEligibleForDeletion.nonEmpty) { + onTopicDeletion(topicsEligibleForDeletion) + } } } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala index fd8d3e7a33b..39023fa0945 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala @@ -18,6 +18,7 @@ package unit.kafka.controller import kafka.cluster.{Broker, EndPoint} +import kafka.controller.PartitionAndReplica import kafka.controller.{ControllerContext, ReplicaAssignment} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName @@ -50,14 +51,12 @@ class ControllerContextTest { // Simple round-robin replica assignment var leaderIndex = 0 - Seq(tp1, tp2, tp3).foreach { - partition => - val replicas = brokers.indices.map { i => - val replica = brokers((i + leaderIndex) % brokers.size) - replica - } - context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas)) - leaderIndex += 1 + Seq(tp1, tp2, tp3).foreach { partition => + val replicas = brokers.indices.map { i => + brokers((i + leaderIndex) % brokers.size) + } + context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas)) + leaderIndex += 1 } } diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala index 0c6c00dd630..b29a3d9a585 100644 --- 
a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala +++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala @@ -27,11 +27,23 @@ class MockPartitionStateMachine(controllerContext: ControllerContext, uncleanLeaderElectionEnabled: Boolean) extends PartitionStateMachine(controllerContext) { + val stateChangesByTargetState = mutable.Map.empty[PartitionState, Int].withDefaultValue(0) + + def stateChangesCalls(targetState: PartitionState): Int = { + stateChangesByTargetState(targetState) + } + + def clear(): Unit = { + stateChangesByTargetState.clear() + } + override def handleStateChanges( partitions: Seq[TopicPartition], targetState: PartitionState, leaderElectionStrategy: Option[PartitionLeaderElectionStrategy] ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = { + stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1 + partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition)) val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState) if (invalidPartitions.nonEmpty) { diff --git a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala index e5207bfddb3..32bfc50d6bb 100644 --- a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala +++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala @@ -17,10 +17,22 @@ package kafka.controller import scala.collection.Seq +import scala.collection.mutable class MockReplicaStateMachine(controllerContext: ControllerContext) extends ReplicaStateMachine(controllerContext) { + val stateChangesByTargetState = mutable.Map.empty[ReplicaState, Int].withDefaultValue(0) + + def stateChangesCalls(targetState: ReplicaState): Int = { + stateChangesByTargetState(targetState) + } + + def clear(): Unit = { + 
stateChangesByTargetState.clear() + } override def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = { + stateChangesByTargetState(targetState) = stateChangesByTargetState(targetState) + 1 + replicas.foreach(replica => controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica)) val (validReplicas, invalidReplicas) = controllerContext.checkValidReplicaStateChange(replicas, targetState) if (invalidReplicas.nonEmpty) { diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala index 33479c1753b..b1b8c2471e4 100644 --- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala @@ -76,25 +76,43 @@ class TopicDeletionManagerTest { val fooPartitions = controllerContext.partitionsForTopic("foo") val fooReplicas = controllerContext.replicasForPartition(fooPartitions).toSet + val barPartitions = controllerContext.partitionsForTopic("bar") + val barReplicas = controllerContext.replicasForPartition(barPartitions).toSet + + // Clean up state changes before starting the deletion + replicaStateMachine.clear() + partitionStateMachine.clear() // Queue the topic for deletion - deletionManager.enqueueTopicsForDeletion(Set("foo")) + deletionManager.enqueueTopicsForDeletion(Set("foo", "bar")) assertEquals(fooPartitions, controllerContext.partitionsInState("foo", NonExistentPartition)) assertEquals(fooReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted)) - verify(deletionClient).sendMetadataUpdate(fooPartitions) - assertEquals(Set("foo"), controllerContext.topicsToBeDeleted) - assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted) + assertEquals(barPartitions, controllerContext.partitionsInState("bar", NonExistentPartition)) + assertEquals(barReplicas, controllerContext.replicasInState("bar", 
ReplicaDeletionStarted)) + verify(deletionClient).sendMetadataUpdate(fooPartitions ++ barPartitions) + assertEquals(Set("foo", "bar"), controllerContext.topicsToBeDeleted) + assertEquals(Set("foo", "bar"), controllerContext.topicsWithDeletionStarted) assertEquals(Set(), controllerContext.topicsIneligibleForDeletion) // Complete the deletion - deletionManager.completeReplicaDeletion(fooReplicas) + deletionManager.completeReplicaDeletion(fooReplicas ++ barReplicas) assertEquals(Set.empty, controllerContext.partitionsForTopic("foo")) assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "foo")) + assertEquals(Set.empty, controllerContext.partitionsForTopic("bar")) + assertEquals(Set.empty[PartitionAndReplica], controllerContext.replicaStates.keySet.filter(_.topic == "bar")) assertEquals(Set(), controllerContext.topicsToBeDeleted) assertEquals(Set(), controllerContext.topicsWithDeletionStarted) assertEquals(Set(), controllerContext.topicsIneligibleForDeletion) + + assertEquals(1, partitionStateMachine.stateChangesCalls(OfflinePartition)) + assertEquals(1, partitionStateMachine.stateChangesCalls(NonExistentPartition)) + + assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionIneligible)) + assertEquals(1, replicaStateMachine.stateChangesCalls(OfflineReplica)) + assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionStarted)) + assertEquals(1, replicaStateMachine.stateChangesCalls(ReplicaDeletionSuccessful)) } @Test