
KAFKA-5131; WriteTxnMarkers and complete commit/abort on partition immigration

Write txn markers and complete the commit/abort for transactions in PrepareXX
state during partition immigration.
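In other words: when a broker becomes leader for a transaction state topic partition and reloads its cache, any transaction it finds still in PrepareCommit or PrepareAbort must have its markers written and be moved to the corresponding CompleteXX state. A minimal sketch of that recovery step, using hypothetical simplified types rather than the real Kafka classes (the actual change threads a callback through TransactionStateManager, as the diff below shows):

```scala
// Simplified, hypothetical model of the recovery step this commit adds;
// the real types live in kafka.coordinator.transaction.
object PreparedTxnRecovery {
  sealed trait TxnState
  case object PrepareCommit extends TxnState
  case object PrepareAbort extends TxnState

  case class TxnEntry(transactionalId: String, state: TxnState)

  // On partition immigration the loader scans the restored cache and hands
  // every still-prepared transaction to a caller-supplied callback, which
  // writes the markers and completes the commit or abort.
  def completePreparedTxns(entries: Seq[TxnEntry], writeTxnMarkers: TxnEntry => Unit): Unit =
    entries.foreach { entry =>
      entry.state match {
        case PrepareCommit | PrepareAbort => writeTxnMarkers(entry)
        case _ => // completed or ongoing transactions need no action on load
      }
    }
}
```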

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2926 from dguy/kafka-5059
Commit 619fd7aeb6 by Damian Guy, committed by Ismael Juma
1. core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (112 lines changed)
2. core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (24 lines changed)
3. core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala (2 lines changed)
4. core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala (33 lines changed)

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (112 lines changed)

@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
 import kafka.utils.{Logging, Scheduler, ZkUtils}
@@ -28,8 +27,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.Time
-import kafka.utils.CoreUtils.inWriteLock
 
 object TransactionCoordinator {
@@ -253,7 +250,7 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-    txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+    txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
   }
 
   def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
@@ -322,61 +319,66 @@ class TransactionCoordinator(brokerId: Int,
       if (errors == Errors.NONE)
         txnManager.coordinatorEpochFor(transactionalId) match {
           case Some(coordinatorEpoch) =>
-            def completionCallback(error: Errors): Unit = {
-              error match {
-                case Errors.NONE =>
-                  txnManager.getTransactionState(transactionalId) match {
-                    case Some(preparedCommitMetadata) =>
-                      val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
-                      val committedMetadata = new TransactionMetadata(pid,
-                        epoch,
-                        preparedCommitMetadata.txnTimeoutMs,
-                        completedState,
-                        preparedCommitMetadata.topicPartitions,
-                        preparedCommitMetadata.transactionStartTime,
-                        time.milliseconds())
-                      preparedCommitMetadata.prepareTransitionTo(completedState)
-                      def writeCommittedTransactionCallback(error: Errors): Unit =
-                        error match {
-                          case Errors.NONE =>
-                            trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
-                            txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
-                          case Errors.NOT_COORDINATOR =>
-                            // this one should be completed by the new coordinator
-                            warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                          case _ =>
-                            warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
-                            // retry until success
-                            txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                        }
-                      txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                    case None =>
-                      // this one should be completed by the new coordinator
-                      warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                  }
-                case Errors.NOT_COORDINATOR =>
-                  warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                case _ =>
-                  warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
-                  txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
-                    newMetadata,
-                    coordinatorEpoch,
-                    completionCallback)
-              }
-            }
-            txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+            writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch))
           case None =>
             // this one should be completed by the new coordinator
             warn(s"no longer the coordinator for transactionalId: $transactionalId")
         }
     }
 
     txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
   }
 
+  private def writeTxnMarkers(markerArgs: WriteTxnMarkerArgs): Unit = {
+    def completionCallback(error: Errors): Unit = {
+      error match {
+        case Errors.NONE =>
+          txnManager.getTransactionState(markerArgs.transactionalId) match {
+            case Some(preparedCommitMetadata) =>
+              val completedState = if (markerArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort
+              val committedMetadata = new TransactionMetadata(markerArgs.pid,
+                markerArgs.epoch,
+                preparedCommitMetadata.txnTimeoutMs,
+                completedState,
+                preparedCommitMetadata.topicPartitions,
+                preparedCommitMetadata.transactionStartTime,
+                time.milliseconds())
+              preparedCommitMetadata.prepareTransitionTo(completedState)
+              def writeCommittedTransactionCallback(error: Errors): Unit = {
+                error match {
+                  case Errors.NONE =>
+                    txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(markerArgs.transactionalId),
+                      markerArgs.pid)
+                  case Errors.NOT_COORDINATOR =>
+                    // this one should be completed by the new coordinator
+                    warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+                  case _ =>
+                    warn(s"error: $error caught for transactionalId: ${markerArgs.transactionalId} when appending state: $completedState. Retrying.")
+                    // retry until success
+                    txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                }
+              }
+              txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback)
+            case None =>
+              // this one should be completed by the new coordinator
+              warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+          }
+        case Errors.NOT_COORDINATOR =>
+          warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}")
+        case _ =>
+          warn(s"error: $error caught when writing transaction markers for transactionalId: ${markerArgs.transactionalId}. retrying")
+          txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
+            markerArgs.newMetadata,
+            markerArgs.coordinatorEpoch,
+            completionCallback)
+      }
+    }
+    txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId),
+      markerArgs.newMetadata,
+      markerArgs.coordinatorEpoch,
+      completionCallback)
+  }
 
   def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
 
   def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
@@ -408,4 +410,10 @@ class TransactionCoordinator(brokerId: Int,
   }
 }
 
 case class InitPidResult(pid: Long, epoch: Short, error: Errors)
+case class WriteTxnMarkerArgs(transactionalId: String,
+                              pid: Long,
+                              epoch: Short,
+                              nextState: TransactionState,
+                              newMetadata: TransactionMetadata,
+                              coordinatorEpoch: Int)
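The extracted `writeTxnMarkers` preserves the original retry semantics: the log-append callback re-submits the append on any retriable error, stops only on NOT_COORDINATOR (the new coordinator finishes the job), and the marker request itself is re-queued the same way. Stripped to its shape, the self-resubmitting callback pattern looks like this (a hypothetical sketch, not the Kafka API):

```scala
object RetryingAppend {
  sealed trait Outcome
  case object Ok extends Outcome
  case object NotCoordinator extends Outcome
  case object TransientError extends Outcome

  // appendToLog is asynchronous and reports its outcome to the callback;
  // the callback re-submits the same append until it succeeds or learns
  // that another broker has taken over as coordinator.
  def appendWithRetry(appendToLog: (Outcome => Unit) => Unit): Unit = {
    def callback(outcome: Outcome): Unit = outcome match {
      case Ok             => ()                    // state durably written
      case NotCoordinator => ()                    // new coordinator finishes the job
      case TransientError => appendToLog(callback) // retry until success
    }
    appendToLog(callback)
  }
}
```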

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (24 lines changed)

@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.{KafkaException, Topic}
 import kafka.log.LogConfig
@@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 
-import scala.collection.{concurrent, mutable}
+import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -45,7 +45,7 @@ object TransactionManager {
 }
 
 /**
- * Transaction manager is part of the transaction coordinator, it manages:
+ * Transaction state manager is part of the transaction coordinator, it manages:
  *
  * 1. the transaction log, which is a special internal topic.
 * 2. the transaction metadata including its ongoing transaction status.
@@ -60,6 +60,8 @@ class TransactionStateManager(brokerId: Int,
   this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
 
+  type WriteTxnMarkers = WriteTxnMarkerArgs => Unit
 
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
@@ -147,7 +149,7 @@ class TransactionStateManager(brokerId: Int,
     zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
   }
 
-  private def loadTransactionMetadata(topicPartition: TopicPartition) {
+  private def loadTransactionMetadata(topicPartition: TopicPartition, writeTxnMarkers: WriteTxnMarkers) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
     val startMs = time.milliseconds()
@@ -207,6 +209,16 @@ class TransactionStateManager(brokerId: Int,
             throw new KafkaException("Loading transaction topic partition failed.")
         }
+
+        // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+        if (currentTxnMetadata.state == PrepareCommit || currentTxnMetadata.state == PrepareAbort) {
+          writeTxnMarkers(WriteTxnMarkerArgs(transactionalId,
+            txnMetadata.pid,
+            txnMetadata.producerEpoch,
+            txnMetadata.state,
+            txnMetadata,
+            coordinatorEpochFor(transactionalId).get
+          ))
+        }
       }
 
       removedTransactionalIds.foreach { transactionalId =>
@@ -229,7 +241,7 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and
    * populate the transaction metadata cache with the transactional ids.
    */
-  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int) {
+  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int, writeTxnMarkers: WriteTxnMarkers) {
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partition)
@@ -242,7 +254,7 @@ class TransactionStateManager(brokerId: Int,
     def loadTransactions() {
       info(s"Loading transaction metadata from $topicPartition")
       try {
-        loadTransactionMetadata(topicPartition)
+        loadTransactionMetadata(topicPartition, writeTxnMarkers)
       } catch {
         case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
       } finally {
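Note that `TransactionStateManager` never references the marker channel manager directly: it only receives a `WriteTxnMarkers` function value (`WriteTxnMarkerArgs => Unit`), which `TransactionCoordinator` binds to its private `writeTxnMarkers` method via eta-expansion. A simplified illustration of that wiring, with hypothetical names:

```scala
object CallbackWiring {
  case class MarkerArgs(transactionalId: String)

  class StateManager {
    type WriteTxnMarkers = MarkerArgs => Unit

    // After rebuilding its cache, the loader hands prepared transactions
    // to the injected callback instead of talking to the channel manager.
    def loadPartition(partition: Int, writeTxnMarkers: WriteTxnMarkers): Unit =
      writeTxnMarkers(MarkerArgs(s"txn-in-partition-$partition"))
  }

  class Coordinator(stateManager: StateManager) {
    private def writeTxnMarkers(args: MarkerArgs): Unit =
      println(s"writing markers for ${args.transactionalId}")

    // Passing the private method where a function value is expected
    // eta-expands it into a MarkerArgs => Unit.
    def handleImmigration(partition: Int): Unit =
      stateManager.loadPartition(partition, writeTxnMarkers)
  }
}
```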

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala (2 lines changed)

@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.api.{LeaderAndIsr, PartitionStateInfo}
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, InterBrokerSendThread}
+import kafka.common.{BrokerEndPointNotAvailableException, InterBrokerSendThread}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.{MockTime, TestUtils}

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala (33 lines changed)

@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
     assertFalse(transactionManager.isCoordinatorFor(txnId1))
     assertFalse(transactionManager.isCoordinatorFor(txnId2))
 
-    transactionManager.loadTransactionsForPartition(partitionId, 0)
+    transactionManager.loadTransactionsForPartition(partitionId, 0, _ => ())
 
     // let the time advance to trigger the background thread loading
     scheduler.tick()
@@ -293,7 +293,7 @@ class TransactionStateManagerTest {
     val coordinatorEpoch = 10
     EasyMock.expect(replicaManager.getLog(EasyMock.anyObject(classOf[TopicPartition]))).andReturn(None)
     EasyMock.replay(replicaManager)
-    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch)
+    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch, _ => ())
     val epoch = transactionManager.coordinatorEpochFor(txnId1).get
     assertEquals(coordinatorEpoch, epoch)
   }
@@ -303,6 +303,34 @@ class TransactionStateManagerTest {
     assertEquals(None, transactionManager.coordinatorEpochFor(txnId1))
   }
 
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareCommit)
+  }
+
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedAbortState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareAbort)
+  }
+
+  private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): Unit = {
+    txnMetadata1.state = state
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1))
+    val startOffset = 0L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+
+    prepareTxnLog(topicPartition, 0, records)
+
+    var receivedArgs: WriteTxnMarkerArgs = null
+    transactionManager.loadTransactionsForPartition(partitionId, 0, markerArgs => receivedArgs = markerArgs)
+    scheduler.tick()
+
+    assertEquals(txnId1, receivedArgs.transactionalId)
+  }
 
   private def assertCallback(error: Errors): Unit = {
     assertEquals(expectedError, error)
   }
@@ -351,4 +379,5 @@ class TransactionStateManagerTest {
     EasyMock.replay(replicaManager)
   }
 }
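The updated tests show both ways of satisfying the new parameter: call sites that don't care about markers pass the no-op `_ => ()`, while `verifyWritesTxnMarkersInPrepareState` captures the argument in a local `var` and asserts on it after the scheduler tick. The same capture pattern in isolation (a hypothetical sketch, not the actual test class):

```scala
object CaptureCallbackExample extends App {
  case class MarkerArgs(transactionalId: String)

  // Stand-in for the real loading path, which invokes the callback once
  // for each transaction found in a prepare state.
  def loadTransactions(writeTxnMarkers: MarkerArgs => Unit): Unit =
    writeTxnMarkers(MarkerArgs("txnId1"))

  // No-op callback for tests that only exercise loading.
  loadTransactions(_ => ())

  // Capturing callback for tests that assert on what would be written.
  var received: MarkerArgs = null
  loadTransactions(args => received = args)
  assert(received.transactionalId == "txnId1")
}
```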
