diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 7e0a6e438e9..1aa420b36f0 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -44,7 +44,7 @@ public class ApiKeysTest { } @Test - public void testAlterIsrIsClusterAction() { + public void testAlterPartitionIsClusterAction() { assertTrue(ApiKeys.ALTER_PARTITION.clusterAction); } diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index a0051784b4d..a1339264bf0 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -18,7 +18,7 @@ package kafka.server.builders; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.DelayedDeleteRecords; import kafka.server.DelayedElectLeader; @@ -50,7 +50,7 @@ public class ReplicaManagerBuilder { private QuotaManagers quotaManagers = null; private MetadataCache metadataCache = null; private LogDirFailureChannel logDirFailureChannel = null; - private AlterIsrManager alterIsrManager = null; + private AlterPartitionManager alterPartitionManager = null; private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); private AtomicBoolean isShuttingDown = new AtomicBoolean(false); private Optional zkClient = Optional.empty(); @@ -100,8 +100,8 @@ public class ReplicaManagerBuilder { return this; } - public ReplicaManagerBuilder setAlterIsrManager(AlterIsrManager alterIsrManager) { - this.alterIsrManager = alterIsrManager; + public ReplicaManagerBuilder setAlterPartitionManager(AlterPartitionManager alterPartitionManager) { + this.alterPartitionManager = alterPartitionManager; return this; } @@ -151,7 +151,7 @@ public class ReplicaManagerBuilder { if (logManager == null) throw new RuntimeException("You must set logManager"); if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); - if (alterIsrManager == null) throw new RuntimeException("You must set alterIsrManager"); + if (alterPartitionManager == null) throw new RuntimeException("You must set alterPartitionManager"); return new ReplicaManager(config, metrics, time, @@ -160,7 +160,7 @@ public class ReplicaManagerBuilder { quotaManagers, metadataCache, logDirFailureChannel, - alterIsrManager, + alterPartitionManager, brokerTopicStats, isShuttingDown, OptionConverters.toScala(zkClient), diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f27a9cb5581..0d1e5de0cc8 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -46,9 +46,9 @@ import org.apache.kafka.metadata.LeaderRecoveryState import scala.collection.{Map, Seq} import scala.jdk.CollectionConverters._ -trait IsrChangeListener { - def markExpand(): Unit - def markShrink(): Unit +trait AlterPartitionListener { - def markIsrExpand(): Unit + def markIsrShrink(): Unit def markFailed(): Unit } @@ -72,12 +72,12 @@ object Partition extends KafkaMetricsGroup { time: Time, replicaManager: ReplicaManager): Partition = { - val 
isrChangeListener = new IsrChangeListener { - override def markExpand(): Unit = { + val isrChangeListener = new AlterPartitionListener { + override def markIsrExpand(): Unit = { replicaManager.isrExpandRate.mark() } - override def markShrink(): Unit = { + override def markIsrShrink(): Unit = { replicaManager.isrShrinkRate.mark() } @@ -95,11 +95,11 @@ object Partition extends KafkaMetricsGroup { interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion, localBrokerId = replicaManager.config.brokerId, time = time, - isrChangeListener = isrChangeListener, + alterPartitionListener = isrChangeListener, delayedOperations = delayedOperations, metadataCache = replicaManager.metadataCache, logManager = replicaManager.logManager, - alterIsrManager = replicaManager.alterIsrManager) + alterIsrManager = replicaManager.alterPartitionManager) } def removeMetrics(topicPartition: TopicPartition): Unit = { @@ -235,11 +235,11 @@ class Partition(val topicPartition: TopicPartition, interBrokerProtocolVersion: ApiVersion, localBrokerId: Int, time: Time, - isrChangeListener: IsrChangeListener, + alterPartitionListener: AlterPartitionListener, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, - alterIsrManager: AlterIsrManager) extends Logging with KafkaMetricsGroup { + alterIsrManager: AlterPartitionManager) extends Logging with KafkaMetricsGroup { def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition @@ -1417,14 +1417,14 @@ class Partition(val topicPartition: TopicPartition, * to the KRaft metadata log). * * @param proposedIsrState The ISR state change that was requested - * @param error The error returned from [[AlterIsrManager]] + * @param error The error returned from [[AlterPartitionManager]] * @return true if the `AlterPartition` request should be retried, false otherwise */ private def handleAlterPartitionError( proposedIsrState: PendingPartitionChange, error: Errors ): Boolean = { - isrChangeListener.markFailed() + alterPartitionListener.markFailed() error match { case Errors.OPERATION_NOT_ATTEMPTED => // Since the operation was not attempted, it is safe to reset back to the committed state. 
@@ -1465,11 +1465,11 @@ class Partition(val topicPartition: TopicPartition, // Success from controller, still need to check a few things if (leaderAndIsr.leaderEpoch != leaderEpoch) { debug(s"Ignoring new ISR $leaderAndIsr since we have a stale leader epoch $leaderEpoch.") - isrChangeListener.markFailed() + alterPartitionListener.markFailed() false } else if (leaderAndIsr.partitionEpoch < partitionEpoch) { debug(s"Ignoring new ISR $leaderAndIsr since we have a newer version $partitionEpoch.") - isrChangeListener.markFailed() + alterPartitionListener.markFailed() false } else { // This is one of two states: @@ -1482,8 +1482,8 @@ class Partition(val topicPartition: TopicPartition, info(s"ISR updated to ${partitionState.isr.mkString(",")} and version updated to $partitionEpoch") proposedIsrState match { - case PendingExpandIsr(_, _, _) => isrChangeListener.markExpand() - case PendingShrinkIsr(_, _, _) => isrChangeListener.markShrink() + case PendingExpandIsr(_, _, _) => alterPartitionListener.markIsrExpand() + case PendingShrinkIsr(_, _, _) => alterPartitionListener.markIsrShrink() } // we may need to increment high watermark since ISR could be down to 1 diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala similarity index 84% rename from core/src/main/scala/kafka/server/AlterIsrManager.scala rename to core/src/main/scala/kafka/server/AlterPartitionManager.scala index ae9ef007b84..8f5e4438c7e 100644 --- a/core/src/main/scala/kafka/server/AlterIsrManager.scala +++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala @@ -49,7 +49,7 @@ import scala.jdk.CollectionConverters._ * Note that ISR state changes can still be initiated by the controller and sent to the partitions via LeaderAndIsr * requests. 
*/ -trait AlterIsrManager { +trait AlterPartitionManager { def start(): Unit = {} def shutdown(): Unit = {} @@ -61,12 +61,14 @@ trait AlterIsrManager { ): CompletableFuture[LeaderAndIsr] } -case class AlterIsrItem(topicPartition: TopicPartition, - leaderAndIsr: LeaderAndIsr, - future: CompletableFuture[LeaderAndIsr], - controllerEpoch: Int) // controllerEpoch needed for Zk impl +case class AlterPartitionItem( + topicPartition: TopicPartition, + leaderAndIsr: LeaderAndIsr, + future: CompletableFuture[LeaderAndIsr], + controllerEpoch: Int // controllerEpoch needed for `ZkAlterPartitionManager` +) -object AlterIsrManager { +object AlterPartitionManager { /** * Factory to AlterPartition based implementation, used when IBP >= 2.7-IV2 @@ -79,7 +81,7 @@ object AlterIsrManager { metrics: Metrics, threadNamePrefix: Option[String], brokerEpochSupplier: () => Long - ): AlterIsrManager = { + ): AlterPartitionManager = { val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) val channelManager = BrokerToControllerChannelManager( @@ -91,7 +93,7 @@ object AlterIsrManager { threadNamePrefix = threadNamePrefix, retryTimeoutMs = Long.MaxValue ) - new DefaultAlterIsrManager( + new DefaultAlterPartitionManager( controllerChannelManager = channelManager, scheduler = scheduler, time = time, @@ -108,23 +110,23 @@ object AlterIsrManager { scheduler: Scheduler, time: Time, zkClient: KafkaZkClient - ): AlterIsrManager = { - new ZkIsrManager(scheduler, time, zkClient) + ): AlterPartitionManager = { + new ZkAlterPartitionManager(scheduler, time, zkClient) } } -class DefaultAlterIsrManager( +class DefaultAlterPartitionManager( val controllerChannelManager: BrokerToControllerChannelManager, val scheduler: Scheduler, val time: Time, val brokerId: Int, val brokerEpochSupplier: () => Long, ibpVersion: ApiVersion -) extends AlterIsrManager with Logging with KafkaMetricsGroup { +) extends AlterPartitionManager with Logging with KafkaMetricsGroup { // Used to allow only one pending ISR update per partition (visible for testing) - private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterIsrItem] = new ConcurrentHashMap[TopicPartition, AlterIsrItem]() + private[server] val unsentIsrUpdates: util.Map[TopicPartition, AlterPartitionItem] = new ConcurrentHashMap[TopicPartition, AlterPartitionItem]() // Used to allow only one in-flight request at a time private val inflightRequest: AtomicBoolean = new AtomicBoolean(false) @@ -143,8 +145,8 @@ class DefaultAlterIsrManager( controllerEpoch: Int ): CompletableFuture[LeaderAndIsr] = { val future = new CompletableFuture[LeaderAndIsr]() - val alterIsrItem = AlterIsrItem(topicPartition, leaderAndIsr, future, controllerEpoch) - val enqueued = unsentIsrUpdates.putIfAbsent(alterIsrItem.topicPartition, alterIsrItem) == null + val alterPartitionItem = AlterPartitionItem(topicPartition, leaderAndIsr, future, controllerEpoch) + val enqueued = unsentIsrUpdates.putIfAbsent(alterPartitionItem.topicPartition, alterPartitionItem) == null if (enqueued) { maybePropagateIsrChanges() } else { @@ -158,9 +160,9 @@ class DefaultAlterIsrManager( // Send all pending items if there is not already a request in-flight. 
if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) { // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler - val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]() - unsentIsrUpdates.values.forEach(item => inflightAlterIsrItems.append(item)) - sendRequest(inflightAlterIsrItems.toSeq) + val inflightAlterPartitionItems = new ListBuffer[AlterPartitionItem]() + unsentIsrUpdates.values.forEach(item => inflightAlterPartitionItems.append(item)) + sendRequest(inflightAlterPartitionItems.toSeq) } } @@ -170,8 +172,8 @@ class DefaultAlterIsrManager( } } - private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { - val message = buildRequest(inflightAlterIsrItems) + private def sendRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): Unit = { + val message = buildRequest(inflightAlterPartitionItems) debug(s"Sending AlterPartition to controller $message") // We will not timeout AlterPartition request, instead letting it retry indefinitely @@ -192,7 +194,7 @@ class DefaultAlterIsrManager( Errors.UNSUPPORTED_VERSION } else { val body = response.responseBody().asInstanceOf[AlterPartitionResponse] - handleAlterPartitionResponse(body, message.brokerEpoch, inflightAlterIsrItems) + handleAlterPartitionResponse(body, message.brokerEpoch, inflightAlterPartitionItems) } } finally { // clear the flag so future requests can proceed @@ -216,16 +218,16 @@ class DefaultAlterIsrManager( }) } - private def buildRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): AlterPartitionRequestData = { + private def buildRequest(inflightAlterPartitionItems: Seq[AlterPartitionItem]): AlterPartitionRequestData = { val message = new AlterPartitionRequestData() .setBrokerId(brokerId) .setBrokerEpoch(brokerEpochSupplier.apply()) - inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) => + inflightAlterPartitionItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) => val topicData = new AlterPartitionRequestData.TopicData() .setName(topic) message.topics.add(topicData) - items.foreach { item => + items.foreach { item => val partitionData = new AlterPartitionRequestData.PartitionData() .setPartitionIndex(item.topicPartition.partition) .setLeaderEpoch(item.leaderAndIsr.leaderEpoch) @@ -245,7 +247,7 @@ class DefaultAlterIsrManager( def handleAlterPartitionResponse( alterPartitionResp: AlterPartitionResponse, sentBrokerEpoch: Long, - inflightAlterIsrItems: Seq[AlterIsrItem] + inflightAlterPartitionItems: Seq[AlterPartitionItem] ): Errors = { val data = alterPartitionResp.data @@ -291,21 +293,21 @@ class DefaultAlterIsrManager( // Iterate across the items we sent rather than what we received to ensure we run the callback even if a // partition was somehow erroneously excluded from the response. 
Note that these callbacks are run from // the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest - inflightAlterIsrItems.foreach { inflightAlterIsr => - partitionResponses.get(inflightAlterIsr.topicPartition) match { + inflightAlterPartitionItems.foreach { inflightAlterPartition => + partitionResponses.get(inflightAlterPartition.topicPartition) match { case Some(leaderAndIsrOrError) => try { leaderAndIsrOrError match { - case Left(error) => inflightAlterIsr.future.completeExceptionally(error.exception) - case Right(leaderAndIsr) => inflightAlterIsr.future.complete(leaderAndIsr) + case Left(error) => inflightAlterPartition.future.completeExceptionally(error.exception) + case Right(leaderAndIsr) => inflightAlterPartition.future.complete(leaderAndIsr) } } finally { // Regardless of callback outcome, we need to clear from the unsent updates map to unblock further updates - unsentIsrUpdates.remove(inflightAlterIsr.topicPartition) + unsentIsrUpdates.remove(inflightAlterPartition.topicPartition) } case None => // Don't remove this partition from the update map so it will get re-sent - warn(s"Partition ${inflightAlterIsr.topicPartition} was sent but not included in the response") + warn(s"Partition ${inflightAlterPartition.topicPartition} was sent but not included in the response") } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 8fdc59d9453..5447e298636 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -129,7 +129,7 @@ class BrokerServer( @volatile var featureCache: FinalizedFeatureCache = null - var alterIsrManager: AlterIsrManager = null + var alterIsrManager: AlterPartitionManager = null var autoTopicCreationManager: AutoTopicCreationManager = null @@ -250,7 +250,7 @@ class BrokerServer( threadNamePrefix, retryTimeoutMs = Long.MaxValue ) - alterIsrManager = new DefaultAlterIsrManager( + alterIsrManager = new DefaultAlterPartitionManager( controllerChannelManager = alterIsrChannelManager, scheduler = kafkaScheduler, time = time, @@ -269,7 +269,7 @@ class BrokerServer( quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterIsrManager = alterIsrManager, + alterPartitionManager = alterIsrManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = None, diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index d006165577e..78ec415c3bc 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -140,7 +140,7 @@ class KafkaServer( var clientToControllerChannelManager: BrokerToControllerChannelManager = null - var alterIsrManager: AlterIsrManager = null + var alterIsrManager: AlterPartitionManager = null var kafkaScheduler: KafkaScheduler = null @@ -311,7 +311,7 @@ class KafkaServer( // Start alter partition manager based on the IBP version alterIsrManager = if (config.interBrokerProtocolVersion.isAlterIsrSupported) { - AlterIsrManager( + AlterPartitionManager( config = config, metadataCache = metadataCache, scheduler = kafkaScheduler, @@ -321,7 +321,7 @@ class KafkaServer( brokerEpochSupplier = () => kafkaController.brokerEpoch ) } else { - AlterIsrManager(kafkaScheduler, time, zkClient) + AlterPartitionManager(kafkaScheduler, time, zkClient) } alterIsrManager.start() @@ -479,7 +479,7 @@ class KafkaServer( 
quotaManagers = quotaManagers, metadataCache = metadataCache, logDirFailureChannel = logDirFailureChannel, - alterIsrManager = alterIsrManager, + alterPartitionManager = alterIsrManager, brokerTopicStats = brokerTopicStats, isShuttingDown = isShuttingDown, zkClient = Some(zkClient), diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d72c7351f67..dac8313be34 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -190,7 +190,7 @@ class ReplicaManager(val config: KafkaConfig, quotaManagers: QuotaManagers, val metadataCache: MetadataCache, logDirFailureChannel: LogDirFailureChannel, - val alterIsrManager: AlterIsrManager, + val alterPartitionManager: AlterPartitionManager, val brokerTopicStats: BrokerTopicStats = new BrokerTopicStats(), val isShuttingDown: AtomicBoolean = new AtomicBoolean(false), val zkClient: Option[KafkaZkClient] = None, diff --git a/core/src/main/scala/kafka/server/ZkIsrManager.scala b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala similarity index 94% rename from core/src/main/scala/kafka/server/ZkIsrManager.scala rename to core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala index 5549e493275..c906ad6a706 100644 --- a/core/src/main/scala/kafka/server/ZkIsrManager.scala +++ b/core/src/main/scala/kafka/server/ZkAlterPartitionManager.scala @@ -35,7 +35,7 @@ import scala.collection.mutable */ case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, lingerMs: Long) -object ZkIsrManager { +object ZkAlterPartitionManager { // This field is mutable to allow overriding change notification behavior in test cases @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig( checkIntervalMs = 2500, @@ -44,9 +44,9 @@ object ZkIsrManager { ) } -class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterIsrManager with Logging { +class ZkAlterPartitionManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) extends AlterPartitionManager with Logging { - private val isrChangeNotificationConfig = ZkIsrManager.DefaultIsrPropagationConfig + private val isrChangeNotificationConfig = ZkAlterPartitionManager.DefaultIsrPropagationConfig // Visible for testing private[server] val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]() private val lastIsrChangeMs = new AtomicLong(time.milliseconds()) diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala index 518c8c4b91d..5385b2faa1b 100644 --- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala @@ -21,7 +21,7 @@ import java.io.Closeable import java.util.{Collections, HashMap, List} import kafka.admin.ReassignPartitionsCommand._ import kafka.api.KAFKA_2_7_IV1 -import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkIsrManager} +import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ZkAlterPartitionManager} import kafka.utils.Implicits._ import kafka.utils.TestUtils import kafka.server.QuorumTestHarness @@ -83,7 +83,7 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness { // Override change notification settings so that test is not delayed 
by ISR // change notification delay - ZkIsrManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig( + ZkAlterPartitionManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig( checkIntervalMs = 500, lingerMs = 100, maxDelayMs = 500 diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala index d59e4ab3877..52addbc82bd 100644 --- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala @@ -21,7 +21,7 @@ import kafka.log.{CleanerConfig, LogConfig, LogManager} import kafka.server.{Defaults, MetadataCache} import kafka.server.checkpoints.OffsetCheckpoints import kafka.server.metadata.MockConfigRepository -import kafka.utils.TestUtils.{MockAlterIsrManager, MockIsrChangeListener} +import kafka.utils.TestUtils.{MockAlterPartitionManager, MockAlterPartitionListener} import kafka.utils.{MockTime, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -49,8 +49,8 @@ class AbstractPartitionTest { var logDir1: File = _ var logDir2: File = _ var logManager: LogManager = _ - var alterIsrManager: MockAlterIsrManager = _ - var isrChangeListener: MockIsrChangeListener = _ + var alterIsrManager: MockAlterPartitionManager = _ + var isrChangeListener: MockAlterPartitionListener = _ var logConfig: LogConfig = _ var configRepository: MockConfigRepository = _ val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index d6a8649b2a2..50bbe18f4f2 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -253,11 +253,11 @@ class PartitionLockTest extends Logging { private def setupPartitionWithMocks(logManager: LogManager): Partition = { val leaderEpoch = 1 val brokerId = 0 - val isrChangeListener: IsrChangeListener = mock(classOf[IsrChangeListener]) + val isrChangeListener: AlterPartitionListener = mock(classOf[AlterPartitionListener]) val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations]) val metadataCache: MetadataCache = mock(classOf[MetadataCache]) val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) - val alterIsrManager: AlterIsrManager = mock(classOf[AlterIsrManager]) + val alterIsrManager: AlterPartitionManager = mock(classOf[AlterPartitionManager]) logManager.startup(Set.empty) val partition = new Partition(topicPartition, diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index ec0ad044f70..dcf91ad2d27 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -1734,7 +1734,7 @@ class PartitionTest extends AbstractPartitionTest { .when(kafkaZkClient) .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(1), any()) - val zkIsrManager = AlterIsrManager(scheduler, time, kafkaZkClient) + val zkIsrManager = AlterPartitionManager(scheduler, time, kafkaZkClient) zkIsrManager.start() val partition = new Partition(topicPartition, @@ -1963,8 +1963,8 @@ class PartitionTest extends AbstractPartitionTest { val topicPartition = new TopicPartition("test", 1) val partition = new 
Partition( topicPartition, 1000, ApiVersion.latestVersion, 0, - new SystemTime(), mock(classOf[IsrChangeListener]), mock(classOf[DelayedOperations]), - mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterIsrManager])) + new SystemTime(), mock(classOf[AlterPartitionListener]), mock(classOf[DelayedOperations]), + mock(classOf[MetadataCache]), mock(classOf[LogManager]), mock(classOf[AlterPartitionManager])) val replicas = Seq(0, 1, 2, 3) val isr = Set(0, 1, 2, 3) diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala similarity index 93% rename from core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala rename to core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala index 483a5347e4e..d4a1b35660f 100644 --- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala @@ -45,7 +45,7 @@ import org.mockito.Mockito.{mock, reset, times, verify} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import scala.jdk.CollectionConverters._ -class AlterIsrManagerTest { +class AlterPartitionManagerTest { val topic = "test-topic" val time = new MockTime @@ -67,7 +67,7 @@ class AlterIsrManagerTest { @MethodSource(Array("provideApiVersions")) def testBasic(apiVersion: ApiVersion): Unit = { val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) alterIsrManager.start() alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0) verify(brokerToController).start() @@ -83,7 +83,7 @@ class AlterIsrManagerTest { val requestCapture = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[AlterPartitionRequest]]) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) alterIsrManager.start() alterIsrManager.submit(tp0, new LeaderAndIsr(1, 1, List(1), leaderRecoveryState, 10), 0) verify(brokerToController).start() @@ -101,7 +101,7 @@ class AlterIsrManagerTest { val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) alterIsrManager.start() // Only send one ISR update for a given topic+partition @@ -139,7 +139,7 @@ class AlterIsrManagerTest { val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) alterIsrManager.start() // First request will send batch of one @@ -209,7 
+209,7 @@ class AlterIsrManagerTest { val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0) alterIsrManager.start() alterIsrManager.submit(tp0, leaderAndIsr, 0) @@ -264,12 +264,12 @@ class AlterIsrManagerTest { assertFalse(future.isDone) } - private def testPartitionError(tp: TopicPartition, error: Errors): AlterIsrManager = { + private def testPartitionError(tp: TopicPartition, error: Errors): AlterPartitionManager = { val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) reset(brokerToController) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, KAFKA_3_2_IV0) alterIsrManager.start() val future = alterIsrManager.submit(tp, LeaderAndIsr(1, 1, List(1,2,3), LeaderRecoveryState.RECOVERED, 10), 0) @@ -293,7 +293,7 @@ class AlterIsrManagerTest { val callbackCapture: ArgumentCaptor[ControllerRequestCompletionHandler] = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => 2, apiVersion) alterIsrManager.start() // First submit will send the request @@ -322,7 +322,7 @@ class AlterIsrManagerTest { val brokerEpoch = 2 val scheduler = new MockScheduler(time) - val alterIsrManager = new DefaultAlterIsrManager(brokerToController, scheduler, time, brokerId, () => brokerEpoch, apiVersion) + val alterIsrManager = new DefaultAlterPartitionManager(brokerToController, scheduler, time, brokerId, () => brokerEpoch, apiVersion) alterIsrManager.start() def matchesAlterIsr(topicPartitions: Set[TopicPartition]): AbstractRequest.Builder[_ <: AbstractRequest] = { @@ -395,7 +395,7 @@ class AlterIsrManagerTest { .when(kafkaZkClient) .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(3), any()) - val zkIsrManager = new ZkIsrManager(scheduler, time, kafkaZkClient) + val zkIsrManager = new ZkAlterPartitionManager(scheduler, time, kafkaZkClient) zkIsrManager.start() // Correct ZK version @@ -421,7 +421,7 @@ class AlterIsrManagerTest { } } -object AlterIsrManagerTest { +object AlterPartitionManagerTest { def provideApiVersions(): JStream[ApiVersion] = { JStream.of( // Supports KIP-704: unclean leader recovery diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index e7c5372d6a3..6a862bd2fbd 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -72,7 +72,7 @@ class HighwatermarkPersistenceTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId), logDirFailureChannel = 
logDirFailureChannels.head, - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -129,7 +129,7 @@ class HighwatermarkPersistenceTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId), logDirFailureChannel = logDirFailureChannels.head, - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() diff --git a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala index 6a68217f94d..15ae7a7e8e9 100644 --- a/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala @@ -22,7 +22,7 @@ import java.util.Properties import kafka.cluster.Partition import kafka.log.{LogManager, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers -import kafka.utils.TestUtils.MockAlterIsrManager +import kafka.utils.TestUtils.MockAlterPartitionManager import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics @@ -55,7 +55,7 @@ class IsrExpirationTest { var quotaManager: QuotaManagers = null var replicaManager: ReplicaManager = null - var alterIsrManager: MockAlterIsrManager = _ + var alterIsrManager: MockAlterPartitionManager = _ @BeforeEach def setUp(): Unit = { @@ -73,7 +73,7 @@ class IsrExpirationTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(configs.head.brokerId), logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) } @AfterEach diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala index 79bfc241fd6..89cbd04fea1 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -116,8 +116,8 @@ class KafkaServerTest extends QuorumTestHarness { props.put(KafkaConfig.InterBrokerProtocolVersionProp, "2.7-IV1") val server = TestUtils.createServer(KafkaConfig.fromProps(props)) - server.replicaManager.alterIsrManager match { - case _: ZkIsrManager => + server.replicaManager.alterPartitionManager match { + case _: ZkAlterPartitionManager => case _ => fail("Should use ZK for ISR manager in versions before 2.7-IV2") } server.shutdown() @@ -129,8 +129,8 @@ class KafkaServerTest extends QuorumTestHarness { props.put(KafkaConfig.InterBrokerProtocolVersionProp, ApiVersion.latestVersion.toString) val server = TestUtils.createServer(KafkaConfig.fromProps(props)) - server.replicaManager.alterIsrManager match { - case _: DefaultAlterIsrManager => + server.replicaManager.alterPartitionManager match { + case _: DefaultAlterPartitionManager => case _ => fail("Should use AlterIsr for ISR manager in versions after 2.7-IV2") } server.shutdown() diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala index 34e87958f52..d281788ab81 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala @@ -170,7 +170,7 @@ class ReplicaManagerConcurrencyTest { quotaManagers = QuotaFactory.instantiate(config, 
metrics, time, ""), metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = new MockAlterIsrManager(channel) + alterPartitionManager = new MockAlterPartitionManager(channel) ) { override def createReplicaFetcherManager( metrics: Metrics, @@ -427,7 +427,7 @@ class ReplicaManagerConcurrencyTest { } } - private class MockAlterIsrManager(channel: ControllerChannel) extends AlterIsrManager { + private class MockAlterPartitionManager(channel: ControllerChannel) extends AlterPartitionManager { override def submit( topicPartition: TopicPartition, leaderAndIsr: LeaderAndIsr, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index 658575aca67..d7b69bc61d5 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -308,7 +308,7 @@ class ReplicaManagerQuotasTest { when(logManager.getLog(any[TopicPartition], anyBoolean)).thenReturn(Some(log)) when(logManager.liveLogDirs).thenReturn(Array.empty[File]) - val alterIsrManager: AlterIsrManager = mock(classOf[AlterIsrManager]) + val alterIsrManager: AlterPartitionManager = mock(classOf[AlterPartitionManager]) val leaderBrokerId = configs.head.brokerId quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "") @@ -321,7 +321,7 @@ class ReplicaManagerQuotasTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(leaderBrokerId), logDirFailureChannel = new LogDirFailureChannel(configs.head.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) //create the two replicas for ((p, _) <- fetchInfo) { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 6d01f592592..6c17503a615 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -76,7 +76,7 @@ class ReplicaManagerTest { val time = new MockTime val scheduler = new MockScheduler(time) val metrics = new Metrics - var alterIsrManager: AlterIsrManager = _ + var alterPartitionManager: AlterPartitionManager = _ var config: KafkaConfig = _ var quotaManager: QuotaManagers = _ @@ -90,7 +90,7 @@ class ReplicaManagerTest { def setUp(): Unit = { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) config = KafkaConfig.fromProps(props) - alterIsrManager = mock(classOf[AlterIsrManager]) + alterPartitionManager = mock(classOf[AlterPartitionManager]) quotaManager = QuotaFactory.instantiate(config, metrics, time, "") } @@ -113,7 +113,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterPartitionManager) try { val partition = rm.createPartition(new TopicPartition(topic, 1)) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, @@ -140,7 +140,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterPartitionManager) try { val 
partition = rm.createPartition(new TopicPartition(topic, 1)) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, @@ -164,7 +164,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, threadNamePrefix = Option(this.getClass.getName)) try { def callback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { @@ -219,7 +219,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = metadataCache, logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterPartitionManager) try { val brokerList = Seq[Integer](0, 1).asJava @@ -2021,7 +2021,7 @@ class ReplicaManagerTest { brokerTopicStats = mockBrokerTopicStats, metadataCache = metadataCache, logDirFailureChannel = mockLogDirFailureChannel, - alterIsrManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), @@ -2216,7 +2216,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = metadataCache, logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager, + alterPartitionManager = alterPartitionManager, delayedProducePurgatoryParam = Some(mockProducePurgatory), delayedFetchPurgatoryParam = Some(mockFetchPurgatory), delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory), @@ -2461,7 +2461,7 @@ class ReplicaManagerTest { brokerTopicStats = brokerTopicStats1, metadataCache = metadataCache0, logDirFailureChannel = new LogDirFailureChannel(config0.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterPartitionManager) val rm1 = new ReplicaManager( metrics = metrics, config = config1, @@ -2472,7 +2472,7 @@ class ReplicaManagerTest { brokerTopicStats = brokerTopicStats2, metadataCache = metadataCache1, logDirFailureChannel = new LogDirFailureChannel(config1.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterPartitionManager) (rm0, rm1) } @@ -2722,7 +2722,7 @@ class ReplicaManagerTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) { + alterPartitionManager = alterPartitionManager) { override def getPartitionOrException(topicPartition: TopicPartition): Partition = { throw Errors.NOT_LEADER_OR_FOLLOWER.exception() } diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 44e17ebb4cc..f4e95207458 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -72,7 +72,7 @@ class OffsetsForLeaderEpochTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) val partition = replicaManager.createPartition(tp) 
partition.setLog(mockLog, isFutureLog = false) partition.leaderReplicaIdOpt = Some(config.brokerId) @@ -101,7 +101,7 @@ class OffsetsForLeaderEpochTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) replicaManager.createPartition(tp) //Given @@ -132,7 +132,7 @@ class OffsetsForLeaderEpochTest { quotaManagers = quotaManager, metadataCache = MetadataCache.zkMetadataCache(config.brokerId), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), - alterIsrManager = alterIsrManager) + alterPartitionManager = alterIsrManager) //Given val epochRequested: Integer = 5 diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index eecdb10d2b3..877b3b2a230 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -32,7 +32,7 @@ import java.util.{Arrays, Collections, Optional, Properties} import com.yammer.metrics.core.{Gauge, Meter} import javax.net.ssl.X509TrustManager import kafka.api._ -import kafka.cluster.{Broker, EndPoint, IsrChangeListener} +import kafka.cluster.{Broker, EndPoint, AlterPartitionListener} import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch} import kafka.log._ import kafka.network.RequestChannel @@ -1254,8 +1254,8 @@ object TestUtils extends Logging { interBrokerProtocolVersion = interBrokerProtocolVersion) } - class MockAlterIsrManager extends AlterIsrManager { - val isrUpdates: mutable.Queue[AlterIsrItem] = new mutable.Queue[AlterIsrItem]() + class MockAlterPartitionManager extends AlterPartitionManager { + val isrUpdates: mutable.Queue[AlterPartitionItem] = new mutable.Queue[AlterPartitionItem]() val inFlight: AtomicBoolean = new AtomicBoolean(false) @@ -1266,7 +1266,7 @@ object TestUtils extends Logging { ): CompletableFuture[LeaderAndIsr]= { val future = new CompletableFuture[LeaderAndIsr]() if (inFlight.compareAndSet(false, true)) { - isrUpdates += AlterIsrItem(topicPartition, leaderAndIsr, future, controllerEpoch) + isrUpdates += AlterPartitionItem(topicPartition, leaderAndIsr, future, controllerEpoch) } else { future.completeExceptionally(new OperationNotAttemptedException( s"Failed to enqueue AlterIsr request for $topicPartition since there is already an inflight request")) @@ -1293,18 +1293,18 @@ object TestUtils extends Logging { } } - def createAlterIsrManager(): MockAlterIsrManager = { - new MockAlterIsrManager() + def createAlterIsrManager(): MockAlterPartitionManager = { + new MockAlterPartitionManager() } - class MockIsrChangeListener extends IsrChangeListener { + class MockAlterPartitionListener extends AlterPartitionListener { val expands: AtomicInteger = new AtomicInteger(0) val shrinks: AtomicInteger = new AtomicInteger(0) val failures: AtomicInteger = new AtomicInteger(0) - override def markExpand(): Unit = expands.incrementAndGet() + override def markIsrExpand(): Unit = expands.incrementAndGet() - override def markShrink(): Unit = shrinks.incrementAndGet() + override def markIsrShrink(): Unit = shrinks.incrementAndGet() override def markFailed(): Unit = failures.incrementAndGet() @@ -1315,8 +1315,8 @@ object TestUtils extends Logging { } } - def createIsrChangeListener(): MockIsrChangeListener = { - new MockIsrChangeListener() + def createIsrChangeListener(): MockAlterPartitionListener = { + 
new MockAlterPartitionListener() } def produceMessages[B <: KafkaBroker]( diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index a857b1164c3..1dec000f3ab 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -21,14 +21,14 @@ import kafka.api.ApiVersion; import kafka.api.ApiVersion$; import kafka.cluster.BrokerEndPoint; import kafka.cluster.DelayedOperations; -import kafka.cluster.IsrChangeListener; +import kafka.cluster.AlterPartitionListener; import kafka.cluster.Partition; import kafka.log.CleanerConfig; import kafka.log.Defaults; import kafka.log.LogAppendInfo; import kafka.log.LogConfig; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.FailedPartitions; import kafka.server.InitialFetchState; @@ -170,12 +170,12 @@ public class ReplicaFetcherThreadBenchmark { .setReplicas(replicas) .setIsNew(true); - IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class); + AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class); Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L)); - AlterIsrManager isrChannelManager = Mockito.mock(AlterIsrManager.class); + AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class); Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), - 0, Time.SYSTEM, isrChangeListener, new DelayedOperationsMock(tp), + 0, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp), Mockito.mock(MetadataCache.class), logManager, isrChannelManager); partition.makeFollower(partitionState, offsetCheckpoints, topicId); @@ -227,7 +227,7 @@ public class ReplicaFetcherThreadBenchmark { setBrokerTopicStats(brokerTopicStats). setMetadataCache(metadataCache). setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())). - setAlterIsrManager(TestUtils.createAlterIsrManager()). + setAlterPartitionManager(TestUtils.createAlterIsrManager()). 
build(); fetcher = new ReplicaFetcherBenchThread(config, replicaManager, pool); fetcher.addPartitions(initialFetchStates); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index e883020936f..1bc695ecb11 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -20,13 +20,13 @@ package org.apache.kafka.jmh.partition; import kafka.api.ApiVersion; import kafka.api.ApiVersion$; import kafka.cluster.DelayedOperations; -import kafka.cluster.IsrChangeListener; +import kafka.cluster.AlterPartitionListener; import kafka.cluster.Partition; import kafka.log.CleanerConfig; import kafka.log.Defaults; import kafka.log.LogConfig; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.LogDirFailureChannel; import kafka.server.MetadataCache; @@ -122,12 +122,12 @@ public class PartitionMakeFollowerBenchmark { topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid())); Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L)); - IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class); - AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class); + AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); + AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, - isrChangeListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterIsrManager); + alterPartitionListener, delayedOperations, + Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId); executorService.submit((Runnable) () -> { SimpleRecord[] simpleRecords = new SimpleRecord[] { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 6470125b12c..cf7201d4c8f 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -20,13 +20,13 @@ package org.apache.kafka.jmh.partition; import kafka.api.ApiVersion; import kafka.api.ApiVersion$; import kafka.cluster.DelayedOperations; -import kafka.cluster.IsrChangeListener; +import kafka.cluster.AlterPartitionListener; import kafka.cluster.Partition; import kafka.log.CleanerConfig; import kafka.log.Defaults; import kafka.log.LogConfig; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.LogDirFailureChannel; import kafka.server.LogOffsetMetadata; @@ -121,12 +121,12 @@ public class UpdateFollowerFetchStateBenchmark { .setPartitionEpoch(1) .setReplicas(replicas) .setIsNew(true); - IsrChangeListener isrChangeListener = Mockito.mock(IsrChangeListener.class); - AlterIsrManager alterIsrManager = Mockito.mock(AlterIsrManager.class); 
+ AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class); + AlterPartitionManager alterPartitionManager = Mockito.mock(AlterPartitionManager.class); partition = new Partition(topicPartition, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, - isrChangeListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterIsrManager); + alterPartitionListener, delayedOperations, + Mockito.mock(MetadataCache.class), logManager, alterPartitionManager); partition.makeLeader(partitionState, offsetCheckpoints, topicId); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 21a8086e4df..919179ac3bc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -21,7 +21,7 @@ import kafka.cluster.Partition; import kafka.log.CleanerConfig; import kafka.log.LogConfig; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; import kafka.server.LogDirFailureChannel; @@ -88,7 +88,7 @@ public class CheckpointBench { private QuotaFactory.QuotaManagers quotaManagers; private LogDirFailureChannel failureChannel; private LogManager logManager; - private AlterIsrManager alterIsrManager; + private AlterPartitionManager alterPartitionManager; @SuppressWarnings("deprecation") @@ -117,7 +117,7 @@ public class CheckpointBench { this.metrics, this.time, ""); - this.alterIsrManager = TestUtils.createAlterIsrManager(); + this.alterPartitionManager = TestUtils.createAlterIsrManager(); this.replicaManager = new ReplicaManagerBuilder(). setConfig(brokerProperties). setMetrics(metrics). @@ -128,7 +128,7 @@ public class CheckpointBench { setBrokerTopicStats(brokerTopicStats). setMetadataCache(metadataCache). setLogDirFailureChannel(failureChannel). - setAlterIsrManager(alterIsrManager). + setAlterPartitionManager(alterPartitionManager). build(); replicaManager.startup(); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index e344f7d7ae1..ac9a7f4c545 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -22,7 +22,7 @@ import kafka.log.CleanerConfig; import kafka.log.Defaults; import kafka.log.LogConfig; import kafka.log.LogManager; -import kafka.server.AlterIsrManager; +import kafka.server.AlterPartitionManager; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; import kafka.server.LogDirFailureChannel; @@ -94,7 +94,7 @@ public class PartitionCreationBench { private KafkaZkClient zkClient; private LogDirFailureChannel failureChannel; private LogManager logManager; - private AlterIsrManager alterIsrManager; + private AlterPartitionManager alterPartitionManager; private List topicPartitions; @SuppressWarnings("deprecation") @@ -149,7 +149,7 @@ public class PartitionCreationBench { return new Properties(); } }; - this.alterIsrManager = TestUtils.createAlterIsrManager(); + this.alterPartitionManager = TestUtils.createAlterIsrManager(); this.replicaManager = new ReplicaManagerBuilder(). setConfig(brokerProperties). 
setMetrics(metrics). @@ -161,7 +161,7 @@ public class PartitionCreationBench { setBrokerTopicStats(brokerTopicStats). setMetadataCache(new ZkMetadataCache(this.brokerProperties.brokerId())). setLogDirFailureChannel(failureChannel). - setAlterIsrManager(alterIsrManager). + setAlterPartitionManager(alterPartitionManager). build(); replicaManager.startup(); replicaManager.checkpointHighWatermarks();
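
A minimal usage sketch of the renamed API, not part of the patch above. It assumes the kafka-core test classpath at this revision (so TestUtils still exposes the pre-rename factory name while returning the MockAlterPartitionManager shown in the diff); the object name, topic name, and import paths are illustrative rather than taken from the patch.

// Sketch only: exercises AlterPartitionManager.submit via the mock from TestUtils.
// Assumptions: kafka-core test classpath; import paths as at this revision.
import java.util.concurrent.CompletableFuture

import kafka.api.LeaderAndIsr
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.metadata.LeaderRecoveryState

object AlterPartitionSubmitSketch {
  def main(args: Array[String]): Unit = {
    // The TestUtils factory keeps its pre-rename name but now returns a MockAlterPartitionManager.
    val manager = TestUtils.createAlterIsrManager()
    val tp = new TopicPartition("test-topic", 0)

    // submit() enqueues a pending ISR change and returns a future that completes
    // once the (mock) controller response is applied.
    val future: CompletableFuture[LeaderAndIsr] = manager.submit(
      tp,
      new LeaderAndIsr(1, 1, List(1, 2, 3), LeaderRecoveryState.RECOVERED, 10),
      0 // controllerEpoch, only used by ZkAlterPartitionManager
    )

    println(s"queued updates: ${manager.isrUpdates.size}, completed: ${future.isDone}")
  }
}

In the mock, a second submit while one update is outstanding completes its future exceptionally with OperationNotAttemptedException, mirroring the real DefaultAlterPartitionManager's rule of one pending ISR update per partition and a single in-flight AlterPartition request.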