@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._
@@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._
* Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr
* requests .
*/
trait AlterIsr Manager {
trait AlterPartition Manager {
def start ( ) : Unit = { }
def shutdown ( ) : Unit = { }
@ -61,12 +61,14 @@ trait AlterIsrManager {
@@ -61,12 +61,14 @@ trait AlterIsrManager {
) : CompletableFuture [ LeaderAndIsr ]
}
case class AlterIsrItem ( topicPartition : TopicPartition ,
leaderAndIsr : LeaderAndIsr ,
future : CompletableFuture [ LeaderAndIsr ] ,
controllerEpoch : Int ) // controllerEpoch needed for Zk impl
case class AlterPartitionItem (
topicPartition : TopicPartition ,
leaderAndIsr : LeaderAndIsr ,
future : CompletableFuture [ LeaderAndIsr ] ,
controllerEpoch : Int // controllerEpoch needed for `ZkAlterPartitionManager`
)
object AlterIsrManager {
object AlterPartition Manager {
/* *
* Factory to AlterPartition based implementation , used when IBP >= 2.7 - IV2
@ -79,7 +81,7 @@ object AlterIsrManager {
@@ -79,7 +81,7 @@ object AlterIsrManager {
metrics : Metrics ,
threadNamePrefix : Option [ String ] ,
brokerEpochSupplier : ( ) => Long
) : AlterIsr Manager = {
) : AlterPartition Manager = {
val nodeProvider = MetadataCacheControllerNodeProvider ( config , metadataCache )
val channelManager = BrokerToControllerChannelManager (
@ -91,7 +93,7 @@ object AlterIsrManager {
@@ -91,7 +93,7 @@ object AlterIsrManager {
threadNamePrefix = threadNamePrefix ,
retryTimeoutMs = Long . MaxValue
)
new DefaultAlterIsr Manager (
new DefaultAlterPartition Manager (
controllerChannelManager = channelManager ,
scheduler = scheduler ,
time = time ,
@ -108,23 +110,23 @@ object AlterIsrManager {
@@ -108,23 +110,23 @@ object AlterIsrManager {
scheduler : Scheduler ,
time : Time ,
zkClient : KafkaZkClient
) : AlterIsr Manager = {
new ZkIsr Manager ( scheduler , time , zkClient )
) : AlterPartition Manager = {
new ZkAlterPartition Manager ( scheduler , time , zkClient )
}
}
class DefaultAlterIsr Manager (
class DefaultAlterPartition Manager (
val controllerChannelManager : BrokerToControllerChannelManager ,
val scheduler : Scheduler ,
val time : Time ,
val brokerId : Int ,
val brokerEpochSupplier : ( ) => Long ,
ibpVersion : ApiVersion
) extends AlterIsr Manager with Logging with KafkaMetricsGroup {
) extends AlterPartition Manager with Logging with KafkaMetricsGroup {
// Used to allow only one pending ISR update per partition ( visible for testing )
private [ server ] val unsentIsrUpdates : util.Map [ TopicPartition , AlterIsr Item ] = new ConcurrentHashMap [ TopicPartition , AlterIsr Item ] ( )
private [ server ] val unsentIsrUpdates : util.Map [ TopicPartition , AlterPartition Item ] = new ConcurrentHashMap [ TopicPartition , AlterPartition Item ] ( )
// Used to allow only one in - flight request at a time
private val inflightRequest : AtomicBoolean = new AtomicBoolean ( false )
@ -143,8 +145,8 @@ class DefaultAlterIsrManager(
@@ -143,8 +145,8 @@ class DefaultAlterIsrManager(
controllerEpoch : Int
) : CompletableFuture [ LeaderAndIsr ] = {
val future = new CompletableFuture [ LeaderAndIsr ] ( )
val alterIsrItem = AlterIsr Item ( topicPartition , leaderAndIsr , future , controllerEpoch )
val enqueued = unsentIsrUpdates . putIfAbsent ( alterIsr Item . topicPartition , alterIsr Item ) == null
val alterPartitionItem = AlterPartition Item ( topicPartition , leaderAndIsr , future , controllerEpoch )
val enqueued = unsentIsrUpdates . putIfAbsent ( alterPartition Item . topicPartition , alterPartition Item ) == null
if ( enqueued ) {
maybePropagateIsrChanges ( )
} else {
@ -158,9 +160,9 @@ class DefaultAlterIsrManager(
@@ -158,9 +160,9 @@ class DefaultAlterIsrManager(
// Send all pending items if there is not already a request in - flight .
if ( ! unsentIsrUpdates . isEmpty && inflightRequest . compareAndSet ( false , true ) ) {
// Copy current unsent ISRs but don 't remove from the map , they get cleared in the response handler
val inflightAlterIsr Items = new ListBuffer [ AlterIsr Item ] ( )
unsentIsrUpdates . values . forEach ( item => inflightAlterIsr Items . append ( item ) )
sendRequest ( inflightAlterIsr Items . toSeq )
val inflightAlterPartition Items = new ListBuffer [ AlterPartition Item ] ( )
unsentIsrUpdates . values . forEach ( item => inflightAlterPartition Items . append ( item ) )
sendRequest ( inflightAlterPartition Items . toSeq )
}
}
@ -170,8 +172,8 @@ class DefaultAlterIsrManager(
@@ -170,8 +172,8 @@ class DefaultAlterIsrManager(
}
}
private def sendRequest ( inflightAlterIsr Items : Seq [ AlterIsr Item ] ) : Unit = {
val message = buildRequest ( inflightAlterIsr Items )
private def sendRequest ( inflightAlterPartition Items : Seq [ AlterPartition Item ] ) : Unit = {
val message = buildRequest ( inflightAlterPartition Items )
debug ( s" Sending AlterPartition to controller $message " )
// We will not timeout AlterPartition request , instead letting it retry indefinitely
@ -192,7 +194,7 @@ class DefaultAlterIsrManager(
@@ -192,7 +194,7 @@ class DefaultAlterIsrManager(
Errors . UNSUPPORTED_VERSION
} else {
val body = response . responseBody ( ) . asInstanceOf [ AlterPartitionResponse ]
handleAlterPartitionResponse ( body , message . brokerEpoch , inflightAlterIsr Items )
handleAlterPartitionResponse ( body , message . brokerEpoch , inflightAlterPartition Items )
}
} finally {
// clear the flag so future requests can proceed
@ -216,16 +218,16 @@ class DefaultAlterIsrManager(
@@ -216,16 +218,16 @@ class DefaultAlterIsrManager(
} )
}
private def buildRequest ( inflightAlterIsr Items : Seq [ AlterIsr Item ] ) : AlterPartitionRequestData = {
private def buildRequest ( inflightAlterPartition Items : Seq [ AlterPartition Item ] ) : AlterPartitionRequestData = {
val message = new AlterPartitionRequestData ( )
. setBrokerId ( brokerId )
. setBrokerEpoch ( brokerEpochSupplier . apply ( ) )
inflightAlterIsr Items . groupBy ( _ . topicPartition . topic ) . foreach { case ( topic , items ) =>
inflightAlterPartition Items . groupBy ( _ . topicPartition . topic ) . foreach { case ( topic , items ) =>
val topicData = new AlterPartitionRequestData . TopicData ( )
. setName ( topic )
message . topics . add ( topicData )
items . foreach { item =>
items . foreach { item =>
val partitionData = new AlterPartitionRequestData . PartitionData ( )
. setPartitionIndex ( item . topicPartition . partition )
. setLeaderEpoch ( item . leaderAndIsr . leaderEpoch )
@ -245,7 +247,7 @@ class DefaultAlterIsrManager(
@@ -245,7 +247,7 @@ class DefaultAlterIsrManager(
def handleAlterPartitionResponse (
alterPartitionResp : AlterPartitionResponse ,
sentBrokerEpoch : Long ,
inflightAlterIsr Items : Seq [ AlterIsr Item ]
inflightAlterPartition Items : Seq [ AlterPartition Item ]
) : Errors = {
val data = alterPartitionResp . data
@ -291,21 +293,21 @@ class DefaultAlterIsrManager(
@@ -291,21 +293,21 @@ class DefaultAlterIsrManager(
// Iterate across the items we sent rather than what we received to ensure we run the callback even if a
// partition was somehow erroneously excluded from the response . Note that these callbacks are run from
// the leaderIsrUpdateLock write lock in Partition # sendAlterPartitionRequest
inflightAlterIsr Items . foreach { inflightAlterIsr =>
partitionResponses . get ( inflightAlterIsr . topicPartition ) match {
inflightAlterPartition Items . foreach { inflightAlterPartition =>
partitionResponses . get ( inflightAlterPartition . topicPartition ) match {
case Some ( leaderAndIsrOrError ) =>
try {
leaderAndIsrOrError match {
case Left ( error ) => inflightAlterIsr . future . completeExceptionally ( error . exception )
case Right ( leaderAndIsr ) => inflightAlterIsr . future . complete ( leaderAndIsr )
case Left ( error ) => inflightAlterPartition . future . completeExceptionally ( error . exception )
case Right ( leaderAndIsr ) => inflightAlterPartition . future . complete ( leaderAndIsr )
}
} finally {
// Regardless of callback outcome , we need to clear from the unsent updates map to unblock further updates
unsentIsrUpdates . remove ( inflightAlterIsr . topicPartition )
unsentIsrUpdates . remove ( inflightAlterPartition . topicPartition )
}
case None =>
// Don 't remove this partition from the update map so it will get re - sent
warn ( s" Partition ${ inflightAlterIsr . topicPartition } was sent but not included in the response " )
warn ( s" Partition ${ inflightAlterPartition . topicPartition } was sent but not included in the response " )
}
}