@ -91,36 +91,30 @@ object TransactionMarkerChannelManager {
@@ -91,36 +91,30 @@ object TransactionMarkerChannelManager {
}
class TxnMarkerQueue ( @volatile private var destination : Node ) {
class TxnMarkerQueue ( @volatile var destination : Node ) {
// keep track of the requests per txn topic partition so we can easily clear the queue
// during partition emigration
private val markersPerTxnTopicPartition : concurrent.Map [ Int , BlockingQueue [ TxnIdAndMarkerEntry ] ]
= concurrent . TrieMap . empty [ Int , BlockingQueue [ TxnIdAndMarkerEntry ] ]
private val markersPerTxnTopicPartition = new ConcurrentHashMap [ Int , BlockingQueue [ TxnIdAndMarkerEntry ] ] ( ) . asScala
def removeMarkersForTxnTopicPartition ( partition : Int ) : Option [ BlockingQueue [ TxnIdAndMarkerEntry ] ] = {
markersPerTxnTopicPartition . remove ( partition )
}
def maybeUpdateNode ( node : Node ) : Unit = {
destination = node
}
def addMarkers ( txnTopicPartition : Int , txnIdAndMarker : TxnIdAndMarkerEntry ) : Unit = {
val queue = markersPerTxnTopicPartition . getOrElseUpdate ( txnTopicPartition , new LinkedBlockingQueue [ TxnIdAndMarkerEntry ] ( ) )
queue . add ( txnIdAndMarker )
}
def forEachTxnTopicPartition [ B ] ( f : ( Int , BlockingQueue [ TxnIdAndMarkerEntry ] ) => B ) : mutable.Iterable [ B ] =
markersPerTxnTopicPartition . filter { case ( _ , queue ) => ! queue . isEmpty }
. map { case ( partition : Int , queue : BlockingQueue [ TxnIdAndMarkerEntry ] ) => f ( partition , queue ) }
def node : Node = destination
def forEachTxnTopicPartition [ B ] ( f : ( Int , BlockingQueue [ TxnIdAndMarkerEntry ] ) => B ) : Unit =
markersPerTxnTopicPartition . foreach { case ( partition , queue ) =>
if ( ! queue . isEmpty ) f ( partition , queue )
}
def totalNumMarkers : Int = markersPerTxnTopicPartition . map { case ( _ , queue ) => queue . size ( ) } . sum
def totalNumMarkers : Int = markersPerTxnTopicPartition . values . foldLeft ( 0 ) { _ + _ . size }
// visible for testing
def totalNumMarkers ( txnTopicPartition : Int ) : Int = markersPerTxnTopicPartition . get ( txnTopicPartition ) . fold ( 0 ) ( _ . size ( ) )
def totalNumMarkers ( txnTopicPartition : Int ) : Int = markersPerTxnTopicPartition . get ( txnTopicPartition ) . fold ( 0 ) ( _ . size )
}
class TransactionMarkerChannelManager ( config : KafkaConfig ,
@ -148,16 +142,14 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
@@ -148,16 +142,14 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
"UnknownDestinationQueueSize" ,
new Gauge [ Int ] {
def value : Int = markersQueueForUnknownBroker . totalNumMarkers
} ,
Map ( "broker-id" -> config . brokerId . toString )
}
)
newGauge (
"CompleteTxn LogAppendQueueSize" ,
"LogAppendRetry QueueSize" ,
new Gauge [ Int ] {
def value : Int = txnLogAppendRetryQueue . size
} ,
Map ( "broker-id" -> config . brokerId . toString )
}
)
def start ( ) : Unit = {
@ -190,7 +182,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
@@ -190,7 +182,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
// we do not synchronize on the update of the broker node with the enqueuing ,
// since even if there is a race condition we will just retry
val brokerRequestQueue = markersQueuePerBroker . getOrElseUpdate ( brokerId , new TxnMarkerQueue ( broker ) )
brokerRequestQueue . maybeUpdateNode ( broker )
brokerRequestQueue . destination = broker
brokerRequestQueue . addMarkers ( txnTopicPartition , txnIdAndMarker )
trace ( s" Added marker ${ txnIdAndMarker . txnMarkerEntry } for transactional id ${ txnIdAndMarker . txnId } to destination broker $brokerId " )
@ -224,19 +216,17 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
@@ -224,19 +216,17 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
addTxnMarkersToBrokerQueue ( transactionalId , producerId , producerEpoch , txnResult , coordinatorEpoch , topicPartitions )
}
markersQueuePerBroker . map { case ( _ , brokerRequestQueue : TxnMarkerQueue ) =>
val txnIdAndMarkerEntries : java.util.List [ TxnIdAndMarkerEntry ] = new util . ArrayList [ TxnIdAndMarkerEntry ] ( )
markersQueuePerBroker . values . map { brokerRequestQueue =>
val txnIdAndMarkerEntries = new util . ArrayList [ TxnIdAndMarkerEntry ] ( )
brokerRequestQueue . forEachTxnTopicPartition { case ( _ , queue ) =>
queue . drainTo ( txnIdAndMarkerEntries )
}
( brokerRequestQueue . node , txnIdAndMarkerEntries )
( brokerRequestQueue . destination , txnIdAndMarkerEntries )
} . filter { case ( _ , entries ) => ! entries . isEmpty } . map { case ( node , entries ) =>
val markersToSend = entries . asScala . map ( _ . txnMarkerEntry ) . asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler ( node . id , txnStateManager , this , entries )
RequestAndCompletionHandler ( node , new WriteTxnMarkersRequest . Builder ( markersToSend ) , requestCompletionHandler )
}
. filter { case ( _ , entries ) => ! entries . isEmpty }
. map { case ( node , entries ) =>
val markersToSend : java.util.List [ TxnMarkerEntry ] = entries . asScala . map ( _ . txnMarkerEntry ) . asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler ( node . id , txnStateManager , this , entries )
RequestAndCompletionHandler ( node , new WriteTxnMarkersRequest . Builder ( markersToSend ) , requestCompletionHandler )
}
}
def addTxnMarkersToSend ( transactionalId : String ,