@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import scala.collection.Set
import scala.collection.mutable
trait DeletionClient {
def deleteTopic ( topic : String , epochZkVersion : Int ) : Unit
@@ -226,12 +227,12 @@ class TopicDeletionManager(config: KafkaConfig,
/**
 * If a topic is queued for deletion but deletion is not currently under progress, then deletion
 * is retried for that topic. To ensure a successful retry, reset states for the respective
 * replicas from ReplicaDeletionIneligible to OfflineReplica state.
 *
 * @param topics Topics for which deletion should be retried
 */
private def retryDeletionForIneligibleReplicas(topics: Set[String]): Unit = {
  // reset replica states from ReplicaDeletionIneligible to OfflineReplica
  val failedReplicas = topics.flatMap(controllerContext.replicasInState(_, ReplicaDeletionIneligible))
  debug(s"Retrying deletion of topics ${topics.mkString(",")} since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
  replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
}
@@ -256,9 +257,6 @@ class TopicDeletionManager(config: KafkaConfig,
/**
 * Invoked with the set of topics to be deleted. Marks unseen topics as having deletion started,
 * sends an UpdateMetadataRequest so that brokers stop serving data for the topics being deleted
 * (and can remove them from their caches), then kicks off replica deletion via
 * onPartitionDeletion for the whole batch.
 *
 * @param topics Topics whose deletion should be (re)started
 */
private def onTopicDeletion(topics: Set[String]): Unit = {
  val unseenTopicsForDeletion = topics -- controllerContext.topicsWithDeletionStarted
  if (unseenTopicsForDeletion.nonEmpty) {
    val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
    // NOTE(review): the diff elided a few context lines here; upstream transitions these
    // partitions to OfflinePartition and then NonExistentPartition before marking the topics
    // as deletion-started — confirm against the full file
    partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, OfflinePartition)
    partitionStateMachine.handleStateChanges(unseenPartitionsForDeletion.toSeq, NonExistentPartition)
    // remember that deletion has started so it is not re-initiated while in progress
    controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
  }

  // send update metadata so that brokers stop serving data for topics to be deleted
  client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))

  onPartitionDeletion(topics)
}
/**
 * Invoked by onTopicDeletion with the set of topics to be deleted.
 * It does the following -
 * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective
 *    topics ineligible for deletion if some replicas are dead since it won't complete successfully
 *    anyway
 * 2. Move all alive replicas for the partitions to OfflineReplica state. This will send
 *    StopReplicaRequest to the replicas and LeaderAndIsrRequest to the leader with the shrunk ISR.
 *    When the leader replica itself is moved to OfflineReplica state, it will skip sending the
 *    LeaderAndIsrRequest since the leader will be updated to -1
 * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with
 *    deletePartition = true. And will delete all persistent data from all replicas of the
 *    respective partitions
 *
 * @param topicsToBeDeleted Topics whose replicas should start deletion
 */
private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = {
  val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica]
  val allReplicasForDeletionRetry = mutable.ListBuffer.empty[PartitionAndReplica]
  val allTopicsIneligibleForDeletion = mutable.Set.empty[String]

  topicsToBeDeleted.foreach { topic =>
    // split the topic's replicas by liveness: dead replicas can never complete deletion
    val (aliveReplicas, deadReplicas) = controllerContext.replicasForTopic(topic).partition { r =>
      controllerContext.isReplicaOnline(r.replica, r.topicPartition)
    }
    // replicas already deleted successfully must not be restarted
    val successfullyDeletedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
    val replicasForDeletionRetry = aliveReplicas -- successfullyDeletedReplicas

    allDeadReplicas ++= deadReplicas
    allReplicasForDeletionRetry ++= replicasForDeletionRetry

    if (deadReplicas.nonEmpty) {
      debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic $topic")
      allTopicsIneligibleForDeletion += topic
    }
  }

  // move dead replicas directly to failed state
  replicaStateMachine.handleStateChanges(allDeadReplicas, ReplicaDeletionIneligible)
  // send stop replica to all followers that are not in the OfflineReplica state so they stop
  // sending fetch requests to the leader
  replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, OfflineReplica)
  replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, ReplicaDeletionStarted)

  if (allTopicsIneligibleForDeletion.nonEmpty) {
    markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = "offline replicas")
  }
}
/**
 * Scans the topics queued for deletion, completes any topic whose replicas have all been
 * successfully deleted, and collects the remaining topics into batches so that
 * retryDeletionForIneligibleReplicas and onTopicDeletion are each invoked once with the whole
 * batch rather than once per topic.
 */
private def resumeDeletions(): Unit = {
  val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
  val topicsEligibleForRetry = mutable.Set.empty[String]
  val topicsEligibleForDeletion = mutable.Set.empty[String]

  if (topicsQueuedForDeletion.nonEmpty)
    info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")

  topicsQueuedForDeletion.foreach { topic =>
    // NOTE(review): the diff elided the start of this loop (completed-topic handling); upstream
    // completes deletion for topics whose replicas are all ReplicaDeletionSuccessful — confirm
    // against the full file
    if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
      // clear up all state for this topic from controller cache and zookeeper
      completeDeleteTopic(topic)
      info(s"Deletion of topic $topic successfully completed")
    } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
      // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
      // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
      // or there is at least one failed replica (which means topic deletion should be retried).
      if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
        topicsEligibleForRetry += topic
      }
    }

    // Add topic to the eligible set if it is eligible for deletion.
    if (isTopicEligibleForDeletion(topic)) {
      info(s"Deletion of topic $topic (re)started")
      topicsEligibleForDeletion += topic
    }
  }

  // topic deletion retry will be kicked off
  if (topicsEligibleForRetry.nonEmpty) {
    retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
  }

  // topic deletion will be kicked off
  if (topicsEligibleForDeletion.nonEmpty) {
    onTopicDeletion(topicsEligibleForDeletion)
  }
}
}