Browse Source

MINOR: Ensure producer state append exceptions are useful (#6591)

We should include partition/offset information when we raise exceptions during producer state validation. This saves a lot of the discovery work to figure out where the problem occurred. This patch also includes a new test case to verify additional coordinator fencing cases.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
pull/6603/head
Jason Gustafson 6 years ago committed by GitHub
parent
commit
48179677a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      core/src/main/scala/kafka/log/ProducerStateManager.scala
  2. 52
      core/src/test/scala/unit/kafka/log/LogTest.scala
  3. 4
      core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala

48
core/src/main/scala/kafka/log/ProducerStateManager.scala

@ -184,7 +184,8 @@ private[log] class ProducerStateEntry(val producerId: Long, @@ -184,7 +184,8 @@ private[log] class ProducerStateEntry(val producerId: Long,
* should have ValidationType.None. Appends coming from a client for produce requests should have
* ValidationType.Full.
*/
private[log] class ProducerAppendInfo(val producerId: Long,
private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
val producerId: Long,
val currentEntry: ProducerStateEntry,
val validationType: ValidationType) {
private val transactions = ListBuffer.empty[TxnMetadata]
@ -194,35 +195,36 @@ private[log] class ProducerAppendInfo(val producerId: Long, @@ -194,35 +195,36 @@ private[log] class ProducerAppendInfo(val producerId: Long,
updatedEntry.coordinatorEpoch = currentEntry.coordinatorEpoch
updatedEntry.currentTxnFirstOffset = currentEntry.currentTxnFirstOffset
private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int) = {
private def maybeValidateAppend(producerEpoch: Short, firstSeq: Int, offset: Long): Unit = {
validationType match {
case ValidationType.None =>
case ValidationType.EpochOnly =>
checkProducerEpoch(producerEpoch)
checkProducerEpoch(producerEpoch, offset)
case ValidationType.Full =>
checkProducerEpoch(producerEpoch)
checkSequence(producerEpoch, firstSeq)
checkProducerEpoch(producerEpoch, offset)
checkSequence(producerEpoch, firstSeq, offset)
}
}
private def checkProducerEpoch(producerEpoch: Short): Unit = {
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
if (producerEpoch < updatedEntry.producerEpoch) {
throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +
s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (server epoch)")
throw new ProducerFencedException(s"Producer's epoch at offset $offset is no longer valid in " +
s"partition $topicPartition: $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (current epoch)")
}
}
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {
private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
if (producerEpoch != updatedEntry.producerEpoch) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
s"(request epoch), $appendFirstSeq (seq. number)")
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " +
s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)")
} else {
throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
// The first literal must end with a space: without it the offset and the following words run together
// in the rendered message (e.g. "at offset 42in partition ...").
throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker at offset $offset " +
s"in partition $topicPartition. It is possible that the last message with the producerId=$producerId has " +
"been removed due to hitting the retention limit.")
}
}
} else {
@ -240,10 +242,12 @@ private[log] class ProducerAppendInfo(val producerId: Long, @@ -240,10 +242,12 @@ private[log] class ProducerAppendInfo(val producerId: Long,
// the sequence number. Note that this check follows the fencing check, so the marker still fences
// old producers even if it cannot determine our next expected sequence number.
throw new UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +
s"for producerId=$producerId, but next expected sequence number is not known.")
s"for producerId=$producerId at offset $offset in partition $topicPartition, but the next expected " +
"sequence number is not known.")
} else if (!inSequence(currentLastSeq, appendFirstSeq)) {
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +
s"(incoming seq. number), $currentLastSeq (current end sequence number)")
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " +
s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
s"$currentLastSeq (current end sequence number)")
}
}
}
@ -278,13 +282,14 @@ private[log] class ProducerAppendInfo(val producerId: Long, @@ -278,13 +282,14 @@ private[log] class ProducerAppendInfo(val producerId: Long,
firstOffset: Long,
lastOffset: Long,
isTransactional: Boolean): Unit = {
maybeValidateAppend(epoch, firstSeq)
maybeValidateAppend(epoch, firstSeq, firstOffset)
updatedEntry.addBatch(epoch, lastSeq, lastOffset, (lastOffset - firstOffset).toInt, lastTimestamp)
updatedEntry.currentTxnFirstOffset match {
case Some(_) if !isTransactional =>
// Received a non-transactional message while a transaction is active
throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId")
throw new InvalidTxnStateException(s"Expected transactional write from producer $producerId at " +
s"offset $firstOffset in partition $topicPartition")
case None if isTransactional =>
// Began a new transaction
@ -299,10 +304,11 @@ private[log] class ProducerAppendInfo(val producerId: Long, @@ -299,10 +304,11 @@ private[log] class ProducerAppendInfo(val producerId: Long,
producerEpoch: Short,
offset: Long,
timestamp: Long): CompletedTxn = {
checkProducerEpoch(producerEpoch)
checkProducerEpoch(producerEpoch, offset)
if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch)
throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch: ${endTxnMarker.coordinatorEpoch} " +
throw new TransactionCoordinatorFencedException(s"Invalid coordinator epoch for producerId $producerId at " +
s"offset $offset in partition $topicPartition: ${endTxnMarker.coordinatorEpoch} " +
s"(zombie), ${updatedEntry.coordinatorEpoch} (current)")
updatedEntry.maybeUpdateEpoch(producerEpoch)
@ -628,7 +634,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, @@ -628,7 +634,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
ValidationType.Full
val currentEntry = lastEntry(producerId).getOrElse(ProducerStateEntry.empty(producerId))
new ProducerAppendInfo(producerId, currentEntry, validationToPerform)
new ProducerAppendInfo(topicPartition, producerId, currentEntry, validationToPerform)
}
/**

52
core/src/test/scala/unit/kafka/log/LogTest.scala

@ -3457,6 +3457,29 @@ class LogTest { @@ -3457,6 +3457,29 @@ class LogTest {
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
}
/**
 * Checks that an end-transaction marker carrying a lower coordinator epoch than one already appended
 * for the same producer is rejected with TransactionCoordinatorFencedException.
 */
@Test
def testZombieCoordinatorFencedEmptyTransaction(): Unit = {
val pid = 1L
val epoch = 0.toShort
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
// Fill a single buffer with a 10-record transactional batch starting at offset 0, followed by a
// COMMIT marker at offset 10 with coordinator epoch 0; both are written with leader epoch 1.
val buffer = ByteBuffer.allocate(256)
val append = appendTransactionalToBuffer(buffer, pid, epoch, leaderEpoch = 1)
append(0, 10)
appendEndTxnMarkerToBuffer(buffer, pid, epoch, 10L, ControlRecordType.COMMIT,
coordinatorEpoch = 0, leaderEpoch = 1)
// Switch the buffer to read mode and append its contents through the follower path.
buffer.flip()
log.appendAsFollower(MemoryRecords.readableRecords(buffer))
// Append two ABORT markers as leader with coordinator epoch 2. No transactional data is written
// between the COMMIT above and these markers, and neither call is expected to throw.
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1)
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 2, leaderEpoch = 1)
// A later ABORT marker with the lower coordinator epoch 1 must be fenced.
assertThrows[TransactionCoordinatorFencedException] {
appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1, leaderEpoch = 1)
}
}
@Test
def testFirstUnstableOffsetDoesNotExceedLogStartOffsetMidSegment(): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
@ -3617,10 +3640,14 @@ class LogTest { @@ -3617,10 +3640,14 @@ class LogTest {
}
}
private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short,
controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
private def appendEndTxnMarkerAsLeader(log: Log,
producerId: Long,
producerEpoch: Short,
controlType: ControlRecordType,
coordinatorEpoch: Int = 0,
leaderEpoch: Int = 0): Unit = {
val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch)
log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0)
log.appendAsLeader(records, isFromClient = false, leaderEpoch = leaderEpoch)
}
private def appendNonTransactionalAsLeader(log: Log, numRecords: Int): Unit = {
@ -3631,10 +3658,14 @@ class LogTest { @@ -3631,10 +3658,14 @@ class LogTest {
log.appendAsLeader(records, leaderEpoch = 0)
}
private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short): (Long, Int) => Unit = {
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
leaderEpoch: Int = 0): (Long, Int) => Unit = {
var sequence = 0
(offset: Long, numRecords: Int) => {
val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId, producerEpoch, sequence, true)
val builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
offset, System.currentTimeMillis(), producerId, producerEpoch, sequence, true, leaderEpoch)
for (seq <- sequence until sequence + numRecords) {
val record = new SimpleRecord(s"$seq".getBytes)
builder.append(record)
@ -3645,10 +3676,15 @@ class LogTest { @@ -3645,10 +3676,15 @@ class LogTest {
}
}
private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long,
controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,
offset: Long,
controlType: ControlRecordType,
coordinatorEpoch: Int = 0,
leaderEpoch: Int = 0): Unit = {
val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), 0, producerId, producerEpoch, marker)
MemoryRecords.writeEndTransactionalMarker(buffer, offset, mockTime.milliseconds(), leaderEpoch, producerId, producerEpoch, marker)
}
private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = {

4
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala

@ -208,7 +208,7 @@ class ProducerStateManagerTest extends JUnitSuite { @@ -208,7 +208,7 @@ class ProducerStateManagerTest extends JUnitSuite {
val producerEpoch = 0.toShort
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, segmentBaseOffset = 990000L,
@ -224,7 +224,7 @@ class ProducerStateManagerTest extends JUnitSuite { @@ -224,7 +224,7 @@ class ProducerStateManagerTest extends JUnitSuite {
val producerEpoch = 0.toShort
val offset = 992342L
val seq = 0
val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
val producerAppendInfo = new ProducerAppendInfo(partition, producerId, ProducerStateEntry.empty(producerId), ValidationType.Full)
producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, offset, isTransactional = true)
// use some other offset to simulate a follower append where the log offset metadata won't typically

Loading…
Cancel
Save