
KAFKA-7866; Ensure no duplicate offsets after txn index append failure (#6570)

This patch fixes a bug in the append logic which can cause duplicate offsets to be appended to the log when the append to the transaction index fails. Rather than incrementing the log end offset after the index append, we do it immediately after the records are written to the log. If the index append later fails, we do two things:

1) We ensure that the last stable offset cannot advance. This guarantees that the aborted data will not be returned to the user until the transaction index contains the corresponding entry.
2) We skip updating the end offset of the producer state. When the log is recovered, the records will be reprocessed and the corresponding transaction index entries rewritten.
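
In outline, the new ordering in the append path is as follows (a condensed sketch using names from the diffs below, not the verbatim method body):

    segment.append(...)                                // 1. write records to the log
    updateLogEndOffset(appendInfo.lastOffset + 1)      // 2. advance the log end offset immediately
    for (completedTxn <- completedTxns) {
      val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) // pure computation
      segment.updateTxnIndex(completedTxn, lastStableOffset)                     // 3. may throw
      producerStateManager.completeTxn(completedTxn)                             // 4. reached only on success
    }
    producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)           // skipped if step 3 throws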

Reviewers: Jun Rao <junrao@gmail.com>
Jason Gustafson committed 6 years ago (via GitHub)
commit 6964c356aa
Changed files:
  1. core/src/main/scala/kafka/log/Log.scala (14)
  2. core/src/main/scala/kafka/log/LogSegment.scala (3)
  3. core/src/main/scala/kafka/log/ProducerStateManager.scala (21)
  4. core/src/main/scala/kafka/log/TransactionIndex.scala (6)
  5. core/src/test/scala/unit/kafka/log/LogTest.scala (41)
  6. core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (52)

core/src/main/scala/kafka/log/Log.scala (14)

@@ -943,6 +943,14 @@ class Log(@volatile var dir: File,
         shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
         records = validRecords)
 
+      // Increment the log end offset. We do this immediately after the append because a
+      // write to the transaction index below may fail and we want to ensure that the offsets
+      // of future appends still grow monotonically. The resulting transaction index inconsistency
+      // will be cleaned up after the log directory is recovered. Note that the end offset of the
+      // ProducerStateManager will not be updated and the last stable offset will not advance
+      // if the append to the transaction index fails.
+      updateLogEndOffset(appendInfo.lastOffset + 1)
+
       // update the producer state
       for ((_, producerAppendInfo) <- updatedProducers) {
         producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
@@ -952,17 +960,15 @@ class Log(@volatile var dir: File,
       // update the transaction index with the true last stable offset. The last offset visible
       // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
       for (completedTxn <- completedTxns) {
-        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
         segment.updateTxnIndex(completedTxn, lastStableOffset)
+        producerStateManager.completeTxn(completedTxn)
       }
 
       // always update the last producer id map offset so that the snapshot reflects the current offset
       // even if there isn't any idempotent data being written
       producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
 
-      // increment the log end offset
-      updateLogEndOffset(appendInfo.lastOffset + 1)
-
       // update the first unstable offset (which is used to compute LSO)
       updateFirstUnstableOffset()

core/src/main/scala/kafka/log/LogSegment.scala (3)

@@ -247,8 +247,9 @@ class LogSegment private[log] (val log: FileRecords,
       val maybeCompletedTxn = appendInfo.append(batch)
       producerStateManager.update(appendInfo)
       maybeCompletedTxn.foreach { completedTxn =>
-        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+        val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
         updateTxnIndex(completedTxn, lastStableOffset)
+        producerStateManager.completeTxn(completedTxn)
       }
     }
     producerStateManager.updateMapEndOffset(batch.lastOffset + 1)

core/src/main/scala/kafka/log/ProducerStateManager.scala (21)

@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, EndTransactionMarker, RecordBatch}
 import org.apache.kafka.common.utils.{ByteUtils, Crc32C}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{immutable, mutable}
 
@@ -605,7 +606,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
    * or equal to the high watermark.
    */
-  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long) {
+  def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
     deleteSnapshotFiles(logDir, { snapOffset =>
       snapOffset > logEndOffset || snapOffset <= logStartOffset
@@ -757,9 +758,20 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   }
 
   /**
-   * Complete the transaction and return the last stable offset.
+   * Compute the last stable offset of a completed transaction, but do not yet mark the transaction complete.
+   * That will be done in `completeTxn` below. This is used to compute the LSO that will be appended to the
+   * transaction index, but the completion must be done only after successfully appending to the index.
    */
-  def completeTxn(completedTxn: CompletedTxn): Long = {
+  def lastStableOffset(completedTxn: CompletedTxn): Long = {
+    val nextIncompleteTxn = ongoingTxns.values.asScala.find(_.producerId != completedTxn.producerId)
+    nextIncompleteTxn.map(_.firstOffset.messageOffset).getOrElse(completedTxn.lastOffset + 1)
+  }
+
+  /**
+   * Mark a transaction as completed. We will still await advancement of the high watermark before
+   * advancing the first unstable offset.
+   */
+  def completeTxn(completedTxn: CompletedTxn): Unit = {
     val txnMetadata = ongoingTxns.remove(completedTxn.firstOffset)
     if (txnMetadata == null)
       throw new IllegalArgumentException(s"Attempted to complete transaction $completedTxn on partition $topicPartition " +
@@ -767,9 +779,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
     txnMetadata.lastOffset = Some(completedTxn.lastOffset)
     unreplicatedTxns.put(completedTxn.firstOffset, txnMetadata)
-
-    val lastStableOffset = firstUndecidedOffset.getOrElse(completedTxn.lastOffset + 1)
-    lastStableOffset
   }
 
   @threadsafe
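
As a worked example of the new lastStableOffset computation (hypothetical offsets, mirroring testLastStableOffsetCompletedTxn below):

    // ongoingTxns is keyed by first offset, so `find` returns the earliest entry,
    // and the completed txn itself is skipped via the producerId filter.
    // Suppose producer 1 has an ongoing txn starting at offset 100 and producer 2
    // has one starting at offset 125. Completing producer 1's txn (lastOffset = 130):
    //   lastStableOffset = 125        // first offset of the earliest other ongoing txn
    // If no other txn were ongoing:
    //   lastStableOffset = 130 + 1    // completedTxn.lastOffset + 1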

core/src/main/scala/kafka/log/TransactionIndex.scala (6)

@@ -57,7 +57,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
         s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}")
     }
     lastOffset = Some(abortedTxn.lastOffset)
-    Utils.writeFully(channel, abortedTxn.buffer.duplicate())
+    Utils.writeFully(channel(), abortedTxn.buffer.duplicate())
   }
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
 
@@ -74,7 +74,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     Files.deleteIfExists(file.toPath)
   }
 
-  private def channel: FileChannel = {
+  private def channel(): FileChannel = {
     maybeChannel match {
       case Some(channel) => channel
       case None => openChannel()
 
@@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     var newLastOffset: Option[Long] = None
     for ((abortedTxn, position) <- iterator(() => buffer)) {
       if (abortedTxn.lastOffset >= offset) {
-        channel.truncate(position)
+        channel().truncate(position)
         lastOffset = newLastOffset
         return
       }
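
A side note on the channel change: the accessor lazily opens the file channel on first use, so giving it an empty parameter list signals the side effect per Scala convention. A minimal self-contained sketch of the same lazy-open pattern (assumed shape for illustration, not the Kafka class):

    import java.io.File
    import java.nio.channels.FileChannel
    import java.nio.file.StandardOpenOption

    class LazyIndexFile(file: File) {
      private var maybeChannel: Option[FileChannel] = None

      // Side-effecting accessor: opens the channel on first use.
      private def channel(): FileChannel = maybeChannel.getOrElse(openChannel())

      private def openChannel(): FileChannel = {
        val channel = FileChannel.open(file.toPath, StandardOpenOption.CREATE,
          StandardOpenOption.READ, StandardOpenOption.WRITE)
        maybeChannel = Some(channel)
        channel
      }

      def close(): Unit = maybeChannel.foreach(_.close())
    }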

core/src/test/scala/unit/kafka/log/LogTest.scala (41)

@@ -3531,6 +3531,47 @@ class LogTest {
     assertEquals(Some(8L), log.firstUnstableOffset.map(_.messageOffset))
   }
 
+  @Test
+  def testAppendToTransactionIndexFailure(): Unit = {
+    val pid = 1L
+    val epoch = 0.toShort
+    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
+    val log = createLog(logDir, logConfig)
+
+    val append = appendTransactionalAsLeader(log, pid, epoch)
+    append(10)
+
+    // Kind of a hack, but renaming the index to a directory ensures that the append
+    // to the index will fail.
+    log.activeSegment.txnIndex.renameTo(log.dir)
+
+    // The append will be written to the log successfully, but the write to the index will fail
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(11L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Try the append a second time. The appended offset in the log should still increase.
+    assertThrows[KafkaStorageException] {
+      appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
+    }
+    assertEquals(12L, log.logEndOffset)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    // Even if the high watermark is updated, the first unstable offset does not move
+    log.onHighWatermarkIncremented(12L)
+    assertEquals(Some(0L), log.firstUnstableOffset.map(_.messageOffset))
+
+    log.close()
+
+    val reopenedLog = createLog(logDir, logConfig)
+    assertEquals(12L, reopenedLog.logEndOffset)
+    assertEquals(2, reopenedLog.activeSegment.txnIndex.allAbortedTxns.size)
+    reopenedLog.onHighWatermarkIncremented(12L)
+    assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
+  }
+
   @Test
   def testLastStableOffsetWithMixedProducerData() {
     val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)

core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala (52)

@@ -219,6 +219,55 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(Some(logOffsetMetadata), stateManager.firstUnstableOffset)
   }
 
+  @Test
+  def testLastStableOffsetCompletedTxn(): Unit = {
+    val producerEpoch = 0.toShort
+    val segmentBaseOffset = 990000L
+
+    def beginTxn(producerId: Long, startOffset: Long): Unit = {
+      val relativeOffset = (startOffset - segmentBaseOffset).toInt
+      val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
+      producerAppendInfo.append(producerEpoch, 0, 0, time.milliseconds(), startOffset, startOffset, isTransactional = true)
+      val logOffsetMetadata = LogOffsetMetadata(messageOffset = startOffset, segmentBaseOffset = segmentBaseOffset,
+        relativePositionInSegment = 50 * relativeOffset)
+      producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+      stateManager.update(producerAppendInfo)
+    }
+
+    val producerId1 = producerId
+    val startOffset1 = 992342L
+    beginTxn(producerId1, startOffset1)
+
+    val producerId2 = producerId + 1
+    val startOffset2 = startOffset1 + 25
+    beginTxn(producerId2, startOffset2)
+
+    val producerId3 = producerId + 2
+    val startOffset3 = startOffset1 + 57
+    beginTxn(producerId3, startOffset3)
+
+    val lastOffset1 = startOffset3 + 15
+    val completedTxn1 = CompletedTxn(producerId1, startOffset1, lastOffset1, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn1))
+    stateManager.completeTxn(completedTxn1)
+    stateManager.onHighWatermarkUpdated(lastOffset1 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset3 = lastOffset1 + 20
+    val completedTxn3 = CompletedTxn(producerId3, startOffset3, lastOffset3, isAborted = false)
+    assertEquals(startOffset2, stateManager.lastStableOffset(completedTxn3))
+    stateManager.completeTxn(completedTxn3)
+    stateManager.onHighWatermarkUpdated(lastOffset3 + 1)
+    assertEquals(Some(startOffset2), stateManager.firstUnstableOffset.map(_.messageOffset))
+
+    val lastOffset2 = lastOffset3 + 78
+    val completedTxn2 = CompletedTxn(producerId2, startOffset2, lastOffset2, isAborted = false)
+    assertEquals(lastOffset2 + 1, stateManager.lastStableOffset(completedTxn2))
+    stateManager.completeTxn(completedTxn2)
+    stateManager.onHighWatermarkUpdated(lastOffset2 + 1)
+    assertEquals(None, stateManager.firstUnstableOffset)
+  }
+
   @Test
   def testNonMatchingTxnFirstOffsetMetadataNotCached(): Unit = {
     val producerEpoch = 0.toShort
 
@@ -823,7 +872,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch)
     val completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp)
     mapping.update(producerAppendInfo)
-    val lastStableOffset = mapping.completeTxn(completedTxn)
+    val lastStableOffset = mapping.lastStableOffset(completedTxn)
+    mapping.completeTxn(completedTxn)
     mapping.updateMapEndOffset(offset + 1)
     (completedTxn, lastStableOffset)
   }
