diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 257d8e78bfc..dbd59b5b692 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -42,26 +42,52 @@ import java.util.stream.Collectors;
 
 public class LeaderAndIsrRequest extends AbstractControlRequest {
 
+    public enum Type {
+        UNKNOWN(0),
+        INCREMENTAL(1),
+        FULL(2);
+
+        private final byte type;
+        private Type(int type) {
+            this.type = (byte) type;
+        }
+
+        public byte toByte() {
+            return type;
+        }
+
+        public static Type fromByte(byte type) {
+            for (Type t : Type.values()) {
+                if (t.type == type) {
+                    return t;
+                }
+            }
+            return UNKNOWN;
+        }
+    }
+
     public static class Builder extends AbstractControlRequest.Builder<LeaderAndIsrRequest> {
 
         private final List<LeaderAndIsrPartitionState> partitionStates;
         private final Map<String, Uuid> topicIds;
         private final Collection<Node> liveLeaders;
+        private final Type updateType;
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
                        Collection<Node> liveLeaders) {
             this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds,
-                liveLeaders, false);
+                liveLeaders, false, Type.UNKNOWN);
         }
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds,
-                       Collection<Node> liveLeaders, boolean kraftController) {
+                       Collection<Node> liveLeaders, boolean kraftController, Type updateType) {
             super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.topicIds = topicIds;
             this.liveLeaders = liveLeaders;
+            this.updateType = updateType;
         }
 
         @Override
@@ -82,6 +108,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
                 data.setIsKRaftController(kraftController);
             }
 
+            if (version >= 5) {
+                data.setType(updateType.toByte());
+            }
+
             if (version >= 2) {
                 Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds);
                 data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
@@ -210,6 +240,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
         return Collections.unmodifiableList(data.liveLeaders());
     }
 
+    public Type requestType() {
+        return Type.fromByte(data.type());
+    }
+
     @Override
     public LeaderAndIsrRequestData data() {
         return data;
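The new Type enum is forward-compatible by construction: fromByte maps any unrecognized byte to UNKNOWN, so requests from older controllers (which leave the field at its default of 0) and values from future versions both degrade safely instead of failing. A minimal Scala sketch of that round-trip, exercising only the enum added above:

    // Illustration only; not part of the patch.
    import org.apache.kafka.common.requests.LeaderAndIsrRequest

    val wire: Byte = LeaderAndIsrRequest.Type.FULL.toByte  // 2 on the wire
    assert(LeaderAndIsrRequest.Type.fromByte(wire) == LeaderAndIsrRequest.Type.FULL)
    // An unrecognized byte (e.g. from a future version) falls back to UNKNOWN instead of throwing:
    assert(LeaderAndIsrRequest.Type.fromByte(9.toByte) == LeaderAndIsrRequest.Type.UNKNOWN)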
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e3b2a29227..33a335dcc2f 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -377,6 +377,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   val stopReplicaRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, StopReplicaPartitionState]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataPartitionState]
+  private var updateType: LeaderAndIsrRequest.Type = LeaderAndIsrRequest.Type.UNKNOWN
   private var metadataInstance: ControllerChannelContext = _
 
   def sendRequest(brokerId: Int,
@@ -398,12 +399,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
     metadataInstance = metadataProvider()
   }
 
+  def setUpdateType(updateType: LeaderAndIsrRequest.Type): Unit = {
+    this.updateType = updateType
+  }
+
   def clear(): Unit = {
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
     updateMetadataRequestBrokerSet.clear()
     updateMetadataRequestPartitionInfoMap.clear()
     metadataInstance = null
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -543,8 +549,17 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           .toSet[String]
           .map(topic => (topic, metadataInstance.topicIds.getOrElse(topic, Uuid.ZERO_UUID)))
           .toMap
-        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
-          controllerEpoch, brokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava, kraftController)
+        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(
+          leaderAndIsrRequestVersion,
+          controllerId,
+          controllerEpoch,
+          brokerEpoch,
+          leaderAndIsrPartitionStates.values.toBuffer.asJava,
+          topicIds.asJava,
+          leaders.asJava,
+          kraftController,
+          updateType
+        )
         sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
           val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
           handleLeaderAndIsrResponse(leaderAndIsrResponse, broker)
@@ -552,6 +567,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       }
     }
     leaderAndIsrRequestMap.clear()
+    updateType = LeaderAndIsrRequest.Type.UNKNOWN
   }
 
   def handleLeaderAndIsrResponse(response: LeaderAndIsrResponse, broker: Int): Unit
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala
index 6f6679f1ebd..27b89864ffc 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -610,8 +610,12 @@ object LocalLog extends Logging {
   /** a directory that is used for future partition */
   private[log] val FutureDirSuffix = "-future"
 
+  /** a directory that is used for stray partition */
+  private[log] val StrayDirSuffix = "-stray"
+
   private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix")
   private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix")
+  private[log] val StrayDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$StrayDirSuffix")
 
   private[log] val UnknownOffset = -1L
 
@@ -622,10 +626,17 @@ object LocalLog extends Logging {
    * from exceeding 255 characters.
    */
  private[log] def logDeleteDirName(topicPartition: TopicPartition): String = {
-    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
-    val suffix = s"-${topicPartition.partition()}.$uniqueId$DeleteDirSuffix"
-    val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size)
-    s"${topicPartition.topic().substring(0, prefixLength)}$suffix"
+    logDirNameWithSuffixCappedLength(topicPartition, DeleteDirSuffix)
+  }
+
+  /**
+   * Return a directory name to rename the log directory to for stray partition deletion.
+   * The name will be in the following format: "topic-partitionId.uniqueId-stray".
+   * If the topic name is too long, it will be truncated to prevent the total name
+   * from exceeding 255 characters.
+   */
+  private[log] def logStrayDirName(topicPartition: TopicPartition): String = {
+    logDirNameWithSuffixCappedLength(topicPartition, StrayDirSuffix)
   }
 
   /**
@@ -636,6 +647,18 @@ object LocalLog extends Logging {
     logDirNameWithSuffix(topicPartition, FutureDirSuffix)
   }
 
+  /**
+   * Return a new directory name in the following format: "${topic}-${partitionId}.${uniqueId}${suffix}".
+   * If the topic name is too long, it will be truncated to prevent the total name
+   * from exceeding 255 characters.
+   */
+  private[log] def logDirNameWithSuffixCappedLength(topicPartition: TopicPartition, suffix: String): String = {
+    val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
+    val fullSuffix = s"-${topicPartition.partition()}.$uniqueId$suffix"
+    val prefixLength = Math.min(topicPartition.topic().size, 255 - fullSuffix.size)
+    s"${topicPartition.topic().substring(0, prefixLength)}$fullSuffix"
+  }
+
   private[log] def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = {
     val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
     s"${logDirName(topicPartition)}.$uniqueId$suffix"
@@ -666,11 +689,13 @@ object LocalLog extends Logging {
     if (dirName == null || dirName.isEmpty || !dirName.contains('-'))
       throw exception(dir)
     if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches ||
-        dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches)
+        dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches ||
+        dirName.endsWith(StrayDirSuffix) && !StrayDirPattern.matcher(dirName).matches)
       throw exception(dir)
 
     val name: String =
-      if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.'))
+      if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix) || dirName.endsWith(StrayDirSuffix))
+        dirName.substring(0, dirName.lastIndexOf('.'))
       else dirName
 
     val index = name.lastIndexOf('-')
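logStrayDirName reuses the capped-length scheme introduced for delete markers, so a stray replica of partition 0 of topic "foo" lands in a directory like "foo-0.<32-hex-chars>-stray", and even a maximal topic name stays within the 255-character directory-name limit. A standalone Scala sketch of the truncation arithmetic (the uuid value is a stand-in for the random one):

    // Illustration of the capped-length naming; not the patch itself.
    val topic = "a" * 300                              // deliberately longer than any legal dir name
    val uniqueId = "0123456789abcdef0123456789abcdef"  // stands in for the random UUID
    val fullSuffix = s"-0.$uniqueId-stray"             // "-<partition>.<uuid><StrayDirSuffix>"
    val prefixLength = math.min(topic.length, 255 - fullSuffix.length)
    val dirName = s"${topic.substring(0, prefixLength)}$fullSuffix"
    assert(dirName.length == 255 && dirName.endsWith("-stray"))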
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a1f4e582355..10b4ece3e9c 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -92,6 +92,10 @@ class LogManager(logDirs: Seq[File],
   // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion.
   private val logsToBeDeleted = new LinkedBlockingQueue[(UnifiedLog, Long)]()
 
+  // Map of stray partition to stray log. This holds all stray logs detected on the broker.
+  // Visible for testing
+  private val strayLogs = new Pool[TopicPartition, UnifiedLog]()
+
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile private var _currentDefaultConfig = initialDefaultConfig
   @volatile private var numRecoveryThreadsPerDataDir = recoveryThreadsPerDataDir
@@ -302,6 +306,10 @@ class LogManager(logDirs: Seq[File],
     this.logsToBeDeleted.add((log, time.milliseconds()))
   }
 
+  def addStrayLog(strayPartition: TopicPartition, strayLog: UnifiedLog): Unit = {
+    this.strayLogs.put(strayPartition, strayLog)
+  }
+
   // Only for testing
   private[log] def hasLogsToBeDeleted: Boolean = !logsToBeDeleted.isEmpty
 
@@ -337,6 +345,9 @@ class LogManager(logDirs: Seq[File],
 
     if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
       addLogToBeDeleted(log)
+    } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
+      addStrayLog(topicPartition, log)
+      warn(s"Loaded stray log: $logDir")
     } else {
       val previous = {
         if (log.isFuture)
@@ -1203,7 +1214,8 @@ class LogManager(logDirs: Seq[File],
    */
  def asyncDelete(topicPartition: TopicPartition,
                  isFuture: Boolean = false,
-                 checkpoint: Boolean = true): Option[UnifiedLog] = {
+                 checkpoint: Boolean = true,
+                 isStray: Boolean = false): Option[UnifiedLog] = {
    val removedLog: Option[UnifiedLog] = logCreationOrDeletionLock synchronized {
      removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
    }
@@ -1216,15 +1228,21 @@ class LogManager(logDirs: Seq[File],
            cleaner.updateCheckpoints(removedLog.parentDirFile, partitionToRemove = Option(topicPartition))
          }
        }
-        removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
+        if (isStray) {
+          // Move aside stray partitions, don't delete them
+          removedLog.renameDir(UnifiedLog.logStrayDirName(topicPartition), false)
+          warn(s"Log for partition ${removedLog.topicPartition} is marked as stray and renamed to ${removedLog.dir.getAbsolutePath}")
+        } else {
+          removedLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), false)
+          addLogToBeDeleted(removedLog)
+          info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+        }
        if (checkpoint) {
          val logDir = removedLog.parentDirFile
          val logsToCheckpoint = logsInDir(logDir)
          checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
          checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
        }
-        addLogToBeDeleted(removedLog)
-        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
 
      case None =>
        if (offlineLogDirs.nonEmpty) {
@@ -1244,6 +1262,7 @@ class LogManager(logDirs: Seq[File],
    *                 topic-partition is raised
    */
  def asyncDelete(topicPartitions: Set[TopicPartition],
+                 isStray: Boolean,
                  errorHandler: (TopicPartition, Throwable) => Unit): Unit = {
    val logDirs = mutable.Set.empty[File]
 
@@ -1251,11 +1270,11 @@ class LogManager(logDirs: Seq[File],
      try {
        getLog(topicPartition).foreach { log =>
          logDirs += log.parentDirFile
-          asyncDelete(topicPartition, checkpoint = false)
+          asyncDelete(topicPartition, checkpoint = false, isStray = isStray)
        }
        getLog(topicPartition, isFuture = true).foreach { log =>
          logDirs += log.parentDirFile
-          asyncDelete(topicPartition, isFuture = true, checkpoint = false)
+          asyncDelete(topicPartition, isFuture = true, checkpoint = false, isStray = isStray)
        }
      } catch {
        case e: Throwable => errorHandler(topicPartition, e)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 34468012306..e8c88ccd199 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1870,6 +1870,8 @@ object UnifiedLog extends Logging {
 
   val DeleteDirSuffix = LocalLog.DeleteDirSuffix
 
+  val StrayDirSuffix = LocalLog.StrayDirSuffix
+
   val FutureDirSuffix = LocalLog.FutureDirSuffix
 
   private[log] val DeleteDirPattern = LocalLog.DeleteDirPattern
@@ -1952,6 +1954,8 @@ object UnifiedLog extends Logging {
 
   def logFutureDirName(topicPartition: TopicPartition): String = LocalLog.logFutureDirName(topicPartition)
 
+  def logStrayDirName(topicPartition: TopicPartition): String = LocalLog.logStrayDirName(topicPartition)
+
   def logDirName(topicPartition: TopicPartition): String = LocalLog.logDirName(topicPartition)
 
   def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.transactionIndexFile(dir, offset, suffix)
diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index c573991b84e..1a18ca42fcb 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -22,6 +22,7 @@ import kafka.controller.{ControllerChannelContext, ControllerChannelManager, Rep
 import kafka.server.KafkaConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, TopicsImage}
 import org.apache.kafka.metadata.PartitionRegistration
@@ -225,6 +226,7 @@ class MigrationPropagator(
     requestBatch.sendRequestsToBrokers(zkControllerEpoch)
 
     requestBatch.newBatch()
+    requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)
     // When we need to send RPCs from the image, we're sending 'full' requests meaning we let
     // every broker know about all the metadata and all the LISR requests it needs to handle.
     // Note that we cannot send StopReplica requests from the image. We don't have any state
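Because sendRequestsToBrokers resets the batch's update type back to UNKNOWN after each send (and clear() does the same), every full-image propagation has to opt in explicitly; delta-driven batches never set it and keep the default. The controller-side call sequence, sketched against the batch API above:

    // Sketch of the batch lifecycle, assuming an AbstractControllerBrokerRequestBatch in scope.
    requestBatch.newBatch()
    requestBatch.setUpdateType(LeaderAndIsrRequest.Type.FULL)  // only for image-driven sends
    // ... addLeaderAndIsrRequestForBrokers(...) for every partition in the image ...
    requestBatch.sendRequestsToBrokers(zkControllerEpoch)      // resets updateType to UNKNOWN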
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4eb9a6f9c75..b8348a12d69 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -402,6 +402,47 @@ class ReplicaManager(val config: KafkaConfig,
     brokerTopicStats.removeMetrics(topic)
   }
 
+  private[server] def updateStrayLogs(strayPartitions: Set[TopicPartition]): Unit = {
+    if (strayPartitions.isEmpty) {
+      return
+    }
+    warn(s"Found stray partitions ${strayPartitions.mkString(",")}")
+
+    // First, stop the partitions. This will shut down the fetchers and other managers
+    val partitionsToStop = strayPartitions.map { tp => tp -> false }.toMap
+    stopPartitions(partitionsToStop).forKeyValue { (topicPartition, exception) =>
+      error(s"Unable to stop stray partition $topicPartition", exception)
+    }
+
+    // Next, delete the in-memory partition state. Normally, stopPartitions would do this, but since
+    // we're not actually deleting the log, we can't rely on the "deleteLocalLog" behavior in stopPartitions.
+    strayPartitions.foreach { topicPartition =>
+      getPartition(topicPartition) match {
+        case hostedPartition: HostedPartition.Online =>
+          if (allPartitions.remove(topicPartition, hostedPartition)) {
+            maybeRemoveTopicMetrics(topicPartition.topic)
+            hostedPartition.partition.delete()
+          }
+        case _ =>
+      }
+    }
+
+    // Mark the log as stray in-memory and rename the directory
+    strayPartitions.foreach { tp =>
+      logManager.getLog(tp).foreach(logManager.addStrayLog(tp, _))
+      logManager.getLog(tp, isFuture = true).foreach(logManager.addStrayLog(tp, _))
+    }
+    logManager.asyncDelete(strayPartitions, isStray = true, (topicPartition, e) => {
+      error(s"Failed to delete stray partition $topicPartition due to " +
+        s"${e.getClass.getName} exception: ${e.getMessage}")
+    })
+  }
+
+  // Find logs which exist on the broker, but aren't present in the full LISR
+  private[server] def findStrayPartitionsFromLeaderAndIsr(partitionsFromRequest: Set[TopicPartition]): Set[TopicPartition] = {
+    logManager.allLogs.map(_.topicPartition).filterNot(partitionsFromRequest.contains).toSet
+  }
+
   protected def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
@@ -591,7 +632,7 @@ class ReplicaManager(val config: KafkaConfig,
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
-      logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e))
+      logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e))
     }
     remoteLogManager.foreach { rlm =>
       // exclude the partitions with offline/error state
@@ -1689,10 +1730,12 @@ class ReplicaManager(val config: KafkaConfig,
       val partitionsToBeLeader = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
       val partitionsToBeFollower = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
       val topicIdUpdateFollowerPartitions = new mutable.HashSet[Partition]()
+      val allTopicPartitionsInRequest = new mutable.HashSet[TopicPartition]()
 
       // First create the partition if it doesn't exist already
       requestPartitionStates.foreach { partitionState =>
         val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
+        allTopicPartitionsInRequest += topicPartition
         val partitionOpt = getPartition(topicPartition) match {
           case HostedPartition.Offline =>
             stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
@@ -1801,6 +1844,17 @@ class ReplicaManager(val config: KafkaConfig,
       // have been completely populated before starting the checkpointing there by avoiding weird race conditions
       startHighWatermarkCheckPointThread()
 
+      // In migration mode, reconcile missed topic deletions when handling a full LISR from the KRaft controller.
+      // The LISR "type" field was previously unspecified (0), so if we see it set to FULL (2), then we know the
+      // request came from a KRaft controller.
+      if (
+        config.migrationEnabled &&
+        leaderAndIsrRequest.isKRaftController &&
+        leaderAndIsrRequest.requestType() == LeaderAndIsrRequest.Type.FULL
+      ) {
+        updateStrayLogs(findStrayPartitionsFromLeaderAndIsr(allTopicPartitionsInRequest))
+      }
+
       maybeAddLogDirFetchers(partitions, highWatermarkCheckpoints, topicIdFromRequest)
 
       replicaFetcherManager.shutdownIdleFetcherThreads()
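Stray detection itself is just a set difference: any log present on disk but absent from a full LeaderAndIsr request can no longer be owned by this broker. The same computation in isolation:

    // What findStrayPartitionsFromLeaderAndIsr computes, reduced to its essentials.
    import org.apache.kafka.common.TopicPartition

    def strays(onDisk: Set[TopicPartition], inFullRequest: Set[TopicPartition]): Set[TopicPartition] =
      onDisk.diff(inFullRequest)

    val deleted0 = new TopicPartition("deleted-topic", 0)
    val live0 = new TopicPartition("live-topic", 0)
    assert(strays(Set(deleted0, live0), Set(live0)) == Set(deleted0))  // leftover replica is stray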
diff --git a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
index 9459a83b283..dd042ff96a7 100644
--- a/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/migration/ZkTopicMigrationClient.scala
@@ -47,8 +47,17 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
     if (!interests.contains(TopicVisitorInterest.TOPICS)) {
       throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
     }
-    val topics = zkClient.getAllTopicsInCluster()
-    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
+    val allTopics = zkClient.getAllTopicsInCluster()
+    val topicDeletions = readPendingTopicDeletions().asScala
+    val topicsToMigrate = allTopics -- topicDeletions
+    if (topicDeletions.nonEmpty) {
+      warn(s"Found ${topicDeletions.size} pending topic deletions. These will not be migrated " +
+        s"to KRaft. After the migration, the brokers will reconcile their logs with these pending topic deletions.")
+    }
+    topicDeletions.foreach {
+      deletion => logger.info(s"Not migrating pending deleted topic: $deletion")
+    }
+    val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topicsToMigrate)
     replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
       val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
         partition.partition().asInstanceOf[Integer] -> assignment.replicas.map(Integer.valueOf).asJava
@@ -206,6 +215,7 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
       DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topicName), ZkVersion.MatchAnyVersion),
       DeleteRequest(TopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
     )
+
     val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state)
     val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
     if (responses.last.resultCode.equals(Code.OK)) {
@@ -316,4 +326,25 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
     val (path, data) = partitionStatePathAndData(topicPartition, partitionRegistration, controllerEpoch)
     SetDataRequest(path, data, ZkVersion.MatchAnyVersion, Some(topicPartition))
   }
+
+  override def readPendingTopicDeletions(): util.Set[String] = {
+    zkClient.getTopicDeletions.toSet.asJava
+  }
+
+  override def clearPendingTopicDeletions(
+    pendingTopicDeletions: util.Set[String],
+    state: ZkMigrationLeadershipState
+  ): ZkMigrationLeadershipState = {
+    val deleteRequests = pendingTopicDeletions.asScala.map { topicName =>
+      DeleteRequest(DeleteTopicsTopicZNode.path(topicName), ZkVersion.MatchAnyVersion)
+    }.toSeq
+
+    val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(deleteRequests, state)
+    val resultCodes = responses.map { response => response.path -> response.resultCode }.toMap
+    if (resultCodes.forall { case (_, code) => code.equals(Code.OK) }) {
+      state.withMigrationZkVersion(migrationZkVersion)
+    } else {
+      throw new MigrationClientException(s"Failed to delete pending topic deletions: $pendingTopicDeletions. ZK transaction had results $resultCodes")
+    }
+  }
 }
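Pending deletions are the child znodes of /admin/delete_topics, so the exclusion above boils down to a set subtraction over topic names before any assignment data is read. In outline, using the same KafkaZkClient calls as the patch:

    // Outline of the exclusion logic, assuming a KafkaZkClient in scope.
    val allTopics = zkClient.getAllTopicsInCluster()       // every topic registered in ZK
    val pendingDeletes = zkClient.getTopicDeletions.toSet  // children of /admin/delete_topics
    val topicsToMigrate = allTopics -- pendingDeletes      // only these are copied into KRaft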
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 4f5a2b8cb43..25ae532752a 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -47,7 +47,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{Assumptions, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 import org.slf4j.LoggerFactory
 
@@ -265,6 +265,118 @@ class ZkMigrationIntegrationTest {
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
   }
 
+  @ClusterTemplate("zkClustersForAllMigrationVersions")
+  def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
+    // Create some topics in ZK mode
+    var admin = zkCluster.createAdminClient()
+    val newTopics = new util.ArrayList[NewTopic]()
+    newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-4", 10, 3.toShort))
+    newTopics.add(new NewTopic("test-topic-5", 10, 3.toShort))
+    val createTopicResult = admin.createTopics(newTopics)
+    createTopicResult.all().get(300, TimeUnit.SECONDS)
+    admin.close()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Start a deletion that will take some time, but don't wait for it
+      admin = zkCluster.createAdminClient()
+      admin.deleteTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3", "test-topic-4", "test-topic-5").asJava)
+      admin.close()
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(60, TimeUnit.SECONDS)
+
+      // Only continue with the test if there are some pending deletions to verify. If there are not any pending
+      // deletions, this will mark the test as "skipped" instead of failed.
+      val topicDeletions = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkClient.getTopicDeletions
+      Assumptions.assumeTrue(topicDeletions.nonEmpty,
+        "This test needs pending topic deletions after a migration in order to verify the behavior")
+
+      // Wait for the ZK migration to complete
+      log.info("Waiting for ZK migration to complete")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(),
+        "Timed out waiting for migration to complete",
+        30000)
+
+      // At this point, some of the topics may have been deleted by the ZK controller, and the rest will be
+      // implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
+      admin = zkCluster.createAdminClient()
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).isEmpty,
+        "Timed out waiting for topics to be deleted",
+        300000)
+
+      val newTopics = new util.ArrayList[NewTopic]()
+      newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val expectedNewTopics = Seq("test-topic-1", "test-topic-2", "test-topic-3")
+      TestUtils.waitUntilTrue(
+        () => admin.listTopics().names().get(60, TimeUnit.SECONDS).equals(expectedNewTopics.toSet.asJava),
+        "Timed out waiting for topics to be created",
+        300000)
+
+      TestUtils.retry(300000) {
+        // Need a retry here since topic metadata may be inconsistent between brokers
+        val topicDescriptions = admin.describeTopics(expectedNewTopics.asJavaCollection)
+          .topicNameValues().asScala.map { case (name, description) =>
+            name -> description.get(60, TimeUnit.SECONDS)
+          }.toMap
+
+        assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
+        assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
+        assertEquals(10, topicDescriptions("test-topic-3").partitions().size())
+        topicDescriptions.foreach { case (topic, description) =>
+          description.partitions().forEach(partition => {
+            assertEquals(3, partition.replicas().size(), s"Unexpected number of replicas for ${topic}-${partition.partition()}")
+            assertEquals(3, partition.isr().size(), s"Unexpected ISR for ${topic}-${partition.partition()}")
+          })
+        }
+
+        val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
+        assertTrue(absentTopics.contains("test-topic-1"))
+        assertTrue(absentTopics.contains("test-topic-2"))
+        assertTrue(absentTopics.contains("test-topic-3"))
+        assertFalse(absentTopics.contains("test-topic-4"))
+        assertFalse(absentTopics.contains("test-topic-5"))
+      }
+
+      admin.close()
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
   // SCRAM and Quota are intermixed. Test SCRAM Only here
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 095c14260b8..0a8064760ee 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -68,6 +68,7 @@ import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
 import org.mockito.invocation.InvocationOnMock
@@ -561,7 +562,9 @@ class ReplicaManagerTest {
         .setIsNew(true)).asJava,
       Collections.singletonMap(topic, Uuid.randomUuid()),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
-      false).build()
+      false,
+      LeaderAndIsrRequest.Type.UNKNOWN
+    ).build()
     replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
     replicaManager.getPartitionOrException(new TopicPartition(topic, partition))
       .localLogOrException
@@ -2543,6 +2546,130 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testFullLeaderAndIsrStrayPartitions(zkMigrationEnabled: Boolean): Unit = {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    if (zkMigrationEnabled) {
+      props.put(KafkaConfig.MigrationEnabledProp, zkMigrationEnabled)
+      props.put(RaftConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9071")
+      props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      config = KafkaConfig.fromProps(props)
+    }
+
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray
+    createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+
+    val lisr = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+      3000, 0, brokerEpoch,
+      Seq(
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(0)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(0)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](0, 1).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](0, 1).asJava)
+          .setIsNew(false),
+        new LeaderAndIsrPartitionState()
+          .setTopicName("hosted-topic")
+          .setPartitionIndex(1)
+          .setControllerEpoch(controllerEpoch)
+          .setLeader(1)
+          .setLeaderEpoch(10)
+          .setIsr(Seq[Integer](1, 0).asJava)
+          .setPartitionEpoch(0)
+          .setReplicas(Seq[Integer](1, 0).asJava)
+          .setIsNew(false)
+      ).asJava,
+      topicIds.asJava,
+      Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava,
+      true,
+      LeaderAndIsrRequest.Type.FULL
+    ).build()
+
+    replicaManager.becomeLeaderOrFollower(0, lisr, (_, _) => ())
+
+    val ht0 = replicaManager.getPartition(new TopicPartition("hosted-topic", 0))
+    assertTrue(ht0.isInstanceOf[HostedPartition.Online])
+
+    val stray0 = replicaManager.getPartition(new TopicPartition("hosted-stray", 0))
+
+    if (zkMigrationEnabled) {
+      assertEquals(HostedPartition.None, stray0)
+    } else {
+      assertTrue(stray0.isInstanceOf[HostedPartition.Online])
+    }
+  }
+
+  @Test
+  def testUpdateStrayLogs(): Unit = {
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), defaultConfig = new LogConfig(new Properties()), time = time)
+    val replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = time.scheduler,
+      logManager = logManager,
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, ""),
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager,
+      threadNamePrefix = Option(this.getClass.getName))
+
+    logManager.startup(Set.empty[String])
+
+    // Create a hosted topic, a hosted topic that will become stray, and a stray topic
+    val validLogs = createHostedLogs("hosted-topic", numLogs = 2, replicaManager).toSet
+    createHostedLogs("hosted-stray", numLogs = 10, replicaManager).toSet
+    createStrayLogs(10, logManager)
+
+    val allReplicasFromLISR = Set(new TopicPartition("hosted-topic", 0), new TopicPartition("hosted-topic", 1))
+
+    replicaManager.updateStrayLogs(replicaManager.findStrayPartitionsFromLeaderAndIsr(allReplicasFromLISR))
+
+    assertEquals(validLogs, logManager.allLogs.toSet)
+    assertEquals(validLogs.size, replicaManager.partitionCount.value)
+
+    replicaManager.shutdown()
+    logManager.shutdown()
+  }
+
+  private def createHostedLogs(name: String, numLogs: Int, replicaManager: ReplicaManager): Seq[UnifiedLog] = {
+    for (i <- 0 until numLogs) yield {
+      val topicPartition = new TopicPartition(name, i)
+      val partition = replicaManager.createPartition(topicPartition)
+      partition.createLogIfNotExists(isNew = true, isFutureReplica = false,
+        new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), topicId = None)
+      partition.log.get
+    }
+  }
+
+  private def createStrayLogs(numLogs: Int, logManager: LogManager): Seq[UnifiedLog] = {
+    val name = "stray"
+    for (i <- 0 until numLogs)
+      yield logManager.getOrCreateLog(new TopicPartition(name, i), topicId = None)
+  }
+
   private def sendProducerAppend(
     replicaManager: ReplicaManager,
     topicPartition: TopicPartition,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index aa60390cc06..879f7de40bd 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -308,7 +308,8 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         }
     }
 
-    private void transitionTo(MigrationDriverState newState) {
+    // Visible for testing
+    void transitionTo(MigrationDriverState newState) {
         if (!isValidStateChange(newState)) {
             throw new IllegalStateException(
                 String.format("Invalid transition in migration driver from %s to %s", migrationState, newState));
@@ -498,6 +499,17 @@ public class KRaftMigrationDriver implements MetadataPublisher {
                 return;
             }
 
+            // Until the metadata has been migrated, the migrationLeadershipState offset is -1. We need to ignore
+            // metadata images until we see that the migration has happened and the image exceeds the offset of the
+            // migration.
+            if (!migrationLeadershipState.initialZkMigrationComplete()) {
+                log.info("Ignoring {} {} since the migration has not finished.", metadataType, provenance);
+                completionHandler.accept(null);
+                return;
+            }
+
+            // If the migration has finished, the migrationLeadershipState offset will be positive. Ignore any images
+            // which are older than the offset that has been written to ZK.
             if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
                 log.info("Ignoring {} {} which contains metadata that has already been written to ZK.", metadataType, provenance);
                 completionHandler.accept(null);
@@ -532,13 +544,18 @@ public class KRaftMigrationDriver implements MetadataPublisher {
             applyMigrationOperation("Updating ZK migration state after " + metadataType,
                 state -> zkMigrationClient.setMigrationRecoveryState(zkStateAfterDualWrite));
 
-            // TODO: Unhappy path: Probably relinquish leadership and let new controller
-            //  retry the write?
-            if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
-                log.trace("Sending RPCs to brokers for metadata {}.", metadataType);
-                propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+            if (isSnapshot) {
+                // When we load a snapshot, we need to send full metadata updates to the brokers
+                log.debug("Sending full metadata RPCs to brokers for snapshot.");
+                propagator.sendRPCsToBrokersFromMetadataImage(image, migrationLeadershipState.zkControllerEpoch());
             } else {
-                log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+                // delta
+                if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+                    log.trace("Sending incremental metadata RPCs to brokers for delta.");
+                    propagator.sendRPCsToBrokersFromMetadataDelta(delta, image, migrationLeadershipState.zkControllerEpoch());
+                } else {
+                    log.trace("Not sending RPCs to brokers for metadata {} since no relevant metadata has changed", metadataType);
+                }
             }
 
             completionHandler.accept(null);
@@ -699,6 +716,13 @@ public class KRaftMigrationDriver implements MetadataPublisher {
         @Override
         public void run() throws Exception {
             if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
+                // The migration offset will be non-negative at this point, so we just need to check that the image
+                // we have actually includes the migration metadata.
+                if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) < 0) {
+                    log.info("Ignoring image {} which does not contain a superset of the metadata in ZK. Staying in " +
+                        "SYNC_KRAFT_TO_ZK until a newer image is loaded", image.provenance());
+                    return;
+                }
                 log.info("Performing a full metadata sync from KRaft to ZK.");
                 Map<String, Integer> dualWriteCounts = new TreeMap<>();
                 long startTime = time.nanoseconds();
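Both new guards hinge on the same comparison: the image's highest offset and epoch versus the migration offset recorded in ZK. Before the initial migration, that offset is -1 (initialZkMigrationComplete() is false), so every image is ignored; afterwards, only images at or beyond the last ZK write pass. The predicate in isolation, as a Scala sketch over the same classes:

    // The gating condition reduced to a predicate, using OffsetAndEpoch's natural ordering.
    import org.apache.kafka.image.MetadataImage
    import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState

    def imageIsBehindZk(image: MetadataImage, state: ZkMigrationLeadershipState): Boolean =
      image.highestOffsetAndEpoch().compareTo(state.offsetAndEpoch()) < 0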
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index a57e1cadc65..6c82c9cb9cc 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -69,6 +69,7 @@ public class KRaftMigrationZkWriter {
     private static final String CREATE_TOPIC = "CreateTopic";
     private static final String UPDATE_TOPIC = "UpdateTopic";
     private static final String DELETE_TOPIC = "DeleteTopic";
+    private static final String DELETE_PENDING_TOPIC_DELETION = "DeletePendingTopicDeletion";
     private static final String UPDATE_PARTITION = "UpdatePartition";
     private static final String DELETE_PARTITION = "DeletePartition";
     private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
@@ -146,6 +147,15 @@ public class KRaftMigrationZkWriter {
         Map<Uuid, Map<Integer, PartitionRegistration>> changedPartitions = new HashMap<>();
         Map<Uuid, Map<Integer, PartitionRegistration>> newPartitions = new HashMap<>();
 
+        Set<String> pendingTopicDeletions = migrationClient.topicClient().readPendingTopicDeletions();
+        if (!pendingTopicDeletions.isEmpty()) {
+            operationConsumer.accept(
+                DELETE_PENDING_TOPIC_DELETION,
+                "Delete pending topic deletions",
+                migrationState -> migrationClient.topicClient().clearPendingTopicDeletions(pendingTopicDeletions, migrationState)
+            );
+        }
+
         migrationClient.topicClient().iterateTopics(
             EnumSet.of(
                 TopicMigrationClient.TopicVisitorInterest.TOPICS,
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
index 505c78a10b4..5eafd72b298 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/TopicMigrationClient.java
@@ -42,6 +42,13 @@ public interface TopicMigrationClient {
 
     void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor);
 
+    Set<String> readPendingTopicDeletions();
+
+    ZkMigrationLeadershipState clearPendingTopicDeletions(
+        Set<String> pendingTopicDeletions,
+        ZkMigrationLeadershipState state
+    );
+
     ZkMigrationLeadershipState deleteTopic(
         String topicName,
         ZkMigrationLeadershipState state
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
index 30317b20ecc..8b8e5acc5f3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingTopicMigrationClient.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.PartitionRegistration;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -51,7 +52,23 @@ public class CapturingTopicMigrationClient implements TopicMigrationClient {
     }
 
     @Override
-    public ZkMigrationLeadershipState deleteTopic(String topicName, ZkMigrationLeadershipState state) {
+    public Set<String> readPendingTopicDeletions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public ZkMigrationLeadershipState clearPendingTopicDeletions(
+        Set<String> pendingTopicDeletions,
+        ZkMigrationLeadershipState state
+    ) {
+        return state;
+    }
+
+    @Override
+    public ZkMigrationLeadershipState deleteTopic(
+        String topicName,
+        ZkMigrationLeadershipState state
+    ) {
         deletedTopics.add(topicName);
         return state;
     }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index 1bca3e2f2b2..eda9dc38ffa 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -615,7 +615,7 @@ public class KRaftMigrationDriverTest {
 
             // Wait for migration
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
 
             // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
             provenance = new MetadataProvenance(200, 1, 1);
@@ -634,6 +634,61 @@ public class KRaftMigrationDriverTest {
         });
     }
 
+    @Test
+    public void testNoDualWriteBeforeMigration() throws Exception {
+        setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY,
+                DelegationTokenImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            setupDeltaForMigration(delta, true);
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
+                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
+
+            driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());
+
+            driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
+            driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
+            driver.transitionTo(MigrationDriverState.ZK_MIGRATION);
+            driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
+
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+        });
+    }
+
     @Test
     public void testControllerFailover() throws Exception {
         setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {