
KAFKA-5249; Fix incorrect producer snapshot offsets when recovering segments

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3060 from hachikuji/KAFKA-5249
Jason Gustafson authored 8 years ago, committed by Jun Rao
Commit: 7bb551b4a1
  1. core/src/main/scala/kafka/log/Log.scala (1 changed line)
  2. core/src/main/scala/kafka/log/LogSegment.scala (1 changed line)
  3. core/src/main/scala/kafka/log/ProducerStateManager.scala (10 changed lines)
  4. core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (8 changed lines)
  5. core/src/test/scala/unit/kafka/log/LogTest.scala (108 changed lines)
  6. core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (21 changed lines)
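The heart of the fix is two one-line changes in Log.scala and LogSegment.scala: the producer state manager's map end offset is now positioned at the segment's base offset before recovery begins, and advanced past each batch as it is replayed, so any snapshot taken during or after recovery carries the correct end offset instead of a stale one. Below is a minimal, self-contained sketch of that idea; StateTracker, Batch and recoverSegment are illustrative stand-ins, not the real Kafka classes.

// Sketch only: models how the map end offset should move during segment recovery.
object RecoverySketch {

  // Stand-in for ProducerStateManager that only tracks the map end offset.
  final class StateTracker(var mapEndOffset: Long = 0L) {
    def updateMapEndOffset(offset: Long): Unit = mapEndOffset = offset
    // Snapshot files are named after the map end offset, so a stale value here
    // yields a snapshot with an incorrect offset.
    def takeSnapshot(): String = f"$mapEndOffset%020d.snapshot"
  }

  // Stand-in for a record batch; only the last offset matters for this sketch.
  final case class Batch(lastOffset: Long)

  // Mirrors the two changes in this commit: position the tracker at the segment's
  // base offset before recovery (Log.scala), then advance it to lastOffset + 1 for
  // every replayed batch (LogSegment.scala).
  def recoverSegment(baseOffset: Long, batches: Seq[Batch], state: StateTracker): Unit = {
    state.updateMapEndOffset(baseOffset)
    batches.foreach(batch => state.updateMapEndOffset(batch.lastOffset + 1))
  }

  def main(args: Array[String]): Unit = {
    val state = new StateTracker()
    recoverSegment(baseOffset = 100L, batches = Seq(Batch(104L), Batch(107L)), state = state)
    println(state.takeSnapshot()) // 00000000000000000108.snapshot
  }
}

With a last recovered offset of 107 the tracker ends at 108, which is exactly what the new assertEquals(108L, stateManager.mapEndOffset) checks in LogSegmentTest below.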

core/src/main/scala/kafka/log/Log.scala (1 changed line)

@@ -316,6 +316,7 @@ class Log(@volatile var dir: File,
      if (fetchDataInfo != null)
        loadProducersFromLog(stateManager, fetchDataInfo.records)
    }
+    stateManager.updateMapEndOffset(segment.baseOffset)
    val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
    // once we have recovered the segment's data, take a snapshot to ensure that we won't

core/src/main/scala/kafka/log/LogSegment.scala (1 changed line)

@@ -156,6 +156,7 @@ class LogSegment(val log: FileRecords,
        updateTxnIndex(completedTxn, lastStableOffset)
      }
    }
+    producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
  }
  /**

core/src/main/scala/kafka/log/ProducerStateManager.scala (10 changed lines)

@@ -429,6 +429,12 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   * or equal to the high watermark.
   */
  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+    // remove all out of range snapshots
+    deleteSnapshotFiles { file =>
+      val offset = offsetFromFilename(file.getName)
+      offset > logEndOffset || offset <= logStartOffset
+    }
+
    if (logEndOffset != mapEndOffset) {
      producers.clear()
      ongoingTxns.clear()
@@ -436,10 +442,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
      // since we assume that the offset is less than or equal to the high watermark, it is
      // safe to clear the unreplicated transactions
      unreplicatedTxns.clear()
-      deleteSnapshotFiles { file =>
-        val offset = offsetFromFilename(file.getName)
-        offset > logEndOffset || offset <= logStartOffset
-      }
      loadFromSnapshot(logStartOffset, currentTimeMs)
    } else {
      evictUnretainedProducers(logStartOffset)
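Two effects of this hunk are worth spelling out: the out-of-range snapshot cleanup is hoisted to the top of truncateAndReload, so it now runs even when logEndOffset already matches mapEndOffset and the in-memory state does not need to be rebuilt, and the predicate keeps only snapshot files whose offset lies in the range (logStartOffset, logEndOffset]. A small stand-alone sketch of that predicate, with illustrative names rather than the real Kafka code:

object SnapshotRetentionSketch {

  // Mirrors the predicate passed to deleteSnapshotFiles above: a snapshot is
  // removed when its offset is beyond the log end offset or at/before the
  // log start offset.
  def shouldDelete(snapshotOffset: Long, logStartOffset: Long, logEndOffset: Long): Boolean =
    snapshotOffset > logEndOffset || snapshotOffset <= logStartOffset

  def main(args: Array[String]): Unit = {
    val snapshotOffsets = Seq(1L, 2L, 3L, 4L, 5L)
    val retained = snapshotOffsets.filterNot(o => shouldDelete(o, logStartOffset = 1L, logEndOffset = 3L))
    println(retained) // List(2, 3)
  }
}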

core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (8 changed lines)

@@ -302,7 +302,10 @@ class LogSegmentTest {
    segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
      shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
-    segment.recover(64 * 1024, new ProducerStateManager(topicPartition, logDir))
+    var stateManager = new ProducerStateManager(topicPartition, logDir)
+    segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)
    var abortedTxns = segment.txnIndex.allAbortedTxns
    assertEquals(1, abortedTxns.size)
@@ -313,9 +316,10 @@ class LogSegmentTest {
    assertEquals(100L, abortedTxn.lastStableOffset)
    // recover again, but this time assuming the transaction from pid2 began on a previous segment
-    val stateManager = new ProducerStateManager(topicPartition, logDir)
+    stateManager = new ProducerStateManager(topicPartition, logDir)
    stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
    segment.recover(64 * 1024, stateManager)
+    assertEquals(108L, stateManager.mapEndOffset)
    abortedTxns = segment.txnIndex.allAbortedTxns
    assertEquals(1, abortedTxns.size)

core/src/test/scala/unit/kafka/log/LogTest.scala (108 changed lines)

@@ -178,7 +178,7 @@ class LogTest {
  @Test
  def testRebuildPidMapWithCompactedData() {
-    val log = createLog(2048, pidSnapshotIntervalMs = Int.MaxValue)
+    val log = createLog(2048)
    val pid = 1L
    val epoch = 0.toShort
    val seq = 0
@@ -2318,6 +2318,110 @@ class LogTest {
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@Test
def testRecoverOnlyLastSegment(): Unit = {
val log = createLog(128)
val epoch = 0.toShort
val pid1 = 1L
val pid2 = 2L
val pid3 = 3L
val pid4 = 4L
val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
// mix transactional and non-transactional data
appendPid1(5) // nextOffset: 5
appendNonTransactionalAsLeader(log, 3) // 8
appendPid2(2) // 10
appendPid1(4) // 14
appendPid3(3) // 17
appendNonTransactionalAsLeader(log, 2) // 19
appendPid1(10) // 29
appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
appendPid2(6) // 36
appendPid4(3) // 39
appendNonTransactionalAsLeader(log, 10) // 49
appendPid3(9) // 58
appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
appendPid4(8) // 67
appendPid2(7) // 74
appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
appendNonTransactionalAsLeader(log, 10) // 85
appendPid4(4) // 89
appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
// delete the last offset and transaction index files to force recovery
val lastSegment = log.logSegments.last
val recoveryPoint = lastSegment.baseOffset
lastSegment.index.delete()
lastSegment.txnIndex.delete()
log.close()
val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
val abortedTransactions = allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@Test
def testRecoverLastSegmentWithNoSnapshots(): Unit = {
val log = createLog(128)
val epoch = 0.toShort
val pid1 = 1L
val pid2 = 2L
val pid3 = 3L
val pid4 = 4L
val appendPid1 = appendTransactionalAsLeader(log, pid1, epoch)
val appendPid2 = appendTransactionalAsLeader(log, pid2, epoch)
val appendPid3 = appendTransactionalAsLeader(log, pid3, epoch)
val appendPid4 = appendTransactionalAsLeader(log, pid4, epoch)
// mix transactional and non-transactional data
appendPid1(5) // nextOffset: 5
appendNonTransactionalAsLeader(log, 3) // 8
appendPid2(2) // 10
appendPid1(4) // 14
appendPid3(3) // 17
appendNonTransactionalAsLeader(log, 2) // 19
appendPid1(10) // 29
appendEndTxnMarkerAsLeader(log, pid1, epoch, ControlRecordType.ABORT) // 30
appendPid2(6) // 36
appendPid4(3) // 39
appendNonTransactionalAsLeader(log, 10) // 49
appendPid3(9) // 58
appendEndTxnMarkerAsLeader(log, pid3, epoch, ControlRecordType.COMMIT) // 59
appendPid4(8) // 67
appendPid2(7) // 74
appendEndTxnMarkerAsLeader(log, pid2, epoch, ControlRecordType.ABORT) // 75
appendNonTransactionalAsLeader(log, 10) // 85
appendPid4(4) // 89
appendEndTxnMarkerAsLeader(log, pid4, epoch, ControlRecordType.COMMIT) // 90
// delete all snapshot files
logDir.listFiles.filter(f => f.isFile && f.getName.endsWith(Log.PidSnapshotFileSuffix)).foreach { file =>
file.delete()
}
// delete the last offset and transaction index files to force recovery. this should force us to rebuild
// the producer state from the start of the log
val lastSegment = log.logSegments.last
val recoveryPoint = lastSegment.baseOffset
lastSegment.index.delete()
lastSegment.txnIndex.delete()
log.close()
val reloadedLog = createLog(1024, recoveryPoint = recoveryPoint)
val abortedTransactions = allAbortedTransactions(reloadedLog)
assertEquals(List(new AbortedTxn(pid1, 0L, 29L, 8L), new AbortedTxn(pid2, 8L, 74L, 36L)), abortedTransactions)
}
@Test
def testTransactionIndexUpdatedThroughReplication(): Unit = {
val epoch = 0.toShort
@@ -2474,7 +2578,7 @@ class LogTest {
  private def createLog(messageSizeInBytes: Int, retentionMs: Int = -1, retentionBytes: Int = -1,
                        cleanupPolicy: String = "delete", messagesPerSegment: Int = 5,
                        maxPidExpirationMs: Int = 300000, pidExpirationCheckIntervalMs: Int = 30000,
-                        pidSnapshotIntervalMs: Int = 60000): Log = {
+                        recoveryPoint: Long = 0L): Log = {
    val logProps = new Properties()
    logProps.put(LogConfig.SegmentBytesProp, messageSizeInBytes * messagesPerSegment: Integer)
    logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)

core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (21 changed lines)

@@ -23,6 +23,7 @@ import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.junit.Assert._
@@ -226,6 +227,26 @@ class ProducerStateManagerTest extends JUnitSuite {
    append(idMapping, pid, 1, epoch, 1L, isTransactional = false)
  }
+  @Test
+  def testTruncateAndReloadRemovesOutOfRangeSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(idMapping, pid, epoch, 0, 0L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 1, 1L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 2, 2L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 3, 3L)
+    idMapping.takeSnapshot()
+    append(idMapping, pid, epoch, 4, 4L)
+    idMapping.takeSnapshot()
+    idMapping.truncateAndReload(1L, 3L, time.milliseconds())
+    assertEquals(Some(2L), idMapping.oldestSnapshotOffset)
+    assertEquals(Some(3L), idMapping.latestSnapshotOffset)
+  }
  @Test
  def testTakeSnapshot(): Unit = {
    val epoch = 0.toShort
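For reference, the arithmetic behind the assertions in the new testTruncateAndReloadRemovesOutOfRangeSnapshots: each append advances the map end offset to the appended offset plus one, so the five takeSnapshot() calls leave snapshot files at offsets 1 through 5; truncateAndReload(1L, 3L, ...) then removes every snapshot at or below offset 1 and above offset 3, leaving 2 as the oldest and 3 as the latest. A quick sketch of that calculation (illustrative only, not the real test harness):

object TruncateAndReloadArithmetic {
  def main(args: Array[String]): Unit = {
    // Appends at offsets 0..4 move the map end offset to 1..5, so each
    // takeSnapshot() call writes a snapshot file at one of those offsets.
    val snapshotOffsets = (0L to 4L).map(_ + 1)
    // truncateAndReload(1L, 3L, ...) keeps only offsets in (1, 3].
    val retained = snapshotOffsets.filter(o => o > 1L && o <= 3L)
    println(retained.headOption) // Some(2) -> oldestSnapshotOffset
    println(retained.lastOption) // Some(3) -> latestSnapshotOffset
  }
}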
