@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
val localProduceResults = appendToLocalLog ( internalTopicsAllowed = internalTopicsAllowed ,
origin , verifiedEntries , requiredAcks , requestLocal , verificationGuards . toMap )
debug ( "Produce to local log in %d ms" . format ( time . milliseconds - sTime ) )
val unverifiedResults = unverifiedEntries . map { case ( topicPartition , error ) =>
val message = if ( error == Errors . INVALID_TXN_STATE ) "Partition was not added to the transaction" else error . message ( )
topicPartition -> LogAppendResult (
LogAppendInfo . UNKNOWN_LOG_APPEND_INFO ,
Some ( error . exception ( message ) )
)
}
val errorResults = errorsPerPartition . map { case ( topicPartition , error ) =>
topicPartition -> LogAppendResult (
LogAppendInfo . UNKNOWN_LOG_APPEND_INFO ,
Some ( error . exception ( ) )
)
def produceStatusResult ( appendResult : Map [ TopicPartition , LogAppendResult ] ,
useCustomMessage : Boolean ) : Map [ TopicPartition , ProducePartitionStatus ] = {
appendResult . map { case ( topicPartition , result ) =>
topicPartition -> ProducePartitionStatus (
result . info . lastOffset + 1 , // required offset
new PartitionResponse (
result . error ,
result . info . firstOffset . map [ Long ] ( _ . messageOffset ) . orElse ( - 1L ) ,
result . info . lastOffset ,
result . info . logAppendTime ,
result . info . logStartOffset ,
result . info . recordErrors ,
if ( useCustomMessage ) result . exception . get . getMessage else result . info . errorMessage
)
) // response status
}
}
val allResults = localProduceResults ++ unverifiedResults ++ errorResults
val unverifiedResults = unverifiedEntries . map {
case ( topicPartition , error ) =>
val finalException =
error match {
case Errors . INVALID_TXN_STATE => error . exception ( "Partition was not added to the transaction" )
case Errors . CONCURRENT_TRANSACTIONS |
Errors . COORDINATOR_LOAD_IN_PROGRESS |
Errors . COORDINATOR_NOT_AVAILABLE |
Errors . NOT_COORDINATOR => new NotEnoughReplicasException (
s" Unable to verify the partition has been added to the transaction. Underlying error: ${ error . toString } " )
case _ => error . exception ( )
}
topicPartition -> LogAppendResult (
LogAppendInfo . UNKNOWN_LOG_APPEND_INFO ,
Some ( finalException )
)
}
val produceStatus = allResults . map { case ( topicPartition , result ) =>
topicPartition -> ProducePartitionStatus (
result . info . lastOffset + 1 , // required offset
new PartitionResponse (
result . error ,
result . info . firstOffset . map [ Long ] ( _ . messageOffset ) . orElse ( - 1L ) ,
result . info . lastOffset ,
result . info . logAppendTime ,
result . info . logStartOffset ,
result . info . recordErrors ,
result . info . errorMessage
val errorResults = errorsPerPartition . map {
case ( topicPartition , error ) =>
topicPartition -> LogAppendResult (
LogAppendInfo . UNKNOWN_LOG_APPEND_INFO ,
Some ( error . exception ( ) )
)
) // response status
}
val produceStatus = Set ( ( localProduceResults , false ) , ( unverifiedResults , true ) , ( errorResults , false ) ) . flatMap {
case ( results , useCustomError ) => produceStatusResult ( results , useCustomError )
} . toMap
val allResults = localProduceResults ++ unverifiedResults ++ errorResults
actionQueue . add {
( ) => allResults . foreach { case ( topicPartition , result ) =>
val requestKey = TopicPartitionOperationKey ( topicPartition )
@ -832,28 +849,30 @@ class ReplicaManager(val config: KafkaConfig,
@@ -832,28 +849,30 @@ class ReplicaManager(val config: KafkaConfig,
val ( error , node ) = getTransactionCoordinator ( transactionStatePartition . get )
if ( error != Errors . NONE ) {
throw error . exception ( ) // Can throw coordinator not available -- which is retriable
}
appendEntries ( entriesPerPartition ) ( notYetVerifiedEntriesPerPartition . map {
case ( tp , _ ) => ( tp , error )
} . toMap )
} else {
val topicGrouping = notYetVerifiedEntriesPerPartition . keySet . groupBy ( tp => tp . topic ( ) )
val topicCollection = new AddPartitionsToTxnTopicCollection ( )
topicGrouping . foreach { case ( topic , tps ) =>
topicCollection . add ( new AddPartitionsToTxnTopic ( )
. setName ( topic )
. setPartitions ( tps . map ( tp => Integer . valueOf ( tp . partition ( ) ) ) . toList . asJava ) )
}
val topicGrouping = notYetVerifiedEntriesPerPartition . keySet . groupBy ( tp => tp . topic ( ) )
val topicCollection = new AddPartitionsToTxnTopicCollection ( )
topicGrouping . foreach { case ( topic , tps ) =>
topicCollection . add ( new AddPartitionsToTxnTopic ( )
. setName ( topic )
. setPartitions ( tps . map ( tp => Integer . valueOf ( tp . partition ( ) ) ) . toList . asJava ) )
// Map not yet verified partitions to a request object .
// We verify above that all partitions use the same producer ID .
val batchInfo = notYetVerifiedEntriesPerPartition . head . _2 . firstBatch ( )
val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction ( )
. setTransactionalId ( transactionalId )
. setProducerId ( batchInfo . producerId ( ) )
. setProducerEpoch ( batchInfo . producerEpoch ( ) )
. setVerifyOnly ( true )
. setTopics ( topicCollection )
addPartitionsToTxnManager . foreach ( _ . addTxnData ( node , notYetVerifiedTransaction , KafkaRequestHandler . wrap ( appendEntries ( entriesPerPartition ) ( _ ) ) ) )
}
// Map not yet verified partitions to a request object .
// We verify above that all partitions use the same producer ID .
val batchInfo = notYetVerifiedEntriesPerPartition . head . _2 . firstBatch ( )
val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction ( )
. setTransactionalId ( transactionalId )
. setProducerId ( batchInfo . producerId ( ) )
. setProducerEpoch ( batchInfo . producerEpoch ( ) )
. setVerifyOnly ( true )
. setTopics ( topicCollection )
addPartitionsToTxnManager . foreach ( _ . addTxnData ( node , notYetVerifiedTransaction , KafkaRequestHandler . wrap ( appendEntries ( entriesPerPartition ) ( _ ) ) ) )
}
} else {
// If required . acks is outside accepted range , something is wrong with the client