diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 2111a8f2247..46c061ebcea 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -18,7 +18,6 @@ package kafka.coordinator.transaction
 
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
 import kafka.utils.{Logging, Scheduler, ZkUtils}
@@ -28,8 +27,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.Time
-import kafka.utils.CoreUtils.inWriteLock
-
 
 object TransactionCoordinator {
 
@@ -253,7 +250,7 @@ class TransactionCoordinator(brokerId: Int,
   }
 
   def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
-    txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+    txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch, writeTxnMarkers)
   }
 
   def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
@@ -322,61 +319,66 @@ class TransactionCoordinator(brokerId: Int,
       if (errors == Errors.NONE)
         txnManager.coordinatorEpochFor(transactionalId) match {
           case Some(coordinatorEpoch) =>
-            def completionCallback(error: Errors): Unit = {
-              error match {
-                case Errors.NONE =>
-                  txnManager.getTransactionState(transactionalId) match {
-                    case Some(preparedCommitMetadata) =>
-                      val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
-                      val committedMetadata = new TransactionMetadata(pid,
-                        epoch,
-                        preparedCommitMetadata.txnTimeoutMs,
-                        completedState,
-                        preparedCommitMetadata.topicPartitions,
-                        preparedCommitMetadata.transactionStartTime,
-                        time.milliseconds())
-                      preparedCommitMetadata.prepareTransitionTo(completedState)
-
-                      def writeCommittedTransactionCallback(error: Errors): Unit =
-                        error match {
-                          case Errors.NONE =>
-                            trace(s"completed txn for transactionalId: $transactionalId state after commit: ${txnManager.getTransactionState(transactionalId)}")
-                            txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
-                          case Errors.NOT_COORDINATOR =>
-                            // this one should be completed by the new coordinator
-                            warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                          case _ =>
-                            warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
-                            // retry until success
-                            txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                        }
-
-                      txnManager.appendTransactionToLog(transactionalId, committedMetadata, writeCommittedTransactionCallback)
-                    case None =>
-                      // this one should be completed by the new coordinator
-                      warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                  }
-                case Errors.NOT_COORDINATOR =>
-                  warn(s"no longer the coordinator for transactionalId: $transactionalId")
-                case _ =>
-                  warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
retrying") - txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), - newMetadata, - coordinatorEpoch, - completionCallback) - } - } - - txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback) + writeTxnMarkers(WriteTxnMarkerArgs(transactionalId, pid, epoch, nextState, newMetadata, coordinatorEpoch)) case None => // this one should be completed by the new coordinator warn(s"no longer the coordinator for transactionalId: $transactionalId") } } - txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback) } + private def writeTxnMarkers(markerArgs: WriteTxnMarkerArgs): Unit = { + def completionCallback(error: Errors): Unit = { + error match { + case Errors.NONE => + txnManager.getTransactionState(markerArgs.transactionalId) match { + case Some(preparedCommitMetadata) => + val completedState = if (markerArgs.nextState == PrepareCommit) CompleteCommit else CompleteAbort + val committedMetadata = new TransactionMetadata(markerArgs.pid, + markerArgs.epoch, + preparedCommitMetadata.txnTimeoutMs, + completedState, + preparedCommitMetadata.topicPartitions, + preparedCommitMetadata.transactionStartTime, + time.milliseconds()) + preparedCommitMetadata.prepareTransitionTo(completedState) + + def writeCommittedTransactionCallback(error: Errors): Unit = { + error match { + case Errors.NONE => + txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(markerArgs.transactionalId), + markerArgs.pid) + case Errors.NOT_COORDINATOR => + // this one should be completed by the new coordinator + warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}") + case _ => + warn(s"error: $error caught for transactionalId: ${markerArgs.transactionalId} when appending state: $completedState. Retrying.") + // retry until success + txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback) + } + } + txnManager.appendTransactionToLog(markerArgs.transactionalId, committedMetadata, writeCommittedTransactionCallback) + case None => + // this one should be completed by the new coordinator + warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}") + } + case Errors.NOT_COORDINATOR => + warn(s"no longer the coordinator for transactionalId: ${markerArgs.transactionalId}") + case _ => + warn(s"error: $error caught when writing transaction markers for transactionalId: ${markerArgs.transactionalId}. 
retrying") + txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId), + markerArgs.newMetadata, + markerArgs.coordinatorEpoch, + completionCallback) + } + } + txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(markerArgs.transactionalId), + markerArgs.newMetadata, + markerArgs.coordinatorEpoch, + completionCallback) + } + def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId) @@ -408,4 +410,10 @@ class TransactionCoordinator(brokerId: Int, } } -case class InitPidResult(pid: Long, epoch: Short, error: Errors) \ No newline at end of file +case class InitPidResult(pid: Long, epoch: Short, error: Errors) +case class WriteTxnMarkerArgs(transactionalId: String, + pid: Long, + epoch: Short, + nextState: TransactionState, + newMetadata: TransactionMetadata, + coordinatorEpoch: Int) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 2e40a345d5e..c8931c40723 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -20,7 +20,7 @@ import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock} +import java.util.concurrent.locks.ReentrantLock import kafka.common.{KafkaException, Topic} import kafka.log.LogConfig @@ -34,7 +34,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} -import scala.collection.{concurrent, mutable} +import scala.collection.mutable import scala.collection.JavaConverters._ @@ -45,7 +45,7 @@ object TransactionManager { } /** - * Transaction manager is part of the transaction coordinator, it manages: + * Transaction state manager is part of the transaction coordinator, it manages: * * 1. the transaction log, which is a special internal topic. * 2. the transaction metadata including its ongoing transaction status. 
@@ -60,6 +60,8 @@ class TransactionStateManager(brokerId: Int,
 
   this.logIdent = "[Transaction Log Manager " + brokerId + "]: "
 
+  type WriteTxnMarkers = WriteTxnMarkerArgs => Unit
+
   /** shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
 
@@ -147,7 +149,7 @@ class TransactionStateManager(brokerId: Int,
     zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions)
   }
 
-  private def loadTransactionMetadata(topicPartition: TopicPartition) {
+  private def loadTransactionMetadata(topicPartition: TopicPartition, writeTxnMarkers: WriteTxnMarkers) {
     def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
     val startMs = time.milliseconds()
@@ -207,6 +209,16 @@ class TransactionStateManager(brokerId: Int,
             throw new KafkaException("Loading transaction topic partition failed.")
         }
 
+        // if state is PrepareCommit or PrepareAbort we need to complete the transaction
+        if (currentTxnMetadata.state == PrepareCommit || currentTxnMetadata.state == PrepareAbort) {
+          writeTxnMarkers(WriteTxnMarkerArgs(transactionalId,
+            txnMetadata.pid,
+            txnMetadata.producerEpoch,
+            txnMetadata.state,
+            txnMetadata,
+            coordinatorEpochFor(transactionalId).get
+          ))
+        }
       }
 
       removedTransactionalIds.foreach { transactionalId =>
@@ -229,7 +241,7 @@ class TransactionStateManager(brokerId: Int,
    * When this broker becomes a leader for a transaction log partition, load this partition and
    * populate the transaction metadata cache with the transactional ids.
    */
-  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int) {
+  def loadTransactionsForPartition(partition: Int, coordinatorEpoch: Int, writeTxnMarkers: WriteTxnMarkers) {
     validateTransactionTopicPartitionCountIsStable()
 
     val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partition)
@@ -242,7 +254,7 @@ class TransactionStateManager(brokerId: Int,
     def loadTransactions() {
       info(s"Loading transaction metadata from $topicPartition")
       try {
-        loadTransactionMetadata(topicPartition)
+        loadTransactionMetadata(topicPartition, writeTxnMarkers)
       } catch {
         case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
       } finally {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 1c491511fb8..29240a6fb27 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -17,7 +17,7 @@
 package kafka.coordinator.transaction
 
 import kafka.api.{LeaderAndIsr, PartitionStateInfo}
-import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, InterBrokerSendThread}
+import kafka.common.{BrokerEndPointNotAvailableException, InterBrokerSendThread}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.{MockTime, TestUtils}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 2edcb8f9bd7..94dc12bcc54 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -156,7 +156,7 @@ class TransactionStateManagerTest {
     assertFalse(transactionManager.isCoordinatorFor(txnId1))
     assertFalse(transactionManager.isCoordinatorFor(txnId2))
 
-    transactionManager.loadTransactionsForPartition(partitionId, 0)
+    transactionManager.loadTransactionsForPartition(partitionId, 0, _ => ())
 
     // let the time advance to trigger the background thread loading
     scheduler.tick()
@@ -293,7 +293,7 @@ class TransactionStateManagerTest {
     val coordinatorEpoch = 10
     EasyMock.expect(replicaManager.getLog(EasyMock.anyObject(classOf[TopicPartition]))).andReturn(None)
     EasyMock.replay(replicaManager)
-    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch)
+    transactionManager.loadTransactionsForPartition(partitionId, coordinatorEpoch, _ => ())
     val epoch = transactionManager.coordinatorEpochFor(txnId1).get
     assertEquals(coordinatorEpoch, epoch)
   }
@@ -303,6 +303,34 @@ class TransactionStateManagerTest {
     assertEquals(None, transactionManager.coordinatorEpochFor(txnId1))
   }
 
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedCommitState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareCommit)
+  }
+
+  @Test
+  def shouldWriteTxnMarkersForTransactionInPreparedAbortState(): Unit = {
+    verifyWritesTxnMarkersInPrepareState(PrepareAbort)
+  }
+
+  private def verifyWritesTxnMarkersInPrepareState(state: TransactionState): Unit = {
+    txnMetadata1.state = state
+    txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 0),
+      new TopicPartition("topic1", 1)))
+
+    txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1))
+    val startOffset = 0L
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords: _*)
+
+    prepareTxnLog(topicPartition, 0, records)
+
+    var receivedArgs: WriteTxnMarkerArgs = null
+    transactionManager.loadTransactionsForPartition(partitionId, 0, markerArgs => receivedArgs = markerArgs)
+    scheduler.tick()
+
+    assertEquals(txnId1, receivedArgs.transactionalId)
+  }
+
   private def assertCallback(error: Errors): Unit = {
     assertEquals(expectedError, error)
   }
@@ -351,4 +379,5 @@ class TransactionStateManagerTest {
 
     EasyMock.replay(replicaManager)
   }
+
 }
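
The core idea of this patch: instead of TransactionStateManager calling back into the coordinator directly, loadTransactionsForPartition now accepts a WriteTxnMarkers function (WriteTxnMarkerArgs => Unit). On immigration the coordinator supplies its private writeTxnMarkers, so any transaction recovered from the log in PrepareCommit or PrepareAbort is handed back for completion, while tests that do not care pass `_ => ()`. Below is a minimal, self-contained sketch of that callback-threading pattern; the types and names are simplified stand-ins for illustration, not the real Kafka classes.

    object WriteTxnMarkersSketch {

      sealed trait TxnState
      case object Ongoing extends TxnState
      case object PrepareCommit extends TxnState
      case object PrepareAbort extends TxnState

      // Simplified stand-in for WriteTxnMarkerArgs.
      final case class MarkerArgs(transactionalId: String, state: TxnState)

      // The state manager depends only on this function type, not on the coordinator.
      type WriteTxnMarkers = MarkerArgs => Unit

      // Stand-in for loadTransactionsForPartition: replay a (fake) transaction log
      // and hand every transaction stuck in a prepare state back to the caller.
      def loadTransactions(log: Seq[(String, TxnState)], writeTxnMarkers: WriteTxnMarkers): Unit =
        log.foreach { case (txnId, state) =>
          if (state == PrepareCommit || state == PrepareAbort)
            writeTxnMarkers(MarkerArgs(txnId, state))
        }

      def main(args: Array[String]): Unit = {
        val log = Seq("txn-1" -> Ongoing, "txn-2" -> PrepareCommit, "txn-3" -> PrepareAbort)
        // The coordinator supplies the real callback; a test can pass `_ => ()`.
        loadTransactions(log, m => println(s"completing ${m.transactionalId} (${m.state})"))
      }
    }

Passing the function in, rather than retaining a coordinator reference, keeps the dependency one-way and makes the new tests cheap to write: verifyWritesTxnMarkersInPrepareState simply captures the received args in a local variable and asserts on them.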
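
The other pattern worth noting is the "retry until success" append inside writeTxnMarkers: writeCommittedTransactionCallback re-submits itself to appendTransactionToLog on any retriable error, and stops only on success or on NOT_COORDINATOR (when the new coordinator owns completion). A compressed sketch of that control flow, again using hypothetical stand-ins rather than the real txnManager API:

    object RetryCallbackSketch {

      sealed trait Errors
      case object NONE extends Errors
      case object NOT_COORDINATOR extends Errors
      case object UNKNOWN extends Errors

      private var failuresLeft = 2

      // Stand-in for txnManager.appendTransactionToLog: simulate two failed
      // appends before a successful one, reporting the outcome via the callback.
      def appendToLog(callback: Errors => Unit): Unit = {
        val result = if (failuresLeft > 0) { failuresLeft -= 1; UNKNOWN } else NONE
        callback(result)
      }

      def main(args: Array[String]): Unit = {
        def writeCommittedTransactionCallback(error: Errors): Unit = error match {
          case NONE            => println("append succeeded; transaction completed")
          case NOT_COORDINATOR => println("new coordinator will complete it; stop retrying")
          case _               => appendToLog(writeCommittedTransactionCallback) // re-submit ourselves
        }
        appendToLog(writeCommittedTransactionCallback)
      }
    }

One caveat the sketch makes visible: the retry loop is unbounded, which is tolerable here only because NOT_COORDINATOR terminates it once partition leadership moves.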