diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 3036018dbda..e4be8fcc43d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -246,6 +246,10 @@ class Log(@volatile var dir: File, // The earliest leader epoch may not be flushed during a hard failure. Recover it here. _leaderEpochCache.clearAndFlushEarliest(logStartOffset) + // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here + // from scratch. + if (!producerStateManager.isEmpty) + throw new IllegalStateException("Producer state must be empty during log initialization") loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile) info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " + @@ -417,25 +421,14 @@ class Log(@volatile var dir: File, * @return The number of bytes truncated from the segment * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow */ - private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { - val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) - stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds) - logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment => - val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset) - val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue) - if (fetchDataInfo != null) - loadProducersFromLog(stateManager, fetchDataInfo.records) - } - stateManager.updateMapEndOffset(segment.baseOffset) - - // take a snapshot for the first recovered segment to avoid reloading all the segments if we shutdown before we - // checkpoint the recovery point - stateManager.takeSnapshot() - val bytesTruncated = segment.recover(stateManager, leaderEpochCache) - + private def recoverSegment(segment: LogSegment, + leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { + val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) + rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager) + val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache) // once we have recovered the segment's data, take a snapshot to ensure that we won't // need to reload the same segment again while recovering another segment. - stateManager.takeSnapshot() + producerStateManager.takeSnapshot() bytesTruncated } @@ -565,10 +558,22 @@ class Log(@volatile var dir: File, recoveryPoint } - private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { + // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be + // free of all side-effects, i.e. it must not update any log-specific state. + private def rebuildProducerState(lastOffset: Long, + reloadFromCleanShutdown: Boolean, + producerStateManager: ProducerStateManager): Unit = lock synchronized { checkIfMemoryMappedBufferClosed() val messageFormatVersion = config.messageFormatVersion.recordVersion.value - info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion") + val segments = logSegments + val offsetsToSnapshot = + if (segments.nonEmpty) { + val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset) + Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset)) + } else { + Seq(Some(lastOffset)) + } + info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion") // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case, @@ -582,13 +587,11 @@ class Log(@volatile var dir: File, // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state // from the first segment. - - if (producerStateManager.latestSnapshotOffset.isEmpty && (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown)) { + if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || + (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the // last two segments and the last offset. This should avoid the full scan in the case that the log needs // truncation. - val nextLatestSegmentBaseOffset = lowerSegment(activeSegment.baseOffset).map(_.baseOffset) - val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset, Some(activeSegment.baseOffset), Some(lastOffset)) offsetsToSnapshot.flatten.foreach { offset => producerStateManager.updateMapEndOffset(offset) producerStateManager.takeSnapshot() @@ -607,19 +610,25 @@ class Log(@volatile var dir: File, logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) producerStateManager.updateMapEndOffset(startOffset) - producerStateManager.takeSnapshot() + + if (offsetsToSnapshot.contains(Some(segment.baseOffset))) + producerStateManager.takeSnapshot() val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) if (fetchDataInfo != null) loadProducersFromLog(producerStateManager, fetchDataInfo.records) } } - producerStateManager.updateMapEndOffset(lastOffset) - updateFirstUnstableOffset() + producerStateManager.takeSnapshot() } } + private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { + rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager) + updateFirstUnstableOffset() + } + private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] val completedTxns = ListBuffer.empty[CompletedTxn] diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 2babd007a5a..6f246eedf1f 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -188,9 +188,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl } override def sanityCheck() { - if (_entries != 0 && _lastOffset <= baseOffset) + if (_entries != 0 && _lastOffset < baseOffset) throw new CorruptIndexException(s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size " + - s"but the last offset is ${_lastOffset} which is no greater than the base offset $baseOffset.") + s"but the last offset is ${_lastOffset} which is less than the base offset $baseOffset.") if (length % entrySize != 0) throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is corrupt, found $length bytes which is " + s"neither positive nor a multiple of $entrySize.") diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 3b5b2fa7875..9a9bc613585 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import java.nio.file.{Files, Paths} import java.util.Properties +import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} @@ -206,8 +207,6 @@ class LogTest { // Reload after unclean shutdown with recoveryPoint set to log end offset log = createLog(logDir, logConfig, recoveryPoint = logEndOffset) - // Note that we don't maintain the guarantee of having a snapshot for the 2 most recent segments in this case - expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset, log.logEndOffset) assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() @@ -215,15 +214,24 @@ class LogTest { // Reload after unclean shutdown with recoveryPoint set to 0 log = createLog(logDir, logConfig, recoveryPoint = 0L) - // Is this working as intended? + // We progressively create a snapshot for each segment after the recovery point expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() } @Test - def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = { - val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10) + def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = { + testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version) + } + + @Test + def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat(): Unit = { + testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version) + } + + private def testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion: String): Unit = { + val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10, messageFormatVersion = messageFormatVersion) var log = createLog(logDir, logConfig) assertEquals(None, log.oldestProducerSnapshotOffset) @@ -247,6 +255,16 @@ class LogTest { val segmentsWithReads = ArrayBuffer[LogSegment]() val recoveredSegments = ArrayBuffer[LogSegment]() + val expectedSegmentsWithReads = ArrayBuffer[Long]() + val expectedSnapshotOffsets = ArrayBuffer[Long]() + + if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) { + expectedSegmentsWithReads += activeSegmentOffset + expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset + } else { + expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++ Seq(activeSegmentOffset) + expectedSnapshotOffsets ++= log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset + } def createLogWithInterceptedReads(recoveryPoint: Long) = { val maxProducerIdExpirationMs = 60 * 60 * 1000 @@ -283,9 +301,8 @@ class LogTest { ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) log = createLogWithInterceptedReads(offsetForRecoveryPointSegment) // We will reload all segments because the recovery point is behind the producer snapshot files (pre KAFKA-5829 behaviour) - assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset) -- Seq(activeSegmentOffset)) + assertEquals(expectedSegmentsWithReads, segmentsWithReads.map(_.baseOffset)) assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) - var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) log.close() segmentsWithReads.clear() @@ -297,13 +314,12 @@ class LogTest { log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint) assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset)) assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset)) - expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) // Verify that we keep 2 snapshot files if we checkpoint the log end offset log.deleteSnapshotsAfterRecoveryPointCheckpoint() - expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset - assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets) + val expectedSnapshotsAfterDelete = log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset + assertEquals(expectedSnapshotsAfterDelete, listProducerSnapshotOffsets) log.close() } @@ -398,6 +414,9 @@ class LogTest { // We skip directly to updating the map end offset stateManager.updateMapEndOffset(1L) EasyMock.expectLastCall() + // Finally, we take a snapshot + stateManager.takeSnapshot() + EasyMock.expectLastCall().once() EasyMock.replay(stateManager) @@ -410,14 +429,18 @@ class LogTest { def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = { val stateManager = EasyMock.mock(classOf[ProducerStateManager]) - EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) - stateManager.updateMapEndOffset(0L) EasyMock.expectLastCall().anyTimes() stateManager.takeSnapshot() EasyMock.expectLastCall().anyTimes() + EasyMock.expect(stateManager.isEmpty).andReturn(true) + EasyMock.expectLastCall().once() + + EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) + EasyMock.expectLastCall().once() + EasyMock.replay(stateManager) val logProps = new Properties() @@ -443,14 +466,18 @@ class LogTest { def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = { val stateManager = EasyMock.mock(classOf[ProducerStateManager]) - EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None) - stateManager.updateMapEndOffset(0L) EasyMock.expectLastCall().anyTimes() stateManager.takeSnapshot() EasyMock.expectLastCall().anyTimes() + EasyMock.expect(stateManager.isEmpty).andReturn(true) + EasyMock.expectLastCall().once() + + EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) + EasyMock.expectLastCall().once() + EasyMock.replay(stateManager) val cleanShutdownFile = createCleanShutdownFile() @@ -487,6 +514,12 @@ class LogTest { stateManager.takeSnapshot() EasyMock.expectLastCall().anyTimes() + EasyMock.expect(stateManager.isEmpty).andReturn(true) + EasyMock.expectLastCall().once() + + EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None) + EasyMock.expectLastCall().once() + EasyMock.replay(stateManager) val cleanShutdownFile = createCleanShutdownFile() @@ -644,8 +677,12 @@ class LogTest { assertEquals(2, log.latestProducerStateEndOffset) log.truncateTo(1) - assertEquals(None, log.latestProducerSnapshotOffset) + assertEquals(Some(1), log.latestProducerSnapshotOffset) assertEquals(1, log.latestProducerStateEndOffset) + + log.truncateTo(0) + assertEquals(None, log.latestProducerSnapshotOffset) + assertEquals(0, log.latestProducerStateEndOffset) } @Test diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 1529597cf41..f47da995dd5 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -178,6 +178,15 @@ class OffsetIndexTest extends JUnitSuite { // mmap should be null after unmap causing lookup to throw a NPE intercept[NullPointerException](idx.lookup(1)) } + + @Test + def testSanityLastOffsetEqualToBaseOffset(): Unit = { + // Test index sanity for the case where the last offset appended to the index is equal to the base offset + val baseOffset = 20L + val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset, maxIndexSize = 10 * 8) + idx.append(baseOffset, 0) + idx.sanityCheck() + } def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) { try {