From 6964c356aa0acdb96d5ccb689a088ba224cf41f5 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 18 Apr 2019 08:41:32 -0700 Subject: [PATCH] KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570) This patch fixes a bug in the append logic which can cause duplicate offsets to be appended to the log when the append to the transaction index fails. Rather than incrementing the log end offset after the index append, we do it immediately after the records are written to the log. If the index append later fails, we do two things: 1) We ensure that the last stable offset cannot advance. This guarantees that the aborted data will not be returned to the user until the transaction index contains the corresponding entry. 2) We skip updating the end offset of the producer state. When recovering the log, we will have to reprocess the log and write the index entries. Reviewers: Jun Rao --- core/src/main/scala/kafka/log/Log.scala | 14 +++-- .../src/main/scala/kafka/log/LogSegment.scala | 3 +- .../kafka/log/ProducerStateManager.scala | 21 +++++--- .../scala/kafka/log/TransactionIndex.scala | 6 +-- .../test/scala/unit/kafka/log/LogTest.scala | 41 +++++++++++++++ .../kafka/log/ProducerStateManagerTest.scala | 52 ++++++++++++++++++- 6 files changed, 122 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5ad3c3e5810..87179db1e73 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -943,6 +943,14 @@ class Log(@volatile var dir: File, shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords) + // Increment the log end offset. We do this immediately after the append because a + // write to the transaction index below may fail and we want to ensure that the offsets + // of future appends still grow monotonically. The resulting transaction index inconsistency + // will be cleaned up after the log directory is recovered. Note that the end offset of the + // ProducerStateManager will not be updated and the last stable offset will not advance + // if the append to the transaction index fails. + updateLogEndOffset(appendInfo.lastOffset + 1) + // update the producer state for ((_, producerAppendInfo) <- updatedProducers) { producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) @@ -952,17 +960,15 @@ class Log(@volatile var dir: File, // update the transaction index with the true last stable offset. The last offset visible // to consumers using READ_COMMITTED will be limited by this value and the high watermark. for (completedTxn <- completedTxns) { - val lastStableOffset = producerStateManager.completeTxn(completedTxn) + val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) segment.updateTxnIndex(completedTxn, lastStableOffset) + producerStateManager.completeTxn(completedTxn) } // always update the last producer id map offset so that the snapshot reflects the current offset // even if there isn't any idempotent data being written producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1) - // increment the log end offset - updateLogEndOffset(appendInfo.lastOffset + 1) - // update the first unstable offset (which is used to compute LSO) updateFirstUnstableOffset() diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 9168ce01dd5..624f3ea43ad 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -247,8 +247,9 @@ class LogSegment private[log] (val log: FileRecords, val maybeCompletedTxn = appendInfo.append(batch) producerStateManager.update(appendInfo) maybeCompletedTxn.foreach { completedTxn => - val lastStableOffset = producerStateManager.completeTxn(completedTxn) + val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) updateTxnIndex(completedTxn, lastStableOffset) + producerStateManager.completeTxn(completedTxn) } } producerStateManager.updateMapEndOffset(batch.lastOffset + 1) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 59bc41772bb..3db436ef280 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.types._ import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch} import org.apache.kafka.common.utils.{ByteUtils, Crc32C} +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import scala.collection.{immutable, mutable} @@ -605,7 +606,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, * snapshot in range (if there is one). Note that the log end offset is assumed to be less than * or equal to the high watermark. */ - def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) { + def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = { // remove all out of range snapshots deleteSnapshotFiles(logDir, { snapOffset => snapOffset > logEndOffset || snapOffset <= logStartOffset @@ -757,9 +758,20 @@ class ProducerStateManager(val topicPartition: TopicPartition, } /** - * Complete the transaction and return the last stable offset. + * Compute the last stable offset of a completed transaction, but do not yet mark the transaction complete. + * That will be done in `completeTxn` below. This is used to compute the LSO that will be appended to the + * transaction index, but the completion must be done only after successfully appending to the index. */ - def completeTxn(completedTxn: CompletedTxn): Long = { + def lastStableOffset(completedTxn: CompletedTxn): Long = { + val nextIncompleteTxn = ongoingTxns.values.asScala.find(_.producerId != completedTxn.producerId) + nextIncompleteTxn.map(_.firstOffset.messageOffset).getOrElse(completedTxn.lastOffset + 1) + } + + /** + * Mark a transaction as completed. We will still await advancement of the high watermark before + * advancing the first unstable offset. + */ + def completeTxn(completedTxn: CompletedTxn): Unit = { val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset) if (txnMetadata == null) throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " + @@ -767,9 +779,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, txnMetadata.lastOffset = Some(completedTxn.lastOffset) unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata) - - val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1) - lastStableOffset } @threadsafe diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index e730fdba985..696bc3a2a4f 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -57,7 +57,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}") } lastOffset = Some(abortedTxn.lastOffset) - Utils.writeFully(channel, abortedTxn.buffer.duplicate()) + Utils.writeFully(channel(), abortedTxn.buffer.duplicate()) } def flush(): Unit = maybeChannel.foreach(_.force(true)) @@ -74,7 +74,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends Files.deleteIfExists(file.toPath) } - private def channel: FileChannel = { + private def channel(): FileChannel = { maybeChannel match { case Some(channel) => channel case None => openChannel() @@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends var newLastOffset: Option[Long] = None for ((abortedTxn, position) <- iterator(() => buffer)) { if (abortedTxn.lastOffset >= offset) { - channel.truncate(position) + channel().truncate(position) lastOffset = newLastOffset return } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index cb2b4b27a7a..2b3e3620981 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3531,6 +3531,47 @@ class LogTest { assertEquals(Some(8L), log.firstUnstableOffset.map(_.messageOffset)) } + @Test + def testAppendToTransactionIndexFailure(): Unit = { + val pid = 1L + val epoch = 0.toShort + val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) + + val append = appendTransactionalAsLeader(log, pid, epoch) + append(10) + + // Kind of a hack, but renaming the index to a directory ensures that the append + // to the index will fail. + log.activeSegment.txnIndex.renameTo(log.dir) + + // The append will be written to the log successfully, but the write to the index will fail + assertThrows[KafkaStorageException] { + appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) + } + assertEquals(11L, log.logEndOffset) + assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) + + // Try the append a second time. The appended offset in the log should still increase. + assertThrows[KafkaStorageException] { + appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) + } + assertEquals(12L, log.logEndOffset) + assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) + + // Even if the high watermark is updated, the first unstable offset does not move + log.onHighWatermarkIncremented(12L) + assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset)) + + log.close() + + val reopenedLog = createLog(logDir, logConfig) + assertEquals(12L, reopenedLog.logEndOffset) + assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size) + reopenedLog.onHighWatermarkIncremented(12L) + assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset)) + } + @Test def testLastStableOffsetWithMixedProducerData() { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index 26067b407fb..2062fae1b8f 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -219,6 +219,55 @@ class ProducerStateManagerTest extends JUnitSuite { assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset) } + @Test + def testLastStableOffsetCompletedTxn(): Unit = { + val producerEpoch = 0.toShort + val segmentBaseOffset = 990000L + + def beginTxn(producerId: Long, startOffset: Long): Unit = { + val relativeOffset = (startOffset - segmentBaseOffset).toInt + val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full) + producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true) + val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset, + relativePositionInSegment = 50 * relativeOffset) + producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) + stateManager.update(producerAppendInfo) + } + + val producerId1 = producerId + val startOffset1 = 992342L + beginTxn(producerId1, startOffset1) + + val producerId2 = producerId + 1 + val startOffset2 = startOffset1 + 25 + beginTxn(producerId2, startOffset2) + + val producerId3 = producerId + 2 + val startOffset3 = startOffset1 + 57 + beginTxn(producerId3, startOffset3) + + val lastOffset1 = startOffset3 + 15 + val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false) + assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1)) + stateManager.completeTxn(completedTxn1) + stateManager.onHighWatermarkUpdated(lastOffset1 + 1) + assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset)) + + val lastOffset3 = lastOffset1 + 20 + val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false) + assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3)) + stateManager.completeTxn(completedTxn3) + stateManager.onHighWatermarkUpdated(lastOffset3 + 1) + assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset)) + + val lastOffset2 = lastOffset3 + 78 + val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false) + assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2)) + stateManager.completeTxn(completedTxn2) + stateManager.onHighWatermarkUpdated(lastOffset2 + 1) + assertEquals(None, stateManager.firstUnstableOffset) + } + @Test def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = { val producerEpoch = 0.toShort @@ -823,7 +872,8 @@ class ProducerStateManagerTest extends JUnitSuite { val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch) val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp) mapping.update(producerAppendInfo) - val lastStableOffset = mapping.completeTxn(completedTxn) + val lastStableOffset = mapping.lastStableOffset(completedTxn) + mapping.completeTxn(completedTxn) mapping.updateMapEndOffset(offset + 1) (completedTxn, lastStableOffset) }