Browse Source

KAFKA-8803: Remove timestamp check in completeTransitionTo (#8278)

In prepareAddPartitions the txnStartTimestamp could be updated to updateTimestamp, which is assumed to always be larger than the original startTimestamp. However, due to NTP time shifts the clock may go backwards, and hence the new start timestamp may be smaller than the original one. Later, in completeTransitionTo, the time check would then fail with an IllegalStateException, and the txn would not transition to Ongoing.

An indirect result of this is that the txn would NEVER be expired, because only Ongoing transactions are checked for expiration.

We should do the same as in #3286 to remove this check.

Also added test coverage for both KAFKA-5415 and KAFKA-8803.

Reviewers: Jason Gustafson<jason@confluent.io>
pull/8310/head
Guozhang Wang 5 years ago committed by GitHub
parent
commit
6a88d32b9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
  2. 215
      core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala

3
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala

@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String, @@ -378,8 +378,7 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
case Ongoing => // from addPartitions
if (!validProducerEpoch(transitMetadata) ||
!topicPartitions.subsetOf(transitMetadata.topicPartitions) ||
txnTimeoutMs != transitMetadata.txnTimeoutMs ||
txnStartTimestamp > transitMetadata.txnStartTimestamp) {
txnTimeoutMs != transitMetadata.txnTimeoutMs) {
throwStateTransitionFailure(transitMetadata)
} else {

215
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.junit.Assert._
@ -27,11 +28,11 @@ import scala.collection.mutable @@ -27,11 +28,11 @@ import scala.collection.mutable
class TransactionMetadataTest {
val time = new MockTime()
val producerId = 23423L
val transactionalId = "txnlId"
@Test
def testInitializeEpoch(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = RecordBatch.NO_PRODUCER_EPOCH
val txnMetadata = new TransactionMetadata(
@ -55,8 +56,6 @@ class TransactionMetadataTest { @@ -55,8 +56,6 @@ class TransactionMetadataTest {
@Test
def testNormalEpochBump(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@ -79,8 +78,6 @@ class TransactionMetadataTest { @@ -79,8 +78,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testBumpEpochNotAllowedIfEpochsExhausted(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@ -98,10 +95,198 @@ class TransactionMetadataTest { @@ -98,10 +95,198 @@ class TransactionMetadataTest {
txnMetadata.prepareIncrementProducerEpoch(30000, None, time.milliseconds())
}
/**
 * Bumps the producer epoch with an update time one millisecond earlier than the
 * last-update timestamp the metadata was created with, and checks that the
 * transition still completes: the epoch is incremented, the previous epoch is
 * recorded, the start timestamp is untouched and the (earlier) update time is stored.
 */
@Test
def testTolerateUpdateTimeShiftDuringEpochBump(): Unit = {
  val producerEpoch: Short = 1
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = Empty,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller
  // Match on the result instead of using the deprecated `.right.get`, so that an
  // unexpected error is reported by name rather than as a NoSuchElementException.
  val transitMetadata =
    txnMetadata.prepareIncrementProducerEpoch(30000, Option(producerEpoch), time.milliseconds() - 1) match {
      case Right(metadata) => metadata
      case Left(error) => throw new AssertionError(s"Unexpected error $error while preparing the epoch bump")
    }
  txnMetadata.completeTransitionTo(transitMetadata)

  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(producerEpoch + 1, txnMetadata.producerEpoch)
  assertEquals(producerEpoch, txnMetadata.lastProducerEpoch)
  assertEquals(1L, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
}
/**
 * Rotates to a new producer id while passing an update time that lies one
 * millisecond before the metadata's initial last-update timestamp, then asserts
 * the resulting producer id, epochs and timestamps after the transition completes.
 */
@Test
def testTolerateUpdateTimeResetDuringProducerIdRotation(): Unit = {
  val initialEpoch: Short = 1
  val rotatedProducerId = producerId + 1

  val metadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = initialEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = Empty,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // the update time handed to the rotation is deliberately earlier than the one above
  val pendingRotation = metadata.prepareProducerIdRotation(
    rotatedProducerId,
    30000,
    time.milliseconds() - 1,
    recordLastEpoch = true)
  metadata.completeTransitionTo(pendingRotation)

  // the new id is installed, the old epoch is remembered and the epoch restarts at 0
  assertEquals(rotatedProducerId, metadata.producerId)
  assertEquals(initialEpoch, metadata.lastProducerEpoch)
  assertEquals(0, metadata.producerEpoch)
  // the start timestamp is unchanged; the update timestamp takes the earlier value
  assertEquals(1L, metadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, metadata.txnLastUpdateTimestamp)
}
/**
 * Adds partitions twice, each time with an update time earlier than the timestamp
 * currently stored on the metadata, and checks that both transitions complete.
 *
 * The first addition starts from the Empty state and is expected to move the start
 * timestamp to the supplied update time; the second is expected to leave the start
 * timestamp alone and only change the last-update timestamp.
 */
@Test
def testTolerateTimeShiftDuringAddPartitions(): Unit = {
  val producerEpoch: Short = 1
  val topic1Partition = new TopicPartition("topic1", 0)
  val topic2Partition = new TopicPartition("topic2", 0)
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = Empty,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = time.milliseconds(),
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller; when transiting from Empty the start time would be updated to the update-time
  val firstTransitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](topic1Partition), time.milliseconds() - 1)
  txnMetadata.completeTransitionTo(firstTransitMetadata)
  assertEquals(Set[TopicPartition](topic1Partition), txnMetadata.topicPartitions)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)

  // add another partition, check that in Ongoing state the start timestamp would not change to update time
  val secondTransitMetadata = txnMetadata.prepareAddPartitions(Set[TopicPartition](topic2Partition), time.milliseconds() - 2)
  txnMetadata.completeTransitionTo(secondTransitMetadata)
  assertEquals(Set[TopicPartition](topic1Partition, topic2Partition), txnMetadata.topicPartitions)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 2, txnMetadata.txnLastUpdateTimestamp)
}
/**
 * Moves an Ongoing transaction to PrepareCommit with an update time one millisecond
 * earlier than its initial last-update timestamp, and checks that the transition
 * completes with the start timestamp unchanged and the earlier update time stored.
 */
@Test
def testTolerateTimeShiftDuringPrepareCommit(): Unit = {
  val producerEpoch: Short = 1
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = Ongoing,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller
  val transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareCommit, time.milliseconds() - 1)
  txnMetadata.completeTransitionTo(transitMetadata)

  assertEquals(PrepareCommit, txnMetadata.state)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(1L, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
}
/**
 * Moves an Ongoing transaction to PrepareAbort with an update time one millisecond
 * earlier than its initial last-update timestamp, and checks that the transition
 * completes with the start timestamp unchanged and the earlier update time stored.
 */
@Test
def testTolerateTimeShiftDuringPrepareAbort(): Unit = {
  val producerEpoch: Short = 1
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = Ongoing,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller
  val transitMetadata = txnMetadata.prepareAbortOrCommit(PrepareAbort, time.milliseconds() - 1)
  txnMetadata.completeTransitionTo(transitMetadata)

  assertEquals(PrepareAbort, txnMetadata.state)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(1L, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
}
/**
 * Completes a transaction that starts in PrepareCommit, using an update time one
 * millisecond earlier than its initial last-update timestamp, and checks that it
 * ends in CompleteCommit with the start timestamp unchanged and the earlier update
 * time stored.
 */
@Test
def testTolerateTimeShiftDuringCompleteCommit(): Unit = {
  val producerEpoch: Short = 1
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = PrepareCommit,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller
  val transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
  txnMetadata.completeTransitionTo(transitMetadata)

  assertEquals(CompleteCommit, txnMetadata.state)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(1L, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
}
/**
 * Completes a transaction that starts in PrepareAbort, using an update time one
 * millisecond earlier than its initial last-update timestamp, and checks that it
 * ends in CompleteAbort with the start timestamp unchanged and the earlier update
 * time stored.
 */
@Test
def testTolerateTimeShiftDuringCompleteAbort(): Unit = {
  val producerEpoch: Short = 1
  val txnMetadata = new TransactionMetadata(
    transactionalId = transactionalId,
    producerId = producerId,
    lastProducerId = RecordBatch.NO_PRODUCER_ID,
    producerEpoch = producerEpoch,
    lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
    txnTimeoutMs = 30000,
    state = PrepareAbort,
    topicPartitions = mutable.Set.empty,
    txnStartTimestamp = 1L,
    txnLastUpdateTimestamp = time.milliseconds())

  // let new time be smaller
  val transitMetadata = txnMetadata.prepareComplete(time.milliseconds() - 1)
  txnMetadata.completeTransitionTo(transitMetadata)

  assertEquals(CompleteAbort, txnMetadata.state)
  assertEquals(producerId, txnMetadata.producerId)
  assertEquals(RecordBatch.NO_PRODUCER_EPOCH, txnMetadata.lastProducerEpoch)
  assertEquals(producerEpoch, txnMetadata.producerEpoch)
  assertEquals(1L, txnMetadata.txnStartTimestamp)
  assertEquals(time.milliseconds() - 1, txnMetadata.txnLastUpdateTimestamp)
}
@Test
def testFenceProducerAfterEpochsExhausted(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@ -131,8 +316,6 @@ class TransactionMetadataTest { @@ -131,8 +316,6 @@ class TransactionMetadataTest {
@Test(expected = classOf[IllegalStateException])
def testFenceProducerNotAllowedIfItWouldOverflow(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = Short.MaxValue
val txnMetadata = new TransactionMetadata(
@ -151,8 +334,6 @@ class TransactionMetadataTest { @@ -151,8 +334,6 @@ class TransactionMetadataTest {
@Test
def testRotateProducerId(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(
@ -192,8 +373,6 @@ class TransactionMetadataTest { @@ -192,8 +373,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithNewlyCreatedMetadata(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@ -217,8 +396,6 @@ class TransactionMetadataTest { @@ -217,8 +396,6 @@ class TransactionMetadataTest {
@Test
def testEpochBumpWithCurrentEpochProvided(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = 735.toShort
val txnMetadata = new TransactionMetadata(
@ -242,8 +419,6 @@ class TransactionMetadataTest { @@ -242,8 +419,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithLastEpoch(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@ -268,8 +443,6 @@ class TransactionMetadataTest { @@ -268,8 +443,6 @@ class TransactionMetadataTest {
@Test
def testAttemptedEpochBumpWithFencedEpoch(): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = 735.toShort
val lastProducerEpoch = (producerEpoch - 1).toShort
@ -290,8 +463,6 @@ class TransactionMetadataTest { @@ -290,8 +463,6 @@ class TransactionMetadataTest {
}
private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = {
val transactionalId = "txnlId"
val producerId = 23423L
val producerEpoch = (Short.MaxValue - 1).toShort
val txnMetadata = new TransactionMetadata(

Loading…
Cancel
Save