Browse Source

KAFKA-7076; Skip rebuilding producer state when using old message format (#5254)

This patch removes the need to build up producer state when the log is using V0 / V1 message format where we did not have idempotent and transactional producers yet.

Also fixes a small issue where we incorrectly reported the offset index corrupt if the last offset in the index is equal to the base offset of the segment.
pull/5294/merge
Dhruvil Shah 6 years ago committed by Jason Gustafson
parent
commit
2db7eb7a8c
  1. 59
      core/src/main/scala/kafka/log/Log.scala
  2. 4
      core/src/main/scala/kafka/log/OffsetIndex.scala
  3. 67
      core/src/test/scala/unit/kafka/log/LogTest.scala
  4. 9
      core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala

59
core/src/main/scala/kafka/log/Log.scala

@ -246,6 +246,10 @@ class Log(@volatile var dir: File, @@ -246,6 +246,10 @@ class Log(@volatile var dir: File,
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
_leaderEpochCache.clearAndFlushEarliest(logStartOffset)
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException("Producer state must be empty during log initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " +
@ -417,25 +421,14 @@ class Log(@volatile var dir: File, @@ -417,25 +421,14 @@ class Log(@volatile var dir: File,
* @return The number of bytes truncated from the segment
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow
*/
private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue)
if (fetchDataInfo != null)
loadProducersFromLog(stateManager, fetchDataInfo.records)
}
stateManager.updateMapEndOffset(segment.baseOffset)
// take a snapshot for the first recovered segment to avoid reloading all the segments if we shutdown before we
// checkpoint the recovery point
stateManager.takeSnapshot()
val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
private def recoverSegment(segment: LogSegment,
leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager)
val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
stateManager.takeSnapshot()
producerStateManager.takeSnapshot()
bytesTruncated
}
@ -565,10 +558,22 @@ class Log(@volatile var dir: File, @@ -565,10 +558,22 @@ class Log(@volatile var dir: File,
recoveryPoint
}
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
// Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
// free of all side-effects, i.e. it must not update any log-specific state.
private def rebuildProducerState(lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
val messageFormatVersion = config.messageFormatVersion.recordVersion.value
info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion")
val segments = logSegments
val offsetsToSnapshot =
if (segments.nonEmpty) {
val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
} else {
Seq(Some(lastOffset))
}
info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion")
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
@ -582,13 +587,11 @@ class Log(@volatile var dir: File, @@ -582,13 +587,11 @@ class Log(@volatile var dir: File,
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
// from the first segment.
if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) {
if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
(producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
// To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
// truncation.
val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset)
val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset))
offsetsToSnapshot.flatten.foreach { offset =>
producerStateManager.updateMapEndOffset(offset)
producerStateManager.takeSnapshot()
@ -607,6 +610,8 @@ class Log(@volatile var dir: File, @@ -607,6 +610,8 @@ class Log(@volatile var dir: File,
logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
producerStateManager.updateMapEndOffset(startOffset)
if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
producerStateManager.takeSnapshot()
val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
@ -614,12 +619,16 @@ class Log(@volatile var dir: File, @@ -614,12 +619,16 @@ class Log(@volatile var dir: File,
loadProducersFromLog(producerStateManager, fetchDataInfo.records)
}
}
producerStateManager.updateMapEndOffset(lastOffset)
updateFirstUnstableOffset()
producerStateManager.takeSnapshot()
}
}
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager)
updateFirstUnstableOffset()
}
private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]

4
core/src/main/scala/kafka/log/OffsetIndex.scala

@ -188,9 +188,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl @@ -188,9 +188,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
}
override def sanityCheck() {
if (_entries != 0 && _lastOffset <= baseOffset)
if (_entries != 0 && _lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " +
s"but the last offset is ${_lastOffset} which is no greater than the base offset $baseOffset.")
s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.")
if (length % entrySize != 0)
throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found $length bytes which is " +
s"neither positive nor a multiple of $entrySize.")

67
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -22,6 +22,7 @@ import java.nio.ByteBuffer @@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.Log.DeleteDirSuffix
import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
@ -206,8 +207,6 @@ class LogTest { @@ -206,8 +207,6 @@ class LogTest {
// Reload after unclean shutdown with recoveryPoint set to log end offset
log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
// Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case
expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset)
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
@ -215,15 +214,24 @@ class LogTest { @@ -215,15 +214,24 @@ class LogTest {
// Reload after unclean shutdown with recoveryPoint set to 0
log = createLog(logDir, logConfig, recoveryPoint = 0L)
// Is this working as intended?
// We progressively create a snapshot for each segment after the recovery point
expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
}
@Test
def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = {
testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version)
}
@Test
def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = {
testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version)
}
private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)
assertEquals(None, log.oldestProducerSnapshotOffset)
@ -247,6 +255,16 @@ class LogTest { @@ -247,6 +255,16 @@ class LogTest {
val segmentsWithReads = ArrayBuffer[LogSegment]()
val recoveredSegments = ArrayBuffer[LogSegment]()
val expectedSegmentsWithReads = ArrayBuffer[Long]()
val expectedSnapshotOffsets = ArrayBuffer[Long]()
if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) {
expectedSegmentsWithReads += activeSegmentOffset
expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
} else {
expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Seq(activeSegmentOffset)
expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
}
def createLogWithInterceptedReads(recoveryPoint: Long) = {
val maxProducerIdExpirationMs = 60 * 60 * 1000
@ -283,9 +301,8 @@ class LogTest { @@ -283,9 +301,8 @@ class LogTest {
ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2))
log = createLogWithInterceptedReads(offsetForRecoveryPointSegment)
// We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour)
assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset))
assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
segmentsWithReads.clear()
@ -297,13 +314,12 @@ class LogTest { @@ -297,13 +314,12 @@ class LogTest {
log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
// Verify that we keep 2 snapshot files if we checkpoint the log end offset
log.deleteSnapshotsAfterRecoveryPointCheckpoint()
expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
val expectedSnapshotsAfterDelete = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
assertEquals(expectedSnapshotsAfterDelete, listProducerSnapshotOffsets)
log.close()
}
@ -398,6 +414,9 @@ class LogTest { @@ -398,6 +414,9 @@ class LogTest {
// We skip directly to updating the map end offset
stateManager.updateMapEndOffset(1L)
EasyMock.expectLastCall()
// Finally, we take a snapshot
stateManager.takeSnapshot()
EasyMock.expectLastCall().once()
EasyMock.replay(stateManager)
@ -410,14 +429,18 @@ class LogTest { @@ -410,14 +429,18 @@ class LogTest {
def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
val stateManager = EasyMock.mock(classOf[ProducerStateManager])
EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
EasyMock.expect(stateManager.isEmpty).andReturn(true)
EasyMock.expectLastCall().once()
EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
EasyMock.expectLastCall().once()
EasyMock.replay(stateManager)
val logProps = new Properties()
@ -443,14 +466,18 @@ class LogTest { @@ -443,14 +466,18 @@ class LogTest {
def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
val stateManager = EasyMock.mock(classOf[ProducerStateManager])
EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
EasyMock.expect(stateManager.isEmpty).andReturn(true)
EasyMock.expectLastCall().once()
EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
EasyMock.expectLastCall().once()
EasyMock.replay(stateManager)
val cleanShutdownFile = createCleanShutdownFile()
@ -487,6 +514,12 @@ class LogTest { @@ -487,6 +514,12 @@ class LogTest {
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
EasyMock.expect(stateManager.isEmpty).andReturn(true)
EasyMock.expectLastCall().once()
EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
EasyMock.expectLastCall().once()
EasyMock.replay(stateManager)
val cleanShutdownFile = createCleanShutdownFile()
@ -644,8 +677,12 @@ class LogTest { @@ -644,8 +677,12 @@ class LogTest {
assertEquals(2, log.latestProducerStateEndOffset)
log.truncateTo(1)
assertEquals(None, log.latestProducerSnapshotOffset)
assertEquals(Some(1), log.latestProducerSnapshotOffset)
assertEquals(1, log.latestProducerStateEndOffset)
log.truncateTo(0)
assertEquals(None, log.latestProducerSnapshotOffset)
assertEquals(0, log.latestProducerStateEndOffset)
}
@Test

9
core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala

@ -179,6 +179,15 @@ class OffsetIndexTest extends JUnitSuite { @@ -179,6 +179,15 @@ class OffsetIndexTest extends JUnitSuite {
intercept[NullPointerException](idx.lookup(1))
}
@Test
def testSanityLastOffsetEqualToBaseOffset(): Unit = {
// Test index sanity for the case where the last offset appended to the index is equal to the base offset
val baseOffset = 20L
val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset, maxIndexSize = 10 * 8)
idx.append(baseOffset, 0)
idx.sanityCheck()
}
def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
try {
idx.append(offset, 1)

Loading…
Cancel
Save