From 48179677a75276b495c59bd429e1f7874dc905b3 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 18 Apr 2019 08:38:43 -0700 Subject: [PATCH] MINOR: Ensure producer state append exceptions areuseful (#6591) We should include partition/offset information when we raise exceptions during producer state validation. This saves a lot of the discovery work to figure out where the problem occurred. This patch also includes a new test case to verify additional coordinator fencing cases. Reviewers: Rajini Sivaram --- .../kafka/log/ProducerStateManager.scala | 48 +++++++++-------- .../test/scala/unit/kafka/log/LogTest.scala | 52 ++++++++++++++++--- .../kafka/log/ProducerStateManagerTest.scala | 4 +- 3 files changed, 73 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index a3b03d063f2..59bc41772bb 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -184,7 +184,8 @@ private[log] class ProducerStateEntry(val producerId: Long, * should have ValidationType.None. Appends coming from a client for produce requests should have * ValidationType.Full. */ -private[log] class ProducerAppendInfo(val producerId: Long, +private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, + val producerId: Long, val currentEntry: ProducerStateEntry, val validationType: ValidationType) { private val transactions = ListBuffer.empty[TxnMetadata] @@ -194,35 +195,36 @@ private[log] class ProducerAppendInfo(val producerId: Long, updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset - private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int) = { + private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = { validationType match { case ValidationType.None => case ValidationType.EpochOnly => - checkProducerEpoch(producerEpoch) + checkProducerEpoch(producerEpoch, offset) case ValidationType.Full => - checkProducerEpoch(producerEpoch) - checkSequence(producerEpoch, firstSeq) + checkProducerEpoch(producerEpoch, offset) + checkSequence(producerEpoch, firstSeq, offset) } } - private def checkProducerEpoch(producerEpoch: Short): Unit = { + private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = { if (producerEpoch < updatedEntry.producerEpoch) { - throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " + - s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (server epoch)") + throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer valid in " + + s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (current epoch)") } } - private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = { + private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = { if (producerEpoch != updatedEntry.producerEpoch) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) { - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " + - s"(request epoch), $appendFirstSeq (seq. number)") + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " + + s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)") } else { - throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " + - s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.") + throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker at offset $offset" + + s"in partition $topicPartition. It is possible that the last message with the producerId=$producerId has " + + "been removed due to hitting the retention limit.") } } } else { @@ -240,10 +242,12 @@ private[log] class ProducerAppendInfo(val producerId: Long, // the sequence number. Note that this check follows the fencing check, so the marker still fences // old producers even if it cannot determine our next expected sequence number. throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " + - s"for producerId=$producerId, but next expected sequence number is not known.") + s"for producerId=$producerId at offset $offset in partition $topicPartition, but the next expected " + + "sequence number is not known.") } else if (!inSequence(currentLastSeq, appendFirstSeq)) { - throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " + - s"(incoming seq. number), $currentLastSeq (current end sequence number)") + throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " + + s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " + + s"$currentLastSeq (current end sequence number)") } } } @@ -278,13 +282,14 @@ private[log] class ProducerAppendInfo(val producerId: Long, firstOffset: Long, lastOffset: Long, isTransactional: Boolean): Unit = { - maybeValidateAppend(epoch, firstSeq) + maybeValidateAppend(epoch, firstSeq, firstOffset) updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp) updatedEntry.currentTxnFirstOffset match { case Some(_) if !isTransactional => // Received a non-transactional message while a transaction is active - throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId") + throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " + + s"offset $firstOffset in partition $topicPartition") case None if isTransactional => // Began a new transaction @@ -299,10 +304,11 @@ private[log] class ProducerAppendInfo(val producerId: Long, producerEpoch: Short, offset: Long, timestamp: Long): CompletedTxn = { - checkProducerEpoch(producerEpoch) + checkProducerEpoch(producerEpoch, offset) if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch) - throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " + + throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " + + s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " + s"(zombie), ${updatedEntry.coordinatorEpoch} (current)") updatedEntry.maybeUpdateEpoch(producerEpoch) @@ -628,7 +634,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, ValidationType.Full val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId)) - new ProducerAppendInfo(producerId, currentEntry, validationToPerform) + new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform) } /** diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 6b6dd7c8a9d..cb2b4b27a7a 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3457,6 +3457,29 @@ class LogTest { appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1) } + @Test + def testZombieCoordinatorFencedEmptyTransaction(): Unit = { + val pid = 1L + val epoch = 0.toShort + val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) + val log = createLog(logDir, logConfig) + + val buffer = ByteBuffer.allocate(256) + val append = appendTransactionalToBuffer(buffer, pid, epoch, leaderEpoch = 1) + append(0, 10) + appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT, + coordinatorEpoch = 0, leaderEpoch = 1) + + buffer.flip() + log.appendAsFollower(MemoryRecords.readableRecords(buffer)) + + appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1) + appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1) + assertThrows[TransactionCoordinatorFencedException] { + appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1, leaderEpoch = 1) + } + } + @Test def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = { val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) @@ -3617,10 +3640,14 @@ class LogTest { } } - private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short, - controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { + private def appendEndTxnMarkerAsLeader(log: Log, + producerId: Long, + producerEpoch: Short, + controlType: ControlRecordType, + coordinatorEpoch: Int = 0, + leaderEpoch: Int = 0): Unit = { val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch) - log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0) + log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch) } private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = { @@ -3631,10 +3658,14 @@ class LogTest { log.appendAsLeader(records, leaderEpoch = 0) } - private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short): (Long, Int) => Unit = { + private def appendTransactionalToBuffer(buffer: ByteBuffer, + producerId: Long, + producerEpoch: Short, + leaderEpoch: Int = 0): (Long, Int) => Unit = { var sequence = 0 (offset: Long, numRecords: Int) => { - val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId, producerEpoch, sequence, true) + val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, + offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch) for (seq <- sequence until sequence + numRecords) { val record = new SimpleRecord(s"$seq".getBytes) builder.append(record) @@ -3645,10 +3676,15 @@ class LogTest { } } - private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long, - controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = { + private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, + producerId: Long, + producerEpoch: Short, + offset: Long, + controlType: ControlRecordType, + coordinatorEpoch: Int = 0, + leaderEpoch: Int = 0): Unit = { val marker = new EndTransactionMarker(controlType, coordinatorEpoch) - MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0, producerId, producerEpoch, marker) + MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker) } private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = { diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index a2abf7b3f5a..26067b407fb 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -208,7 +208,7 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full) + val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true) val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L, @@ -224,7 +224,7 @@ class ProducerStateManagerTest extends JUnitSuite { val producerEpoch = 0.toShort val offset = 992342L val seq = 0 - val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full) + val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full) producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true) // use some other offset to simulate a follower append where the log offset metadata won't typically