
KAFKA-1819 Cleaner gets confused about deleted and re-created topics; reviewed by Neha Narkhede

pull/38/merge
Gwen Shapira authored 10 years ago; committed by Neha Narkhede
commit 14779dddb6
  1. core/src/main/scala/kafka/log/LogCleaner.scala (11 lines changed)
  2. core/src/main/scala/kafka/log/LogCleanerManager.scala (17 lines changed)
  3. core/src/main/scala/kafka/log/LogManager.scala (9 lines changed)
  4. core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (64 lines changed)
  5. core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala (14 lines changed)
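In brief: when log compaction is enabled, each data directory carries a cleaner-offset-checkpoint file recording the last cleaned offset per partition. Before this patch, deleting a topic left its entries in that file, so a topic re-created under the same name could be handed a stale last-cleaned offset that no longer matches the new log. The diffs below make LogManager prune the checkpoint whenever a log is deleted, via a new LogCleanerManager.updateCheckpoints that keeps only partitions which still have a live log. For orientation, the checkpoint is a small plain-text file of that era's OffsetCheckpoint format: a version line, an entry count, then one "topic partition offset" line per partition (the concrete values here are made up for illustration):

    0
    2
    test 0 500
    other-topic 0 120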

core/src/main/scala/kafka/log/LogCleaner.scala (11 lines changed)

@@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig,
                  val logs: Pool[TopicAndPartition, Log],
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
-  /* for managing the state of partitions being cleaned. */
-  private val cleanerManager = new LogCleanerManager(logDirs, logs);
+  /* for managing the state of partitions being cleaned. package-private to allow access in tests */
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
@@ -129,6 +129,13 @@ class LogCleaner(val config: CleanerConfig,
     cleanerManager.abortCleaning(topicAndPartition)
   }
+  /**
+   * Update checkpoint file, removing topics and partitions that no longer exist
+   */
+  def updateCheckpoints(dataDir: File) {
+    cleanerManager.updateCheckpoints(dataDir, update=None);
+  }
   /**
    * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
    * This call blocks until the cleaning of the partition is aborted and paused.
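The two LogCleaner changes are companion pieces: cleanerManager becomes package-private so tests in the kafka.log package can reach details such as the checkpoint file name, and the new updateCheckpoints(dataDir) gives callers outside the cleaner (LogManager, below) a way to force a checkpoint rewrite when there is no new offset to record. As a sketch of what the relaxed visibility enables in a test, assuming a cleaner: LogCleaner and a logDir: File are in scope, plus imports of java.io.File and kafka.server.OffsetCheckpoint (this mirrors the LogCleanerIntegrationTest change at the end of this commit):

    // read the last-cleaned offsets that the cleaner has checkpointed in logDir
    val checkpointFile = new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)
    val lastCleanedOffsets = new OffsetCheckpoint(checkpointFile).read()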

core/src/main/scala/kafka/log/LogCleanerManager.scala (17 lines changed)

@@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
   override val loggerName = classOf[LogCleaner].getName
+  // package-private for testing
+  private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
@@ -199,6 +202,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log])
     }
   }
+  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
+    inLock(lock) {
+      val checkpoint = checkpoints(dataDir)
+      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+      checkpoint.write(existing)
+    }
+  }
   /**
    * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
    */
@@ -206,9 +217,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log])
     inLock(lock) {
       inProgress(topicAndPartition) match {
         case LogCleaningInProgress =>
-          val checkpoint = checkpoints(dataDir)
-          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
-          checkpoint.write(offsets)
+          updateCheckpoints(dataDir,Option(topicAndPartition, endOffset))
           inProgress.remove(topicAndPartition)
         case LogCleaningAborted =>
           inProgress.put(topicAndPartition, LogCleaningPaused)
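The heart of the fix is the single expression checkpoint.read().filterKeys(logs.keys) ++ update: read the current checkpoint, keep only partitions that still exist in the logs pool, then merge in the optional new entry (None when LogManager reacts to a deleted log, Some(partition -> endOffset) when doneCleaning records progress). A minimal, self-contained sketch of that behaviour with plain collections (all names below are illustrative, not taken from the patch):

    object CheckpointPruneSketch extends App {
      // stand-in for kafka.common.TopicAndPartition
      case class TopicAndPartition(topic: String, partition: Int)

      // what the cleaner-offset-checkpoint currently holds
      val checkpoint = Map(TopicAndPartition("test", 0)  -> 500L,
                           TopicAndPartition("other", 0) -> 120L)

      // partitions that still have a live log; topic "test" was just deleted
      val liveLogs = Set(TopicAndPartition("other", 0))

      // same shape as updateCheckpoints: drop stale entries, merge the optional update
      // (filter instead of filterKeys so this compiles cleanly on newer Scala; same effect)
      def prune(update: Option[(TopicAndPartition, Long)]): Map[TopicAndPartition, Long] =
        checkpoint.filter { case (tp, _) => liveLogs(tp) } ++ update

      println(prune(None))                                        // entry for test-0 is gone
      println(prune(Some(TopicAndPartition("other", 0) -> 700L))) // other-0 advances to 700
    }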

core/src/main/scala/kafka/log/LogManager.scala (9 lines changed)

@@ -57,8 +57,9 @@ class LogManager(val logDirs: Array[File],
   private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs()
-  private val cleaner: LogCleaner =
+  // public, so we can access this from kafka.admin.DeleteTopicTest
+  val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
       new LogCleaner(cleanerConfig, logDirs, logs, time = time)
     else
@@ -370,8 +371,10 @@ class LogManager(val logDirs: Array[File],
     }
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-      if (cleaner != null)
+      if (cleaner != null) {
         cleaner.abortCleaning(topicAndPartition)
+        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+      }
       removedLog.delete()
       info("Deleted log for partition [%s,%d] in %s."
         .format(topicAndPartition.topic,
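Ordering matters in this delete path: cleaning of the partition is aborted first so no cleaner thread will write its offset back, the checkpoint is pruned next, and only then is the log itself deleted. Note also that LogCleanerManager keys its checkpoints map by data directory, which is why the call passes the parent of the log's directory rather than the log directory itself. Roughly (paths here are illustrative):

    // /tmp/kafka-logs            <- data directory; holds cleaner-offset-checkpoint
    // /tmp/kafka-logs/test-0     <- removedLog.dir for partition test-0
    cleaner.updateCheckpoints(removedLog.dir.getParentFile)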

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (64 lines changed)

@@ -16,11 +16,14 @@
  */
 package kafka.admin
+import java.io.File
+import kafka.log.Log
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
 import org.junit.Test
 import kafka.common._
 import kafka.producer.{ProducerConfig, Producer}
@@ -221,14 +224,50 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
     servers.foreach(_.shutdown())
   }
+  @Test
+  def testDeleteTopicWithCleaner() {
+    val topicName = "test"
+    val topicAndPartition = TopicAndPartition(topicName, 0)
+    val topic = topicAndPartition.topic
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs(0).setProperty("delete.topic.enable", "true")
+    brokerConfigs(0).setProperty("log.cleaner.enable","true")
+    brokerConfigs(0).setProperty("log.cleanup.policy","compact")
+    brokerConfigs(0).setProperty("log.segment.bytes","100")
+    brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000")
+    val servers = createTestTopicAndCluster(topic,brokerConfigs)
+    // for simplicity, we are validating cleaner offsets on a single broker
+    val server = servers(0)
+    val log = server.logManager.getLog(topicAndPartition).get
+    // write to the topic to activate cleaner
+    writeDups(numKeys = 100, numDups = 3,log)
+    // wait for cleaner to clean
+    server.logManager.cleaner.awaitCleaned(topicName,0,0)
+    // delete topic
+    AdminUtils.deleteTopic(zkClient, "test")
+    verifyTopicDeletion("test", servers)
+    servers.foreach(_.shutdown())
+  }
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
+    )
+    createTestTopicAndCluster(topic,brokerConfigs)
+  }
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
@@ -253,5 +292,24 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
+    // ensure that topic is removed from all cleaner offsets
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
+      {
+        val topicAndPartition = TopicAndPartition(topic,0)
+        val logdir = server.getLogManager().logDirs(0)
+        val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
+        !checkpoints.contains(topicAndPartition)
+      }),
+      "Cleaner offset for deleted partition should have been removed")
   }
+  private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+    var counter = 0
+    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+      val count = counter
+      log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      counter += 1
+      (key, count)
+    }
+  }
 }

core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala (14 lines changed)

@@ -18,6 +18,8 @@
 package kafka.log
+import java.io.File
+import kafka.server.OffsetCheckpoint
 import scala.collection._
 import org.junit._
 import kafka.common.TopicAndPartition
@@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     cleaner.awaitCleaned("log", 0, lastCleaned2)
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+    // simulate deleting a partition, by removing it from logs
+    // force a checkpoint
+    // and make sure its gone from checkpoint file
+    cleaner.logs.remove(topics(0))
+    cleaner.updateCheckpoints(logDir)
+    val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+    // we expect partition 0 to be gone
+    assert(!checkpoints.contains(topics(0)))
     cleaner.shutdown()
   }
