diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8068c9ba31a..b0af105f504 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1241,7 +1241,6 @@ class Log(@volatile var dir: File,
         info(s"Incrementing log start offset to $newLogStartOffset")
         logStartOffset = newLogStartOffset
         leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-        producerStateManager.truncateHead(logStartOffset)
         maybeIncrementFirstUnstableOffset()
       }
     }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 995bf851eb0..ae5b77ab95f 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -113,7 +113,7 @@ private[log] class ProducerStateEntry(val producerId: Long,
 
   def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
 
-  def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
+  def lastTimestamp: Long = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
 
   def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
 
@@ -148,8 +148,6 @@ private[log] class ProducerStateEntry(val producerId: Long,
     this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
   }
 
-  def removeBatchesOlderThan(offset: Long): Unit = batchMetadata.dropWhile(_.lastOffset < offset)
-
   def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
     if (batch.producerEpoch != producerEpoch)
       None
@@ -542,7 +540,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Returns the last offset of this map
    */
-  def mapEndOffset = lastMapOffset
+  def mapEndOffset: Long = lastMapOffset
 
   /**
    * Get a copy of the active producers
@@ -557,9 +555,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       case Some(file) =>
         try {
           info(s"Loading producer state from snapshot file '$file'")
-          val loadedProducers = readSnapshot(file).filter { producerEntry =>
-            isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
-          }
+          val loadedProducers = readSnapshot(file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) }
           loadedProducers.foreach(loadProducerEntry)
           lastSnapOffset = offsetFromFile(file)
           lastMapOffset = lastSnapOffset
@@ -600,8 +596,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
-   * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
-   * or equal to the high watermark.
+   * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
+   * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
+   * Note that the log end offset is assumed to be less than or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -617,8 +615,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
-    } else {
-      truncateHead(logStartOffset)
     }
   }
 
@@ -692,46 +688,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
-  private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = {
-    producerStateEntry.removeBatchesOlderThan(logStartOffset)
-    producerStateEntry.lastDataOffset >= logStartOffset
-  }
-
-  /**
-   * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all producerIds which have a smaller last written offset. Additionally,
-   * we remove snapshots older than the new log start offset.
-   *
-   * Note that snapshots from offsets greater than the log start offset may have producers included which
-   * should no longer be retained: these producers will be removed if and when we need to load state from
-   * the snapshot.
-   */
-  def truncateHead(logStartOffset: Long): Unit = {
-    val evictedProducerEntries = producers.filter { case (_, producerState) =>
-      !isProducerRetained(producerState, logStartOffset)
-    }
-    val evictedProducerIds = evictedProducerEntries.keySet
-
-    producers --= evictedProducerIds
-    removeEvictedOngoingTransactions(evictedProducerIds)
-    removeUnreplicatedTransactions(logStartOffset)
-
-    if (lastMapOffset < logStartOffset)
-      lastMapOffset = logStartOffset
-
-    deleteSnapshotsBefore(logStartOffset)
-    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
-  }
-
-  private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = {
-    val iterator = ongoingTxns.entrySet.iterator
-    while (iterator.hasNext) {
-      val txnEntry = iterator.next()
-      if (expiredProducerIds.contains(txnEntry.getValue.producerId))
-        iterator.remove()
-    }
-  }
-
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c4465635704..29b564e0660 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1070,15 +1070,17 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(1L)
 
-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    // Deleting records should not remove producer state
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
+    // Because the log start offset did not advance, producer snapshots will still be present and the state will be rebuilt
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    assertEquals(2, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
@@ -1104,14 +1106,16 @@ class LogTest {
     log.maybeIncrementLogStartOffset(1L)
     log.deleteOldSegments()
 
+    // Deleting records should not remove producer state
     assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
+    // After reloading log, producer state should not be regenerated
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
@@ -1162,8 +1166,9 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
+    // Producer state should not be removed when deleting log segment
     assertEquals(2, log.logSegments.size)
-    assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet)
+    assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index e98e59e2e27..8500c94678a 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -561,52 +561,7 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testFirstUnstableOffsetAfterEviction(): Unit = {
-    val epoch = 0.toShort
-    val sequence = 0
-    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
-    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
-    append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true)
-    stateManager.truncateHead(100)
-    assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset))
-  }
-
-  @Test
-  def testTruncateHead(): Unit = {
-    val epoch = 0.toShort
-
-    append(stateManager, producerId, epoch, 0, 0L)
-    append(stateManager, producerId, epoch, 1, 1L)
-    stateManager.takeSnapshot()
-
-    val anotherPid = 2L
-    append(stateManager, anotherPid, epoch, 0, 2L)
-    append(stateManager, anotherPid, epoch, 1, 3L)
-    stateManager.takeSnapshot()
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-
-    stateManager.truncateHead(2)
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(None, stateManager.lastEntry(producerId))
-
-    val maybeEntry = stateManager.lastEntry(anotherPid)
-    assertTrue(maybeEntry.isDefined)
-    assertEquals(3L, maybeEntry.get.lastDataOffset)
-
-    stateManager.truncateHead(3)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(Set(4), currentSnapshotOffsets)
-    assertEquals(4, stateManager.mapEndOffset)
-
-    stateManager.truncateHead(5)
-    assertEquals(Set(), stateManager.activeProducers.keySet)
-    assertEquals(Set(), currentSnapshotOffsets)
-    assertEquals(5, stateManager.mapEndOffset)
-  }
-
-  @Test
-  def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = {
+  def testLoadFromSnapshotRetainsNonExpiredProducers(): Unit = {
     val epoch = 0.toShort
     val pid1 = 1L
     val pid2 = 2L
@@ -617,13 +572,17 @@ class ProducerStateManagerTest {
     assertEquals(2, stateManager.activeProducers.size)
 
     stateManager.truncateAndReload(1L, 2L, time.milliseconds())
-    assertEquals(1, stateManager.activeProducers.size)
-    assertEquals(None, stateManager.lastEntry(pid1))
+    assertEquals(2, stateManager.activeProducers.size)
+
+    val entry1 = stateManager.lastEntry(pid1)
+    assertTrue(entry1.isDefined)
+    assertEquals(0, entry1.get.lastSeq)
+    assertEquals(0L, entry1.get.lastDataOffset)
 
-    val entry = stateManager.lastEntry(pid2)
-    assertTrue(entry.isDefined)
-    assertEquals(0, entry.get.lastSeq)
-    assertEquals(1L, entry.get.lastDataOffset)
+    val entry2 = stateManager.lastEntry(pid2)
+    assertTrue(entry2.isDefined)
+    assertEquals(0, entry2.get.lastSeq)
+    assertEquals(1L, entry2.get.lastDataOffset)
   }
 
   @Test
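
Reviewer note: below is a minimal standalone sketch of the retention rule this patch adopts. It is illustrative only; RetentionSketch, Entry, onLogStartOffsetIncremented, and removeExpired are hypothetical names, not the actual ProducerStateManager API. The point it models: advancing the log start offset now prunes only on-disk snapshot files, while in-memory producer entries leave the map solely through expiration.

import scala.collection.mutable

// Illustrative model only; not the real ProducerStateManager.
object RetentionSketch {
  // Stand-in for ProducerStateEntry, keeping just the fields the rule needs.
  final case class Entry(lastDataOffset: Long, lastTimestamp: Long)

  val producers = mutable.Map.empty[Long, Entry]     // producerId -> entry
  val snapshotOffsets = mutable.TreeSet.empty[Long]  // offsets that have a snapshot file

  // After this patch: advancing the log start offset deletes snapshot files
  // strictly before it (cf. deleteSnapshotsBefore in truncateAndReload) but
  // leaves the in-memory map untouched, even for producers whose
  // lastDataOffset now precedes logStartOffset.
  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
    val stale = snapshotOffsets.filter(_ < logStartOffset).toList
    snapshotOffsets --= stale
  }

  // Producer entries are dropped only when they expire, mirroring the
  // expiration-only filter that loadFromSnapshot now applies.
  def removeExpired(currentTimeMs: Long, maxProducerIdExpirationMs: Int): Unit = {
    val expired = producers.collect {
      case (producerId, entry) if currentTimeMs - entry.lastTimestamp >= maxProducerIdExpirationMs => producerId
    }.toList
    producers --= expired
  }
}

The divergence called out in the updated truncateAndReload scaladoc falls out of this shape: snapshots before logStartOffset are gone from disk while the map still holds their producers, so any entry not covered by a remaining snapshot will not survive an unclean shutdown or failover.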