diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f8fcb843c80..f8e7cd5fabc 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig,
                  val logs: Pool[TopicAndPartition, Log],
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
-  /* for managing the state of partitions being cleaned. */
-  private val cleanerManager = new LogCleanerManager(logDirs, logs);
+  /* for managing the state of partitions being cleaned. package-private to allow access in tests */
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
@@ -129,6 +129,13 @@ class LogCleaner(val config: CleanerConfig,
     cleanerManager.abortCleaning(topicAndPartition)
   }
 
+  /**
+   * Update checkpoint file, removing topics and partitions that no longer exist
+   */
+  def updateCheckpoints(dataDir: File) {
+    cleanerManager.updateCheckpoints(dataDir, update=None);
+  }
+
   /**
    * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
    * This call blocks until the cleaning of the partition is aborted and paused.
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index bcfef77ed53..fd87d905979 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
 
   override val loggerName = classOf[LogCleaner].getName
+
+  // package-private for testing
+  private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
 
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
@@ -199,6 +202,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     }
   }
 
+  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
+    inLock(lock) {
+      val checkpoint = checkpoints(dataDir)
+      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+      checkpoint.write(existing)
+    }
+  }
+
   /**
    * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
   */
@@ -206,9 +217,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       inProgress(topicAndPartition) match {
         case LogCleaningInProgress =>
-          val checkpoint = checkpoints(dataDir)
-          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
-          checkpoint.write(offsets)
+          updateCheckpoints(dataDir,Option(topicAndPartition, endOffset))
           inProgress.remove(topicAndPartition)
         case LogCleaningAborted =>
           inProgress.put(topicAndPartition, LogCleaningPaused)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4ebaae00ca4..47d250af62c 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -57,8 +57,9 @@ class LogManager(val logDirs: Array[File],
   private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs()
-
-  private val cleaner: LogCleaner =
+
+  // public, so we can access this from kafka.admin.DeleteTopicTest
+  val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
       new LogCleaner(cleanerConfig, logDirs, logs, time = time)
     else
@@ -370,8 +371,10 @@ class LogManager(val logDirs: Array[File],
     }
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-      if (cleaner != null)
+      if (cleaner != null) {
         cleaner.abortCleaning(topicAndPartition)
+        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+      }
       removedLog.delete()
       info("Deleted log for partition [%s,%d] in %s."
         .format(topicAndPartition.topic,
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 29cc01bcef9..33c27678bf8 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,11 +16,14 @@
  */
 package kafka.admin
 
+import java.io.File
+
+import kafka.log.Log
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
 import org.junit.Test
 import kafka.common._
 import kafka.producer.{ProducerConfig, Producer}
@@ -221,14 +224,50 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
     servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testDeleteTopicWithCleaner() {
+    val topicName = "test"
+    val topicAndPartition = TopicAndPartition(topicName, 0)
+    val topic = topicAndPartition.topic
+
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs(0).setProperty("delete.topic.enable", "true")
+    brokerConfigs(0).setProperty("log.cleaner.enable","true")
+    brokerConfigs(0).setProperty("log.cleanup.policy","compact")
+    brokerConfigs(0).setProperty("log.segment.bytes","100")
+    brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000")
+    val servers = createTestTopicAndCluster(topic,brokerConfigs)
+
+    // for simplicity, we are validating cleaner offsets on a single broker
+    val server = servers(0)
+    val log = server.logManager.getLog(topicAndPartition).get
+
+    // write to the topic to activate cleaner
+    writeDups(numKeys = 100, numDups = 3,log)
+
+    // wait for cleaner to clean
+    server.logManager.cleaner.awaitCleaned(topicName,0,0)
+    // delete topic
+    AdminUtils.deleteTopic(zkClient, "test")
+    verifyTopicDeletion("test", servers)
+
+    servers.foreach(_.shutdown())
   }
 
 
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
+
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
+    )
+    createTestTopicAndCluster(topic,brokerConfigs)
+  }
+
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
@@ -253,5 +292,24 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
+    // ensure that topic is removed from all cleaner offsets
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
+      {
+        val topicAndPartition = TopicAndPartition(topic,0)
+        val logdir = server.getLogManager().logDirs(0)
+        val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
+        !checkpoints.contains(topicAndPartition)
+      }),
+      "Cleaner offset for deleted partition should have been removed")
+  }
+
+  private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+    var counter = 0
+    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+      val count = counter
+      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      counter += 1
+      (key, count)
+    }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5bfa764638e..07acd460b12 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,8 @@
 package kafka.log
 
 import java.io.File
+
+import kafka.server.OffsetCheckpoint
 import scala.collection._
 import org.junit._
 import kafka.common.TopicAndPartition
@@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     cleaner.awaitCleaned("log", 0, lastCleaned2)
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+
+    // simulate deleting a partition by removing it from logs,
+    // force a checkpoint,
+    // and make sure it's gone from the checkpoint file
+
+    cleaner.logs.remove(topics(0))
+
+    cleaner.updateCheckpoints(logDir)
+    val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+
+    // we expect partition 0 to be gone
+    assert(!checkpoints.contains(topics(0)))
 
     cleaner.shutdown()
   }
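
---
Note (not part of the patch): the heart of this change is the read-filter-merge step in
LogCleanerManager.updateCheckpoints, which both doneCleaning (with update = Some(partition -> endOffset))
and the new LogManager.deleteLog path (with update = None) now funnel through, so deleting a topic prunes
its entry from cleaner-offset-checkpoint under the same lock that cleaning itself uses. Below is a minimal,
self-contained sketch of that logic in plain Scala; TopicAndPartition here, CheckpointSketch, and the
in-memory checkpointFile map are hypothetical stand-ins for illustration, not Kafka's actual classes.

    import java.util.concurrent.locks.ReentrantLock

    // Hypothetical stand-in for kafka.common.TopicAndPartition.
    case class TopicAndPartition(topic: String, partition: Int)

    // `logs` stands in for the LogManager's pool of live logs; the map below
    // stands in for the on-disk OffsetCheckpoint file.
    class CheckpointSketch(logs: collection.mutable.Map[TopicAndPartition, String]) {
      private val lock = new ReentrantLock()
      private var checkpointFile = Map.empty[TopicAndPartition, Long]

      // Same shape as the patched updateCheckpoints: re-read the checkpoint,
      // keep only entries whose partition still has a live log, then merge in
      // the optional new entry (None on the deleteLog path, Some(tp -> offset)
      // when doneCleaning records a finished cleaning pass).
      def updateCheckpoints(update: Option[(TopicAndPartition, Long)]): Unit = {
        lock.lock()
        try {
          checkpointFile = checkpointFile.filter { case (tp, _) => logs.contains(tp) } ++ update
        } finally lock.unlock()
      }

      def read(): Map[TopicAndPartition, Long] = checkpointFile
    }

    object CheckpointSketchDemo extends App {
      val tp = TopicAndPartition("test", 0)
      val logs = collection.mutable.Map(tp -> "log-dir-0")
      val mgr = new CheckpointSketch(logs)

      mgr.updateCheckpoints(Some(tp -> 100L)) // doneCleaning: record last cleaned offset
      logs.remove(tp)                         // topic deleted
      mgr.updateCheckpoints(None)             // deleteLog: prune the stale entry
      assert(!mgr.read().contains(tp))        // mirrors the new assertions in the tests
    }

Keeping the whole read-modify-write inside one lock is what makes the two call sites safe to combine:
without it, a cleaner thread finishing doneCleaning could re-insert an offset for a partition that
deleteLog had just pruned, leaving exactly the kind of stale checkpoint entry this patch removes.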