diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5ad3c3e5810..87179db1e73 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -943,6 +943,14 @@ class Log(@volatile var dir: File,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
+        // Increment the log end offset. We do this immediately after the append because a
+        // write to the transaction index below may fail and we want to ensure that the offsets
+        // of future appends still grow monotonically. The resulting transaction index inconsistency
+        // will be cleaned up after the log directory is recovered. Note that the end offset of the
+        // ProducerStateManager will not be updated and the last stable offset will not advance
+        // if the append to the transaction index fails.
+        updateLogEndOffset(appendInfo.lastOffset + 1)
+
         // update the producer state
         for ((_, producerAppendInfo) <- updatedProducers) {
           producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
@@ -952,17 +960,15 @@ class Log(@volatile var dir: File,
         // update the transaction index with the true last stable offset. The last offset visible
         // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
         for (completedTxn <- completedTxns) {
-          val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
           segment.updateTxnIndex(completedTxn, lastStableOffset)
+          producerStateManager.completeTxn(completedTxn)
         }
 
         // always update the last producer id map offset so that the snapshot reflects the current offset
         // even if there isn't any idempotent data being written
         producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
-        // increment the log end offset
-        updateLogEndOffset(appendInfo.lastOffset + 1)
-
         // update the first unstable offset (which is used to compute LSO)
         updateFirstUnstableOffset()
 
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9168ce01dd5..624f3ea43ad 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -247,8 +247,9 @@ class LogSegment private[log] (val log: FileRecords,
         val maybeCompletedTxn = appendInfo.append(batch)
         producerStateManager.update(appendInfo)
         maybeCompletedTxn.foreach { completedTxn =>
-          val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
           updateTxnIndex(completedTxn, lastStableOffset)
+          producerStateManager.completeTxn(completedTxn)
         }
       }
       producerStateManager.updateMapEndOffset(batch.lastOffset + 1)
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 59bc41772bb..3db436ef280 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 
@@ -605,7 +606,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
-  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
     deleteSnapshotFiles(logDir, { snapOffset =>
       snapOffset > logEndOffset || snapOffset <= logStartOffset
@@ -757,9 +758,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Complete the transaction and return the last stable offset.
+   * Compute the last stable offset of a completed transaction, but do not yet mark the transaction complete.
+   * That will be done in `completeTxn` below. This is used to compute the LSO that will be appended to the
+   * transaction index, but the completion must be done only after successfully appending to the index.
    */
-  def completeTxn(completedTxn: CompletedTxn): Long = {
+  def lastStableOffset(completedTxn: CompletedTxn): Long = {
+    val nextIncompleteTxn = ongoingTxns.values.asScala.find(_.producerId != completedTxn.producerId)
+    nextIncompleteTxn.map(_.firstOffset.messageOffset).getOrElse(completedTxn.lastOffset + 1)
+  }
+
+  /**
+   * Mark a transaction as completed. We will still await advancement of the high watermark before
+   * advancing the first unstable offset.
+   */
+  def completeTxn(completedTxn: CompletedTxn): Unit = {
     val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset)
     if (txnMetadata == null)
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
@@ -767,9 +779,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
     txnMetadata.lastOffset = Some(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
-
-    val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1)
-    lastStableOffset
   }
 
   @threadsafe
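The reordering above is the heart of the patch. As a rough illustration (a standalone sketch, not broker code: InMemoryTxnIndex, beginTxn and appendEndTxnMarker are simplified stand-ins for the real segment and index plumbing), the invariant after the change is that the log end offset advances unconditionally, the index write may fail, and only a successful write marks the transaction complete:

import scala.collection.mutable

object AppendOrderingSketch {
  final case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long)

  // Stand-in for the on-disk index; the real write can throw KafkaStorageException.
  final class InMemoryTxnIndex(var failNextAppend: Boolean) {
    val entries = mutable.ListBuffer.empty[(CompletedTxn, Long)]
    def append(txn: CompletedTxn, lastStableOffset: Long): Unit = {
      if (failNextAppend) throw new java.io.IOException("simulated index write failure")
      entries += ((txn, lastStableOffset))
    }
  }

  var logEndOffset = 0L
  private val ongoingTxns = mutable.TreeMap.empty[Long, CompletedTxn]

  def beginTxn(txn: CompletedTxn): Unit = ongoingTxns += (txn.firstOffset -> txn)

  def lastStableOffset(txn: CompletedTxn): Long =
    ongoingTxns.values.find(_.producerId != txn.producerId)
      .map(_.firstOffset).getOrElse(txn.lastOffset + 1)

  def appendEndTxnMarker(index: InMemoryTxnIndex, txn: CompletedTxn): Unit = {
    logEndOffset = txn.lastOffset + 1   // 1. advance the LEO first, unconditionally
    val lso = lastStableOffset(txn)     // 2. compute the LSO while the txn is still ongoing
    index.append(txn, lso)              // 3. may throw; the LEO stays advanced, the txn stays ongoing
    ongoingTxns.remove(txn.firstOffset) // 4. only a successful index write completes the txn
  }
}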
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index e730fdba985..696bc3a2a4f 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -57,7 +57,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
         s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
-    Utils.writeFully(channel, abortedTxn.buffer.duplicate())
+    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
   }
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
@@ -74,7 +74,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     Files.deleteIfExists(file.toPath)
   }
 
-  private def channel: FileChannel = {
+  private def channel(): FileChannel = {
     maybeChannel match {
       case Some(channel) => channel
       case None => openChannel()
@@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     var newLastOffset: Option[Long] = None
     for ((abortedTxn, position) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset >= offset) {
-        channel.truncate(position)
+        channel().truncate(position)
         lastOffset = newLastOffset
         return
       }
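The channel-to-channel() changes above ride along with the fix: since the accessor may open the file on first use, it gets empty parens so call sites advertise the side effect. A minimal sketch of the same lazy-open pattern, assuming a hypothetical LazyChannelSketch class rather than the real TransactionIndex:

import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

class LazyChannelSketch(path: String) {
  private var maybeChannel: Option[FileChannel] = None

  // Empty parens: calling this may open the underlying file as a side effect.
  private def channel(): FileChannel = maybeChannel match {
    case Some(ch) => ch
    case None =>
      val ch = FileChannel.open(Paths.get(path),
        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)
      maybeChannel = Some(ch)
      ch
  }

  // Flushing only touches a channel that was actually opened; otherwise it is a no-op.
  def flush(): Unit = maybeChannel.foreach(_.force(true))
}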
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index cb2b4b27a7a..2b3e3620981 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3531,6 +3531,47 @@ class LogTest {
     assertEquals(Some(8L), log.firstUnstableOffset.map(_.messageOffset))
   }
 
+  @Test
+  def testAppendToTransactionIndexFailure(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val append = appendTransactionalAsLeader(log, pid, epoch)
+    append(10)
+
+    // Kind of a hack, but renaming the index to a directory ensures that the append
+    // to the index will fail.
+    log.activeSegment.txnIndex.renameTo(log.dir)
+
+    // The append will be written to the log successfully, but the write to the index will fail
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(11L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Try the append a second time. The appended offset in the log should still increase.
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(12L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Even if the high watermark is updated, the first unstable offset does not move
+    log.onHighWatermarkIncremented(12L)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    log.close()
+
+    val reopenedLog = createLog(logDir, logConfig)
+    assertEquals(12L, reopenedLog.logEndOffset)
+    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    reopenedLog.onHighWatermarkIncremented(12L)
+    assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
+  }
+
   @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
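The failure injection in testAppendToTransactionIndexFailure works because renaming the index file onto the log directory leaves txnIndex.file pointing at a directory, so the next attempt to open it for writing throws an IOException, which the log layer surfaces as KafkaStorageException. A toy reproduction of just that effect (the path here is hypothetical, not from the PR):

import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

object RenameFailureSketch extends App {
  val dir = new File(System.getProperty("java.io.tmpdir"), "txn-index-demo")
  dir.mkdirs()
  try {
    // Opening a directory as if it were a regular file fails with an IOException,
    // just as the renamed transaction index does in the test.
    FileChannel.open(dir.toPath, StandardOpenOption.WRITE)
  } catch {
    case e: java.io.IOException => println(s"append to index would fail: $e")
  }
}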
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 26067b407fb..2062fae1b8f 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -219,6 +219,55 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
   }
 
+  @Test
+  def testLastStableOffsetCompletedTxn(): Unit = {
+    val producerEpoch = 0.toShort
+    val segmentBaseOffset = 990000L
+
+    def beginTxn(producerId: Long, startOffset: Long): Unit = {
+      val relativeOffset = (startOffset - segmentBaseOffset).toInt
+      val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
+      val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
+        relativePositionInSegment = 50 * relativeOffset)
+      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      stateManager.update(producerAppendInfo)
+    }
+
+    val producerId1 = producerId
+    val startOffset1 = 992342L
+    beginTxn(producerId1, startOffset1)
+
+    val producerId2 = producerId + 1
+    val startOffset2 = startOffset1 + 25
+    beginTxn(producerId2, startOffset2)
+
+    val producerId3 = producerId + 2
+    val startOffset3 = startOffset1 + 57
+    beginTxn(producerId3, startOffset3)
+
+    val lastOffset1 = startOffset3 + 15
+    val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
+    stateManager.completeTxn(completedTxn1)
+    stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset3 = lastOffset1 + 20
+    val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
+    stateManager.completeTxn(completedTxn3)
+    stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset2 = lastOffset3 + 78
+    val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+    assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
+    stateManager.completeTxn(completedTxn2)
+    stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
+    assertEquals(None, stateManager.firstUnstableOffset)
+  }
+
   @Test
   def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
     val producerEpoch = 0.toShort
@@ -823,7 +872,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.completeTxn(completedTxn)
+    val lastStableOffset = mapping.lastStableOffset(completedTxn)
+    mapping.completeTxn(completedTxn)
     mapping.updateMapEndOffset(offset + 1)
     (completedTxn, lastStableOffset)
   }
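To see why testLastStableOffsetCompletedTxn expects the LSO to stay pinned at startOffset2, here is a self-contained model of the same arithmetic (a sketch, not the ProducerStateManager API; the offsets mirror the test's relative spacing but are simplified):

import scala.collection.mutable

object LsoSketch extends App {
  case class Txn(producerId: Long, firstOffset: Long)
  val ongoing = mutable.TreeMap.empty[Long, Txn] // keyed by first offset, like ongoingTxns

  def begin(txn: Txn): Unit = ongoing += (txn.firstOffset -> txn)
  def lastStableOffset(completing: Txn, lastOffset: Long): Long =
    ongoing.values.find(_.producerId != completing.producerId)
      .map(_.firstOffset).getOrElse(lastOffset + 1)
  def complete(txn: Txn): Unit = ongoing -= txn.firstOffset

  val (txn1, txn2, txn3) = (Txn(1L, 100L), Txn(2L, 125L), Txn(3L, 157L))
  Seq(txn1, txn2, txn3).foreach(begin)

  assert(lastStableOffset(txn1, 172L) == 125L); complete(txn1) // blocked by txn2's first offset
  assert(lastStableOffset(txn3, 192L) == 125L); complete(txn3) // still blocked by txn2
  assert(lastStableOffset(txn2, 270L) == 271L); complete(txn2) // nothing ongoing: lastOffset + 1
  println("LSO model matches the test's expectations")
}

Note that in the real manager the first unstable offset still waits for the high watermark to advance past each completed transaction, which is why the test calls onHighWatermarkUpdated between steps.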