
KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (#7388)

As described in KIP-360, this patch changes producer state retention so that producer state remains cached even after its records are removed from the log. Producer state is now removed only once the transactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers whose records have been deleted or whose topic has a short retention time. Tested with unit tests.

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Authored by Bob Barrett 5 years ago; committed by Jason Gustafson
commit c49775bf07
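
In outline, the change makes eviction of cached producer state purely time-based. The Scala sketch below is a minimal illustration of that rule; the names (StateMapSketch, StateMap, removeExpired, expirationMs) are hypothetical stand-ins rather than the real kafka.log.ProducerStateManager API, though the expiration check mirrors the isProducerExpired filter visible in the diff below.

// Minimal sketch of time-based producer state retention
// (hypothetical names, not the real kafka.log.ProducerStateManager API).
object StateMapSketch {
  final case class ProducerEntry(producerId: Long, lastTimestampMs: Long, lastOffset: Long)

  final class StateMap(expirationMs: Long) {
    private var producers = Map.empty[Long, ProducerEntry]

    def update(entry: ProducerEntry): Unit =
      producers += entry.producerId -> entry

    // Mirrors the isProducerExpired filter in the diff: age, not position
    // in the log, decides eviction.
    private def isExpired(nowMs: Long, entry: ProducerEntry): Boolean =
      nowMs - entry.lastTimestampMs >= expirationMs

    // Before this patch, entries whose lastOffset fell below the log start
    // offset were evicted as well; now only expiration applies.
    def removeExpired(nowMs: Long): Unit =
      producers = producers.filterNot { case (_, entry) => isExpired(nowMs, entry) }

    def activeProducerIds: Set[Long] = producers.keySet
  }

  def main(args: Array[String]): Unit = {
    val state = new StateMap(expirationMs = 7L * 24 * 60 * 60 * 1000) // cf. transactional.id.expiration.ms
    state.update(ProducerEntry(producerId = 1L, lastTimestampMs = 0L, lastOffset = 5L))
    // Even if the log start offset advances past offset 5, the entry survives:
    state.removeExpired(nowMs = 1000L)
    println(state.activeProducerIds) // Set(1)
  }
}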
  1. core/src/main/scala/kafka/log/Log.scala (1 line changed)
  2. core/src/main/scala/kafka/log/ProducerStateManager.scala (58 lines changed)
  3. core/src/test/scala/unit/kafka/log/LogTest.scala (13 lines changed)
  4. core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (63 lines changed)

core/src/main/scala/kafka/log/Log.scala (1 line changed)

@@ -1241,7 +1241,6 @@ class Log(@volatile var dir: File,
       info(s"Incrementing log start offset to $newLogStartOffset")
       logStartOffset = newLogStartOffset
       leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-      producerStateManager.truncateHead(logStartOffset)
       maybeIncrementFirstUnstableOffset()
     }
   }

core/src/main/scala/kafka/log/ProducerStateManager.scala (58 lines changed)

@@ -113,7 +113,7 @@ private[log] class ProducerStateEntry(val producerId: Long,
   def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
-  def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
+  def lastTimestamp: Long = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
   def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
@@ -148,8 +148,6 @@ private[log] class ProducerStateEntry(val producerId: Long,
     this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
   }

-  def removeBatchesOlderThan(offset: Long): Unit = batchMetadata.dropWhile(_.lastOffset < offset)
-
   def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
     if (batch.producerEpoch != producerEpoch)
       None
@@ -542,7 +540,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Returns the last offset of this map
    */
-  def mapEndOffset = lastMapOffset
+  def mapEndOffset: Long = lastMapOffset

   /**
    * Get a copy of the active producers
@@ -557,9 +555,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       case Some(file) =>
         try {
           info(s"Loading producer state from snapshot file '$file'")
-          val loadedProducers = readSnapshot(file).filter { producerEntry =>
-            isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
-          }
+          val loadedProducers = readSnapshot(file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) }
           loadedProducers.foreach(loadProducerEntry)
           lastSnapOffset = offsetFromFile(file)
           lastMapOffset = lastSnapOffset
@@ -600,8 +596,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
-   * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
-   * or equal to the high watermark.
+   * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
+   * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
+   * Note that the log end offset is assumed to be less than or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -617,8 +615,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
-    } else {
-      truncateHead(logStartOffset)
     }
   }
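
The two hunks above are the crux of the change: truncateAndReload still deletes out-of-range snapshot files, but the else-branch that called truncateHead is gone. For orientation, here is a compilable Scala sketch of the resulting control flow with collaborators stubbed out; it is a simplification under those assumptions, not the verbatim method body.

import scala.collection.mutable

// Stubbed-out sketch of the post-patch control flow. The point: snapshot
// files outside (logStartOffset, logEndOffset] are deleted, but when
// mapEndOffset already matches logEndOffset there is no longer an
// else-branch calling truncateHead(logStartOffset), so in-memory producer
// entries survive log-start-offset advances.
class TruncateAndReloadSketch {
  private val producers = mutable.Map.empty[Long, String]
  private val ongoingTxns = mutable.Map.empty[Long, String]
  private val unreplicatedTxns = mutable.Map.empty[Long, String]
  private var snapshotOffsets = Set.empty[Long] // stand-in for on-disk snapshot files
  private var mapEndOffset = 0L

  private def loadFromSnapshot(logStartOffset: Long, currentTimeMs: Long): Unit = ()

  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
    // remove all out of range snapshots, including those before logStartOffset
    snapshotOffsets = snapshotOffsets.filterNot { snapOffset =>
      snapOffset > logEndOffset || snapOffset <= logStartOffset
    }
    if (logEndOffset != mapEndOffset) {
      producers.clear()
      ongoingTxns.clear()
      // the log end offset is assumed <= high watermark, so it is
      // safe to clear the unreplicated transactions
      unreplicatedTxns.clear()
      loadFromSnapshot(logStartOffset, currentTimeMs)
    }
    // previously: else truncateHead(logStartOffset), removed by this patch,
    // so disk and memory are allowed to diverge until expiration
  }
}

On a restart after this point, only state captured in the surviving snapshots (or rebuilt from the log) comes back, which is exactly the disk/memory divergence the new doc comment calls out.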
@@ -692,46 +688,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))

-  private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = {
-    producerStateEntry.removeBatchesOlderThan(logStartOffset)
-    producerStateEntry.lastDataOffset >= logStartOffset
-  }
-
-  /**
-   * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all producerIds which have a smaller last written offset. Additionally,
-   * we remove snapshots older than the new log start offset.
-   *
-   * Note that snapshots from offsets greater than the log start offset may have producers included which
-   * should no longer be retained: these producers will be removed if and when we need to load state from
-   * the snapshot.
-   */
-  def truncateHead(logStartOffset: Long): Unit = {
-    val evictedProducerEntries = producers.filter { case (_, producerState) =>
-      !isProducerRetained(producerState, logStartOffset)
-    }
-    val evictedProducerIds = evictedProducerEntries.keySet
-
-    producers --= evictedProducerIds
-    removeEvictedOngoingTransactions(evictedProducerIds)
-    removeUnreplicatedTransactions(logStartOffset)
-
-    if (lastMapOffset < logStartOffset)
-      lastMapOffset = logStartOffset
-
-    deleteSnapshotsBefore(logStartOffset)
-    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
-  }
-
-  private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = {
-    val iterator = ongoingTxns.entrySet.iterator
-    while (iterator.hasNext) {
-      val txnEntry = iterator.next()
-      if (expiredProducerIds.contains(txnEntry.getValue.producerId))
-        iterator.remove()
-    }
-  }
-
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {

core/src/test/scala/unit/kafka/log/LogTest.scala (13 lines changed)

@@ -1070,15 +1070,17 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(1L)

-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    // Deleting records should not remove producer state
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)

     log.close()

+    // Because the log start offset did not advance, producer snapshots will still be present and the state will be rebuilt
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    assertEquals(2, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
@@ -1104,14 +1106,16 @@ class LogTest {
     log.maybeIncrementLogStartOffset(1L)
     log.deleteOldSegments()

+    // Deleting records should not remove producer state
     assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)

     log.close()

+    // After reloading log, producer state should not be regenerated
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
@@ -1162,8 +1166,9 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()

+    // Producer state should not be removed when deleting log segment
     assertEquals(2, log.logSegments.size)
-    assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet)
+    assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
   }
@Test

core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (63 lines changed)

@@ -561,52 +561,7 @@ class ProducerStateManagerTest {
   }

   @Test
-  def testFirstUnstableOffsetAfterEviction(): Unit = {
-    val epoch = 0.toShort
-    val sequence = 0
-    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
-    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
-    append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true)
-    stateManager.truncateHead(100)
-    assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset))
-  }
-
-  @Test
-  def testTruncateHead(): Unit = {
-    val epoch = 0.toShort
-
-    append(stateManager, producerId, epoch, 0, 0L)
-    append(stateManager, producerId, epoch, 1, 1L)
-    stateManager.takeSnapshot()
-
-    val anotherPid = 2L
-    append(stateManager, anotherPid, epoch, 0, 2L)
-    append(stateManager, anotherPid, epoch, 1, 3L)
-    stateManager.takeSnapshot()
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-
-    stateManager.truncateHead(2)
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(None, stateManager.lastEntry(producerId))
-
-    val maybeEntry = stateManager.lastEntry(anotherPid)
-    assertTrue(maybeEntry.isDefined)
-    assertEquals(3L, maybeEntry.get.lastDataOffset)
-
-    stateManager.truncateHead(3)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(Set(4), currentSnapshotOffsets)
-    assertEquals(4, stateManager.mapEndOffset)
-
-    stateManager.truncateHead(5)
-    assertEquals(Set(), stateManager.activeProducers.keySet)
-    assertEquals(Set(), currentSnapshotOffsets)
-    assertEquals(5, stateManager.mapEndOffset)
-  }
-
-  @Test
-  def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = {
+  def testLoadFromSnapshotRetainsNonExpiredProducers(): Unit = {
     val epoch = 0.toShort
     val pid1 = 1L
     val pid2 = 2L
@@ -617,13 +572,17 @@ class ProducerStateManagerTest {
     assertEquals(2, stateManager.activeProducers.size)

     stateManager.truncateAndReload(1L, 2L, time.milliseconds())
-    assertEquals(1, stateManager.activeProducers.size)
-    assertEquals(None, stateManager.lastEntry(pid1))
+    assertEquals(2, stateManager.activeProducers.size)
+
+    val entry1 = stateManager.lastEntry(pid1)
+    assertTrue(entry1.isDefined)
+    assertEquals(0, entry1.get.lastSeq)
+    assertEquals(0L, entry1.get.lastDataOffset)

-    val entry = stateManager.lastEntry(pid2)
-    assertTrue(entry.isDefined)
-    assertEquals(0, entry.get.lastSeq)
-    assertEquals(1L, entry.get.lastDataOffset)
+    val entry2 = stateManager.lastEntry(pid2)
+    assertTrue(entry2.isDefined)
+    assertEquals(0, entry2.get.lastSeq)
+    assertEquals(1L, entry2.get.lastDataOffset)
   }
@Test
