diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b972388e82b..3180f3db9bf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -330,7 +330,7 @@ class LogCleaner(initialConfig: CleanerConfig,
       }
       val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
       try {
-        deletable.foreach { case (topicPartition, log) =>
+        deletable.foreach { case (_, log) =>
           currentLog = Some(log)
           log.deleteOldSegments()
         }
@@ -520,8 +520,12 @@ private[log] class Cleaner(val id: Int,
     }
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
-    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
-      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
+    val transactionMetadata = new CleanedTransactionMetadata
+
+    val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
+      log.config.maxIndexSize, cleanable.firstUncleanableOffset)
+    for (group <- groupedSegments)
+      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
 
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
@@ -539,14 +543,18 @@ private[log] class Cleaner(val id: Int,
    * @param map The offset map to use for cleaning segments
    * @param deleteHorizonMs The time to retain delete tombstones
    * @param stats Collector for cleaning statistics
+   * @param transactionMetadata State of ongoing transactions which is carried between the cleaning
+   *                            of the grouped segments
    */
   private[log] def cleanSegments(log: Log,
                                  segments: Seq[LogSegment],
                                  map: OffsetMap,
                                  deleteHorizonMs: Long,
-                                 stats: CleanerStats) {
+                                 stats: CleanerStats,
+                                 transactionMetadata: CleanedTransactionMetadata): Unit = {
     // create a new segment with a suffix appended to the name of the log and indexes
     val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
+    transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
 
     try {
       // clean segments into the new destination segment
@@ -561,7 +569,7 @@ private[log] class Cleaner(val id: Int,
         val startOffset = currentSegment.baseOffset
         val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
-        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))
+        transactionMetadata.addAbortedTransactions(abortedTransactions)
 
         val retainDeletes = currentSegment.lastModified > deleteHorizonMs
         info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
@@ -870,8 +878,9 @@ private[log] class Cleaner(val id: Int,
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
 
+    val transactionMetadata = new CleanedTransactionMetadata
     val abortedTransactions = log.collectAbortedTransactions(start, end)
-    val transactionMetadata = CleanedTransactionMetadata(abortedTransactions)
+    transactionMetadata.addAbortedTransactions(abortedTransactions)
 
     // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
@@ -1042,29 +1051,26 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
 }
 
-private[log] object CleanedTransactionMetadata {
-  def apply(abortedTransactions: List[AbortedTxn],
-            transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata = {
-    val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
-      override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
-    }.reverse)
-    queue ++= abortedTransactions
-    new CleanedTransactionMetadata(queue, transactionIndex)
-  }
-
-  val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn])
-}
-
 /**
- * This is a helper class to facilitate tracking transaction state while cleaning the log. It is initialized
- * with the aborted transactions from the transaction index and its state is updated as the cleaner iterates through
- * the log during a round of cleaning. This class is responsible for deciding when transaction markers can
- * be removed and is therefore also responsible for updating the cleaned transaction index accordingly.
+ * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set
+ * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This
+ * class is responsible for deciding when transaction markers can be removed and is therefore also responsible
+ * for updating the cleaned transaction index accordingly.
  */
-private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
-                                              val transactionIndex: Option[TransactionIndex] = None) {
-  val ongoingCommittedTxns = mutable.Set.empty[Long]
-  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+private[log] class CleanedTransactionMetadata {
+  private val ongoingCommittedTxns = mutable.Set.empty[Long]
+  private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
+  // Minheap of aborted transactions sorted by the transaction first offset
+  private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
+    override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+  }.reverse)
+
+  // Output cleaned index to write retained aborted transactions
+  var cleanedIndex: Option[TransactionIndex] = None
+
+  def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
+    this.abortedTransactions ++= abortedTransactions
+  }
 
   /**
    * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
@@ -1081,9 +1087,11 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
     controlType match {
       case ControlRecordType.ABORT =>
         ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have been removed
+          // Retain the marker until all batches from the transaction have been removed.
+          // We may retain a record from an aborted transaction if it is the last entry
+          // written by a given producerId.
           case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+            cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
             false
           case _ => true
         }
@@ -1103,7 +1111,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
 
   private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
     while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
       val abortedTxn = abortedTransactions.dequeue()
-      ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn)
+      ongoingAbortedTxns.getOrElseUpdate(abortedTxn.producerId, new AbortedTransactionMetadata(abortedTxn))
     }
   }
@@ -1131,4 +1139,6 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
 
 private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
   var lastObservedBatchOffset: Option[Long] = None
+
+  override def toString: String = s"(txn: $abortedTxn, lastOffset: $lastObservedBatchOffset)"
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 00c9a1817eb..825fa90f959 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -84,7 +84,7 @@ class LogCleanerTest {
     val segments = log.logSegments.take(3).toSeq
     val stats = new CleanerStats()
     val expectedBytesRead = segments.map(_.size).sum
-    cleaner.cleanSegments(log, segments, map, 0L, stats)
+    cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
     assertEquals(expectedBytesRead, stats.bytesRead)
@@ -151,7 +151,7 @@ class LogCleanerTest {
     val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
     val stats = new CleanerStats()
     cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
-    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
 
     // Validate based on the file name that log segment file is renamed exactly once for async deletion
     assertEquals(expectedFileName, firstLogFile.file().getPath)
@@ -432,37 +432,37 @@ class LogCleanerTest {
     log.roll()
 
     // first time through the records are removed
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the last sequence
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6), offsetsInLog(log))
-    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
 
     // append a new record from the producer to allow cleaning of the empty batch
-    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}]
-    // {1}, {3}, {4}, {5}, {6}, {8}, {9} ==> Offsets
+    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
+    // {1}, {3}, {4}, {5}, {6}, {7}, {8}, {9} ==> Offsets
     producer2(Seq(1)) // offset 8
     log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
     log.roll()
 
-    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
 
-    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
+    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(5, 6, 8, 9), offsetsInLog(log))
-    assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
   }
 
   @Test
@@ -496,6 +496,59 @@ class LogCleanerTest {
     assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
   }
 
+  @Test
+  def testCommittedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    // Both the record and the marker should remain after cleaning
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(0, 1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
+  def testAbortedTransactionSpanningSegments(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+
+    val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    appendTransaction(Seq(1))
+    log.roll()
+
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    // Both the batch and the marker should remain after cleaning. The batch is retained
+    // because it is the last entry for this producerId. The marker is retained because
+    // there are still batches remaining from this transaction.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+
+    // The empty batch and the marker is still retained after a second cleaning.
+    cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
+    assertEquals(List(1), offsetsInLog(log))
+    assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
+  }
+
   @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
@@ -650,7 +703,7 @@ class LogCleanerTest {
 
     // clean the log
     val stats = new CleanerStats()
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -663,7 +716,7 @@ class LogCleanerTest {
     val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
 
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
     assertEquals(shouldRemain, LogTest.keysInLog(log))
   }
@@ -682,7 +735,7 @@ class LogCleanerTest {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
 
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     }
   }
@@ -699,7 +752,7 @@ class LogCleanerTest {
 
     val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
 
     intercept[CorruptRecordException] {
-      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats)
+      cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
     }
   }
@@ -1030,7 +1083,8 @@ class LogCleanerTest {
 
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     intercept[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
   }
@@ -1239,7 +1293,8 @@ class LogCleanerTest {
 
     // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
     assertThrows[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
     }
     assertEquals(numSegmentsInitial + 1, log.logSegments.size)
     assertEquals(allKeys, LogTest.keysInLog(log))
@@ -1247,7 +1302,8 @@ class LogCleanerTest {
     // Clean each segment now that split is complete.
     for (segmentToClean <- log.logSegments)
-      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats())
+      cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
+        new CleanedTransactionMetadata)
 
     assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
     assertFalse(LogTest.hasOffsetOverflow(log))
     log.close()
@@ -1287,7 +1343,8 @@ class LogCleanerTest {
       offsetMap.put(key(k), Long.MaxValue)
 
     // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     var cleanedKeys = LogTest.keysInLog(log)
@@ -1302,7 +1359,8 @@ class LogCleanerTest {
     log = recoverAndCheck(config, allKeys)
 
     // clean again
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1323,7 +1381,8 @@ class LogCleanerTest {
     }
     for (k <- 1 until messageCount by 2)
      offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
     cleanedKeys = LogTest.keysInLog(log)
@@ -1340,7 +1399,8 @@ class LogCleanerTest {
     }
     for (k <- 1 until messageCount by 2)
      offsetMap.put(key(k), Long.MaxValue)
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
+      new CleanedTransactionMetadata)
     // clear scheduler so that async deletes don't run
     time.scheduler.clear()
    cleanedKeys = LogTest.keysInLog(log)
@@ -1568,7 +1628,8 @@ class LogCleanerTest {
     appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
   }
 
-  private def appendIdempotentAsLeader(log: Log, producerId: Long,
+  private def appendIdempotentAsLeader(log: Log,
+                                       producerId: Long,
                                        producerEpoch: Short,
                                        isTransactional: Boolean = false,
                                        leaderEpoch: Int = 0,
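
Note (not part of the patch): the sketch below illustrates, under simplified assumptions, how the refactored API is meant to be used. A single CleanedTransactionMetadata is now created once per cleaning round in Cleaner.doClean and threaded through every cleanSegments call, instead of being rebuilt inside the per-segment loop as the old CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex)) call did, so ongoing committed/aborted producer state and the output transaction index survive segment-group boundaries (the behaviour the new *SpanningSegments tests exercise). The method name doCleanRoundSketch and its parameter list are hypothetical stand-ins for the corresponding locals and fields of Cleaner.doClean, and the body assumes it lives inside the Cleaner class so that groupSegmentsBySize and cleanSegments are in scope.

  // Hypothetical sketch (not from the patch): mirrors the post-patch flow of Cleaner.doClean.
  private[log] def doCleanRoundSketch(log: Log,
                                      cleanable: LogToClean,
                                      offsetMap: OffsetMap,
                                      endOffset: Long,
                                      deleteHorizonMs: Long,
                                      stats: CleanerStats): Unit = {
    // Shared by all segment groups of this round: ongoing committed/aborted producerIds
    // and the cleaned transaction index carry over from one group to the next.
    val transactionMetadata = new CleanedTransactionMetadata

    val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
      log.config.maxIndexSize, cleanable.firstUncleanableOffset)
    for (group <- groupedSegments)
      // cleanSegments points transactionMetadata.cleanedIndex at the new segment's transaction
      // index and registers the aborted transactions for the offset range it is about to clean.
      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
  }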