Browse Source

KAFKA-8351; Cleaner should handle transactions spanning multiple segments (#6722)

When cleaning transactional data, we need to keep track of which transactions still have data associated with them so that we do not remove the markers. We had logic to do this, but the state was not being carried over when beginning cleaning for a new set of segments. This could cause the cleaner to incorrectly believe a transaction marker was no longer needed. The fix here carries the transactional state between groups of segments to be cleaned.

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/6817/head
Jason Gustafson 6 years ago committed by GitHub
parent
commit
6afb0ca735
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 70
      core/src/main/scala/kafka/log/LogCleaner.scala
  2. 117
      core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

70
core/src/main/scala/kafka/log/LogCleaner.scala

@ -330,7 +330,7 @@ class LogCleaner(initialConfig: CleanerConfig,
} }
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs() val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
try { try {
deletable.foreach { case (topicPartition, log) => deletable.foreach { case (_, log) =>
currentLog = Some(log) currentLog = Some(log)
log.deleteOldSegments() log.deleteOldSegments()
} }
@ -520,8 +520,12 @@ private[log] class Cleaner(val id: Int,
// group the segments and clean the groups // group the segments and clean the groups
info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset)) val transactionMetadata = new CleanedTransactionMetadata
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
val groupedSegments = groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize,
log.config.maxIndexSize, cleanable.firstUncleanableOffset)
for (group <- groupedSegments)
cleanSegments(log, group, offsetMap, deleteHorizonMs, stats, transactionMetadata)
// record buffer utilization // record buffer utilization
stats.bufferUtilization = offsetMap.utilization stats.bufferUtilization = offsetMap.utilization
@ -539,14 +543,18 @@ private[log] class Cleaner(val id: Int,
* @param map The offset map to use for cleaning segments * @param map The offset map to use for cleaning segments
* @param deleteHorizonMs The time to retain delete tombstones * @param deleteHorizonMs The time to retain delete tombstones
* @param stats Collector for cleaning statistics * @param stats Collector for cleaning statistics
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
*/ */
private[log] def cleanSegments(log: Log, private[log] def cleanSegments(log: Log,
segments: Seq[LogSegment], segments: Seq[LogSegment],
map: OffsetMap, map: OffsetMap,
deleteHorizonMs: Long, deleteHorizonMs: Long,
stats: CleanerStats) { stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes // create a new segment with a suffix appended to the name of the log and indexes
val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset) val cleaned = LogCleaner.createNewCleanedSegment(log, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)
try { try {
// clean segments into the new destination segment // clean segments into the new destination segment
@ -561,7 +569,7 @@ private[log] class Cleaner(val id: Int,
val startOffset = currentSegment.baseOffset val startOffset = currentSegment.baseOffset
val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1) val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset) val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex)) transactionMetadata.addAbortedTransactions(abortedTransactions)
val retainDeletes = currentSegment.lastModified > deleteHorizonMs val retainDeletes = currentSegment.lastModified > deleteHorizonMs
info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " + info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
@ -870,8 +878,9 @@ private[log] class Cleaner(val id: Int,
val dirty = log.logSegments(start, end).toBuffer val dirty = log.logSegments(start, end).toBuffer
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end)) info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
val transactionMetadata = new CleanedTransactionMetadata
val abortedTransactions = log.collectAbortedTransactions(start, end) val abortedTransactions = log.collectAbortedTransactions(start, end)
val transactionMetadata = CleanedTransactionMetadata(abortedTransactions) transactionMetadata.addAbortedTransactions(abortedTransactions)
// Add all the cleanable dirty segments. We must take at least map.slots * load_factor, // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log) // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
@ -1042,29 +1051,26 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
} }
private[log] object CleanedTransactionMetadata {
def apply(abortedTransactions: List[AbortedTxn],
transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata = {
val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
}.reverse)
queue ++= abortedTransactions
new CleanedTransactionMetadata(queue, transactionIndex)
}
val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn])
}
/** /**
* This is a helper class to facilitate tracking transaction state while cleaning the log. It is initialized * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set
* with the aborted transactions from the transaction index and its state is updated as the cleaner iterates through * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This
* the log during a round of cleaning. This class is responsible for deciding when transaction markers can * class is responsible for deciding when transaction markers can be removed and is therefore also responsible
* be removed and is therefore also responsible for updating the cleaned transaction index accordingly. * for updating the cleaned transaction index accordingly.
*/ */
private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn], private[log] class CleanedTransactionMetadata {
val transactionIndex: Option[TransactionIndex] = None) { private val ongoingCommittedTxns = mutable.Set.empty[Long]
val ongoingCommittedTxns = mutable.Set.empty[Long] private val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata] // Minheap of aborted transactions sorted by the transaction first offset
private var abortedTransactions = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
}.reverse)
// Output cleaned index to write retained aborted transactions
var cleanedIndex: Option[TransactionIndex] = None
// Enqueue the given aborted transactions into this instance's pending queue of
// aborted transactions. Entries are added to whatever is already queued, so the
// method can be called repeatedly on the same instance.
def addAbortedTransactions(abortedTransactions: List[AbortedTxn]): Unit = {
this.abortedTransactions ++= abortedTransactions
}
/** /**
* Update the cleaned transaction state with a control batch that has just been traversed by the cleaner. * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
@ -1081,9 +1087,11 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
controlType match { controlType match {
case ControlRecordType.ABORT => case ControlRecordType.ABORT =>
ongoingAbortedTxns.remove(producerId) match { ongoingAbortedTxns.remove(producerId) match {
// Retain the marker until all batches from the transaction have been removed // Retain the marker until all batches from the transaction have been removed.
// We may retain a record from an aborted transaction if it is the last entry
// written by a given producerId.
case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined => case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn)) cleanedIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
false false
case _ => true case _ => true
} }
@ -1103,7 +1111,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
private def consumeAbortedTxnsUpTo(offset: Long): Unit = { private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) { while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
val abortedTxn = abortedTransactions.dequeue() val abortedTxn = abortedTransactions.dequeue()
ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn) ongoingAbortedTxns.getOrElseUpdate(abortedTxn.producerId, new AbortedTransactionMetadata(abortedTxn))
} }
} }
@ -1131,4 +1139,6 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) { private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
var lastObservedBatchOffset: Option[Long] = None var lastObservedBatchOffset: Option[Long] = None
override def toString: String = s"(txn: $abortedTxn, lastOffset: $lastObservedBatchOffset)"
} }

117
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

@ -84,7 +84,7 @@ class LogCleanerTest {
val segments = log.logSegments.take(3).toSeq val segments = log.logSegments.take(3).toSeq
val stats = new CleanerStats() val stats = new CleanerStats()
val expectedBytesRead = segments.map(_.size).sum val expectedBytesRead = segments.map(_.size).sum
cleaner.cleanSegments(log, segments, map, 0L, stats) cleaner.cleanSegments(log, segments, map, 0L, stats, new CleanedTransactionMetadata)
val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_)) val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, LogTest.keysInLog(log)) assertEquals(shouldRemain, LogTest.keysInLog(log))
assertEquals(expectedBytesRead, stats.bytesRead) assertEquals(expectedBytesRead, stats.bytesRead)
@ -151,7 +151,7 @@ class LogCleanerTest {
val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
val stats = new CleanerStats() val stats = new CleanerStats()
cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats) cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
cleaner.cleanSegments(log, segments, offsetMap, 0L, stats) cleaner.cleanSegments(log, segments, offsetMap, 0L, stats, new CleanedTransactionMetadata)
// Validate based on the file name that log segment file is renamed exactly once for async deletion // Validate based on the file name that log segment file is renamed exactly once for async deletion
assertEquals(expectedFileName, firstLogFile.file().getPath) assertEquals(expectedFileName, firstLogFile.file().getPath)
@ -432,37 +432,37 @@ class LogCleanerTest {
log.roll() log.roll()
// first time through the records are removed // first time through the records are removed
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1 var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log)) assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6), offsetsInLog(log)) assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log)) assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence // the empty batch remains if cleaned again because it still holds the last sequence
// Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log)) assertEquals(List(2, 3), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6), offsetsInLog(log)) assertEquals(List(4, 5, 6, 7), offsetsInLog(log))
assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log)) assertEquals(List(1, 3, 4, 5, 6, 7), lastOffsetsPerBatchInLog(log))
// append a new record from the producer to allow cleaning of the empty batch // append a new record from the producer to allow cleaning of the empty batch
// [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}] // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
// {1}, {3}, {4}, {5}, {6}, {8}, {9} ==> Offsets // {1}, {3}, {4}, {5}, {6}, {7}, {8}, {9} ==> Offsets
producer2(Seq(1)) // offset 8 producer2(Seq(1)) // offset 8
log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9 log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
log.roll() log.roll()
// Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}] // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log)) assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log)) assertEquals(List(4, 5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log)) assertEquals(List(1, 4, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
// Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}] // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer1: Commit}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1 dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log)) assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
assertEquals(List(5, 6, 8, 9), offsetsInLog(log)) assertEquals(List(5, 6, 7, 8, 9), offsetsInLog(log))
assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log)) assertEquals(List(1, 5, 6, 7, 8, 9), lastOffsetsPerBatchInLog(log))
} }
@Test @Test
@ -496,6 +496,59 @@ class LogCleanerTest {
assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log)) assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
} }
@Test
def testCommittedTransactionSpanningSegments(): Unit = {
// Scenario from KAFKA-8351: a committed transaction whose data batch and commit marker
// are separated by a segment roll before the log is cleaned.
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
// Configure a 128-byte segment size for this log
logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
// Transactional record with key 1; the assertions below expect it at offset 0
appendTransaction(Seq(1))
// Roll between the data and the marker so the transaction spans segments
log.roll()
// Commit marker; the assertions below expect it at offset 1
log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
log.roll()
// Both the record and the marker should remain after cleaning
cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
assertEquals(List(0, 1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@Test
def testAbortedTransactionSpanningSegments(): Unit = {
// Scenario from KAFKA-8351: an aborted transaction whose data batch and abort marker
// are separated by a segment roll before the log is cleaned.
val tp = new TopicPartition("test", 0)
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
// Configure a 128-byte segment size for this log
logProps.put(LogConfig.SegmentBytesProp, 128: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
val producerId = 1L
val appendTransaction = appendTransactionalAsLeader(log, producerId, producerEpoch)
// Transactional record with key 1, later aborted
appendTransaction(Seq(1))
// Roll between the data and the marker so the transaction spans segments
log.roll()
log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
log.roll()
// Both the batch and the marker should remain after cleaning. The batch is retained
// because it is the last entry for this producerId. The marker is retained because
// there are still batches remaining from this transaction.
cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
// Only offset 1 (the marker) is still listed as a record offset, while the batch
// ending at offset 0 is still reported by lastOffsetsPerBatchInLog.
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
// The empty batch and the marker are still retained after a second cleaning.
cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)
assertEquals(List(1), offsetsInLog(log))
assertEquals(List(0, 1), lastOffsetsPerBatchInLog(log))
}
@Test @Test
def testAbortMarkerRemoval(): Unit = { def testAbortMarkerRemoval(): Unit = {
val tp = new TopicPartition("test", 0) val tp = new TopicPartition("test", 0)
@ -650,7 +703,7 @@ class LogCleanerTest {
// clean the log // clean the log
val stats = new CleanerStats() val stats = new CleanerStats()
cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats) cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats, new CleanedTransactionMetadata)
val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_)) val shouldRemain = LogTest.keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, LogTest.keysInLog(log)) assertEquals(shouldRemain, LogTest.keysInLog(log))
} }
@ -663,7 +716,7 @@ class LogCleanerTest {
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024) val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString)) val shouldRemain = LogTest.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTest.keysInLog(log)) assertEquals(shouldRemain, LogTest.keysInLog(log))
} }
@ -682,7 +735,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
intercept[CorruptRecordException] { intercept[CorruptRecordException] {
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
} }
} }
@ -699,7 +752,7 @@ class LogCleanerTest {
val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024) val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
intercept[CorruptRecordException] { intercept[CorruptRecordException] {
cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats) cleaner.cleanSegments(log, Seq(log.logSegments.head), offsetMap, 0L, new CleanerStats, new CleanedTransactionMetadata)
} }
} }
@ -1030,7 +1083,8 @@ class LogCleanerTest {
val map = new FakeOffsetMap(Int.MaxValue) val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue)) keys.foreach(k => map.put(key(k), Long.MaxValue))
intercept[LogCleaningAbortedException] { intercept[LogCleaningAbortedException] {
cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats()) cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
} }
} }
@ -1239,7 +1293,8 @@ class LogCleanerTest {
// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort. // Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows[LogCleaningAbortedException] { assertThrows[LogCleaningAbortedException] {
cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, List(segmentWithOverflow), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
} }
assertEquals(numSegmentsInitial + 1, log.logSegments.size) assertEquals(numSegmentsInitial + 1, log.logSegments.size)
assertEquals(allKeys, LogTest.keysInLog(log)) assertEquals(allKeys, LogTest.keysInLog(log))
@ -1247,7 +1302,8 @@ class LogCleanerTest {
// Clean each segment now that split is complete. // Clean each segment now that split is complete.
for (segmentToClean <- log.logSegments) for (segmentToClean <- log.logSegments)
cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, List(segmentToClean), offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log)) assertEquals(expectedKeysAfterCleaning, LogTest.keysInLog(log))
assertFalse(LogTest.hasOffsetOverflow(log)) assertFalse(LogTest.hasOffsetOverflow(log))
log.close() log.close()
@ -1287,7 +1343,8 @@ class LogCleanerTest {
offsetMap.put(key(k), Long.MaxValue) offsetMap.put(key(k), Long.MaxValue)
// clean the log // clean the log
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run // clear scheduler so that async deletes don't run
time.scheduler.clear() time.scheduler.clear()
var cleanedKeys = LogTest.keysInLog(log) var cleanedKeys = LogTest.keysInLog(log)
@ -1302,7 +1359,8 @@ class LogCleanerTest {
log = recoverAndCheck(config, allKeys) log = recoverAndCheck(config, allKeys)
// clean again // clean again
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run // clear scheduler so that async deletes don't run
time.scheduler.clear() time.scheduler.clear()
cleanedKeys = LogTest.keysInLog(log) cleanedKeys = LogTest.keysInLog(log)
@ -1323,7 +1381,8 @@ class LogCleanerTest {
} }
for (k <- 1 until messageCount by 2) for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue) offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run // clear scheduler so that async deletes don't run
time.scheduler.clear() time.scheduler.clear()
cleanedKeys = LogTest.keysInLog(log) cleanedKeys = LogTest.keysInLog(log)
@ -1340,7 +1399,8 @@ class LogCleanerTest {
} }
for (k <- 1 until messageCount by 2) for (k <- 1 until messageCount by 2)
offsetMap.put(key(k), Long.MaxValue) offsetMap.put(key(k), Long.MaxValue)
cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats()) cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats(),
new CleanedTransactionMetadata)
// clear scheduler so that async deletes don't run // clear scheduler so that async deletes don't run
time.scheduler.clear() time.scheduler.clear()
cleanedKeys = LogTest.keysInLog(log) cleanedKeys = LogTest.keysInLog(log)
@ -1568,7 +1628,8 @@ class LogCleanerTest {
appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient) appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, isFromClient = isFromClient)
} }
private def appendIdempotentAsLeader(log: Log, producerId: Long, private def appendIdempotentAsLeader(log: Log,
producerId: Long,
producerEpoch: Short, producerEpoch: Short,
isTransactional: Boolean = false, isTransactional: Boolean = false,
leaderEpoch: Int = 0, leaderEpoch: Int = 0,

Loading…
Cancel
Save