From 6afb0ca735cd5c1a559d496247559d21bcadcb6c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 24 May 2019 23:10:56 -0700 Subject: [PATCH] KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722) When cleaning transactional data, we need to keep track of which transactions still have data associated with them so that we do not remove the markers. We had logic to do this, but the state was not being carried over when beginning cleaning for a new set of segments. This could cause the cleaner to incorrectly believe a transaction marker was no longer needed. The fix here carries the transactional state between groups of segments to be cleaned. Reviewers: Dhruvil Shah, Viktor Somogyi, Jason Gustafson --- .../src/main/scala/kafka/log/LogCleaner.scala | 70 ++++++----- .../scala/unit/kafka/log/LogCleanerTest.scala | 117 +++++++++++++----- 2 files changed, 129 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index b972388e82b..3180f3db9bf 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -330,7 +330,7 @@ class LogCleaner(initialConfig: CleanerConfig, } val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() try { - deletable.foreach { case (topicPartition, log) => + deletable.foreach { case (_, log) => currentLog = Some(log) log.deleteOldSegments() } @@ -520,8 +520,12 @@ private[log] class Cleaner(val id: Int, // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) - for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset)) - cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) + val transactionMetadata = new 
CleanedTransactionMetadata + + val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, + log.config.maxIndexSize, cleanable.firstUncleanableOffset) + for (group <- groupedSegments) + cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata) // record buffer utilization stats.bufferUtilization = offsetMap.utilization @@ -539,14 +543,18 @@ private[log] class Cleaner(val id: Int, * @param map The offset map to use for cleaning segments * @param deleteHorizonMs The time to retain delete tombstones * @param stats Collector for cleaning statistics + * @param transactionMetadata State of ongoing transactions which is carried between the cleaning + * of the grouped segments */ private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, deleteHorizonMs: Long, - stats: CleanerStats) { + stats: CleanerStats, + transactionMetadata: CleanedTransactionMetadata): Unit = { // create a new segment with a suffix appended to the name of the log and indexes val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset) + transactionMetadata.cleanedIndex = Some(cleaned.txnIndex) try { // clean segments into the new destination segment @@ -561,7 +569,7 @@ private[log] class Cleaner(val id: Int, val startOffset = currentSegment.baseOffset val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1) val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset) - val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex)) + transactionMetadata.addAbortedTransactions(abortedTransactions) val retainDeletes = currentSegment.lastModified > deleteHorizonMs info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " + @@ -870,8 +878,9 @@ private[log] class Cleaner(val id: Int, val dirty = log.logSegments(start, end).toBuffer info("Building 
offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end)) + val transactionMetadata = new CleanedTransactionMetadata val abortedTransactions = log.collectAbortedTransactions(start, end) - val transactionMetadata = CleanedTransactionMetadata(abortedTransactions) + transactionMetadata.addAbortedTransactions(abortedTransactions) // Add all the cleanable dirty segments. We must take at least map.slots * load_factor, // but we may be able to fit more (if there is lots of duplication in the dirty section of the log) @@ -1042,29 +1051,26 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt } -private[log] object CleanedTransactionMetadata { - def apply(abortedTransactions: List[AbortedTxn], - transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata = { - val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] { - override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset - }.reverse) - queue ++= abortedTransactions - new CleanedTransactionMetadata(queue, transactionIndex) - } - - val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn]) -} - /** - * This is a helper class to facilitate tracking transaction state while cleaning the log. It is initialized - * with the aborted transactions from the transaction index and its state is updated as the cleaner iterates through - * the log during a round of cleaning. This class is responsible for deciding when transaction markers can - * be removed and is therefore also responsible for updating the cleaned transaction index accordingly. + * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set + * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. 
This + * class is responsible for deciding when transaction markers can be removed and is therefore also responsible + * for updating the cleaned transaction index accordingly. */ -private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn], - val transactionIndex: Option[TransactionIndex] = None) { - val ongoingCommittedTxns = mutable.Set.empty[Long] - val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata] +private[log] class CleanedTransactionMetadata { + private val ongoingCommittedTxns = mutable.Set.empty[Long] + private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata] + // Minheap of aborted transactions sorted by the transaction first offset + private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] { + override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset + }.reverse) + + // Output cleaned index to write retained aborted transactions + var cleanedIndex: Option[TransactionIndex] = None + + def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = { + this.abortedTransactions ++= abortedTransactions + } /** * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner. @@ -1081,9 +1087,11 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P controlType match { case ControlRecordType.ABORT => ongoingAbortedTxns.remove(producerId) match { - // Retain the marker until all batches from the transaction have been removed + // Retain the marker until all batches from the transaction have been removed. + // We may retain a record from an aborted transaction if it is the last entry + // written by a given producerId. 
case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined => - transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn)) + cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn)) false case _ => true } @@ -1103,7 +1111,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P private def consumeAbortedTxnsUpTo(offset: Long): Unit = { while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) { val abortedTxn = abortedTransactions.dequeue() - ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn) + ongoingAbortedTxns.getOrElseUpdate(abortedTxn.producerId, new AbortedTransactionMetadata(abortedTxn)) } } @@ -1131,4 +1139,6 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) { var lastObservedBatchOffset: Option[Long] = None + + override def toString: String = s"(txn: $abortedTxn, lastOffset: $lastObservedBatchOffset)" } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 00c9a1817eb..825fa90f959 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -84,7 +84,7 @@ class LogCleanerTest { val segments = log.logSegments.take(3).toSeq val stats = new CleanerStats() val expectedBytesRead = segments.map(_.size).sum - cleaner.cleanSegments(log, segments, map, 0L, stats) + cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata) val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_)) assertEquals(shouldRemain, LogTest.keysInLog(log)) assertEquals(expectedBytesRead, stats.bytesRead) @@ -151,7 +151,7 @@ class LogCleanerTest { val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq val stats = new CleanerStats() cleaner.buildOffsetMap(log, 0, 
log.activeSegment.baseOffset, offsetMap, stats) - cleaner.cleanSegments(log, segments, offsetMap, 0L, stats) + cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata) // Validate based on the file name that log segment file is renamed exactly once for async deletion assertEquals(expectedFileName, firstLogFile.file().getPath) @@ -432,37 +432,37 @@ class LogCleanerTest { log.roll() // first time through the records are removed - // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] + // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}] var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 3), LogTest.keysInLog(log)) - assertEquals(List(4, 5, 6), offsetsInLog(log)) - assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log)) + assertEquals(List(4, 5, 6, 7), offsetsInLog(log)) + assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log)) // the empty batch remains if cleaned again because it still holds the last sequence - // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] + // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}] dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 3), LogTest.keysInLog(log)) - assertEquals(List(4, 5, 6), offsetsInLog(log)) - assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log)) + assertEquals(List(4, 5, 6, 7), offsetsInLog(log)) + assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log)) // append a new record from the producer to allow cleaning of the empty batch - // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}] - // {1}, 
{3}, {4}, {5}, {6}, {8}, {9} ==> Offsets + // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}] + // {1}, {3}, {4}, {5}, {6}, {7}, {8}, {9} ==> Offsets producer2(Seq(1)) // offset 8 log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9 log.roll() - // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}] + // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}] dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 3, 1), LogTest.keysInLog(log)) - assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log)) - assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log)) + assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log)) + assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log)) - // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}] + // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}] dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 assertEquals(List(2, 3, 1), LogTest.keysInLog(log)) - assertEquals(List(5, 6, 8, 9), offsetsInLog(log)) - assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log)) + assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log)) + assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log)) } @Test @@ -496,6 +496,59 @@ class LogCleanerTest { assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log)) } + @Test + def testCommittedTransactionSpanningSegments(): Unit = { + val tp = new TopicPartition("test", 0) + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + 
logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer) + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + val producerEpoch = 0.toShort + val producerId = 1L + + val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch) + appendTransaction(Seq(1)) + log.roll() + + log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false) + log.roll() + + // Both the record and the marker should remain after cleaning + cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue) + assertEquals(List(0, 1), offsetsInLog(log)) + assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) + } + + @Test + def testAbortedTransactionSpanningSegments(): Unit = { + val tp = new TopicPartition("test", 0) + val cleaner = makeCleaner(Int.MaxValue) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer) + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + val producerEpoch = 0.toShort + val producerId = 1L + + val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch) + appendTransaction(Seq(1)) + log.roll() + + log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false) + log.roll() + + // Both the batch and the marker should remain after cleaning. The batch is retained + // because it is the last entry for this producerId. The marker is retained because + // there are still batches remaining from this transaction. + cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue) + assertEquals(List(1), offsetsInLog(log)) + assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) + + // The empty batch and the marker is still retained after a second cleaning. 
+ cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue) + assertEquals(List(1), offsetsInLog(log)) + assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log)) + } + @Test def testAbortMarkerRemoval(): Unit = { val tp = new TopicPartition("test", 0) @@ -650,7 +703,7 @@ class LogCleanerTest { // clean the log val stats = new CleanerStats() - cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats) + cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata) val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_)) assertEquals(shouldRemain, LogTest.keysInLog(log)) } @@ -663,7 +716,7 @@ class LogCleanerTest { val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024) val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString)) assertEquals(shouldRemain, LogTest.keysInLog(log)) } @@ -682,7 +735,7 @@ class LogCleanerTest { val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) intercept[CorruptRecordException] { - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) } } @@ -699,7 +752,7 @@ class LogCleanerTest { val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) intercept[CorruptRecordException] { - cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) + cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata) } } @@ -1030,7 +1083,8 @@ class LogCleanerTest { val map = new 
FakeOffsetMap(Int.MaxValue) keys.foreach(k => map.put(key(k), Long.MaxValue)) intercept[LogCleaningAbortedException] { - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats()) + cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(), + new CleanedTransactionMetadata) } } @@ -1239,7 +1293,8 @@ class LogCleanerTest { // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort. assertThrows[LogCleaningAbortedException] { - cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) } assertEquals(numSegmentsInitial + 1, log.logSegments.size) assertEquals(allKeys, LogTest.keysInLog(log)) @@ -1247,7 +1302,8 @@ class LogCleanerTest { // Clean each segment now that split is complete. for (segmentToClean <- log.logSegments) - cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log)) assertFalse(LogTest.hasOffsetOverflow(log)) log.close() @@ -1287,7 +1343,8 @@ class LogCleanerTest { offsetMap.put(key(k), Long.MaxValue) // clean the log - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) // clear scheduler so that async deletes don't run time.scheduler.clear() var cleanedKeys = LogTest.keysInLog(log) @@ -1302,7 +1359,8 @@ class LogCleanerTest { log = recoverAndCheck(config, allKeys) // clean again - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, 
log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTest.keysInLog(log) @@ -1323,7 +1381,8 @@ class LogCleanerTest { } for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTest.keysInLog(log) @@ -1340,7 +1399,8 @@ class LogCleanerTest { } for (k <- 1 until messageCount by 2) offsetMap.put(key(k), Long.MaxValue) - cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) + cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(), + new CleanedTransactionMetadata) // clear scheduler so that async deletes don't run time.scheduler.clear() cleanedKeys = LogTest.keysInLog(log) @@ -1568,7 +1628,8 @@ class LogCleanerTest { appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient) } - private def appendIdempotentAsLeader(log: Log, producerId: Long, + private def appendIdempotentAsLeader(log: Log, + producerId: Long, producerEpoch: Short, isTransactional: Boolean = false, leaderEpoch: Int = 0,