
KAFKA-15626: Replace verification guard object with a specific type (#14568)

I've added a new class with an incrementing atomic long to represent the verification guard. Upon creation of a verification guard, we increment this value and assign it to the guard.

The expected behavior is the same as with the object guard, but with better debuggability thanks to the string value, plus type safety (I found a type-safety issue in the current code while implementing this).

Reviewers: Ismael Juma <ismael@juma.me.uk>, Artem Livshits <alivshits@confluent.io>
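
As a quick illustration of the semantics described above, here is a minimal, hedged sketch (not part of the commit) that exercises the new class directly; it assumes the kafka-storage module containing VerificationGuard (added below) is on the classpath, and the printed counter value assumes a fresh JVM.

import org.apache.kafka.storage.internals.log.VerificationGuard;

public class VerificationGuardDemo {
    public static void main(String[] args) {
        // Each construction draws the next value from a process-wide AtomicLong.
        VerificationGuard first = new VerificationGuard();
        VerificationGuard second = new VerificationGuard();

        // The captured value makes guards easy to log and compare while debugging.
        System.out.println(first); // e.g. VerificationGuard(value=1)

        // Only the exact guard that was handed out verifies successfully.
        System.out.println(first.verify(first));   // true
        System.out.println(first.verify(second));  // false

        // The sentinel replaces the old null default and never verifies, in either direction.
        System.out.println(first.verify(VerificationGuard.SENTINEL));                      // false
        System.out.println(VerificationGuard.SENTINEL.verify(first));                      // false
        System.out.println(VerificationGuard.SENTINEL.verify(VerificationGuard.SENTINEL)); // false
    }
}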
Justine Olshan committed via GitHub · commit e8c8969330
Files changed (changed-line counts in parentheses):
  1. core/src/main/scala/kafka/cluster/Partition.scala (9)
  2. core/src/main/scala/kafka/log/UnifiedLog.scala (45)
  3. core/src/main/scala/kafka/server/ReplicaManager.scala (15)
  4. core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala (4)
  5. core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (18)
  6. core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (37)
  7. core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala (53)
  8. core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (18)
  9. storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java (62)
  10. storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java (10)

core/src/main/scala/kafka/cluster/Partition.scala (9 changed lines)

@@ -45,7 +45,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection.{Map, Seq}
@@ -581,8 +581,9 @@ class Partition(val topicPartition: TopicPartition,
}
}
// Returns a verification guard object if we need to verify. This starts or continues the verification process. Otherwise return null.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = {
// Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the
// sentinel VerificationGuard.
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = {
leaderLogIfLocal match {
case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch)
case None => throw new NotLeaderOrFollowerException();
@@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition,
}
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>

core/src/main/scala/kafka/log/UnifiedLog.scala (45 changed lines)

@@ -40,7 +40,7 @@ import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, EpochEntry, FetchDataInfo, FetchIsolation, LastRecord, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, VerificationGuard}
import java.io.{File, IOException}
import java.nio.file.Files
@@ -599,31 +599,32 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
/**
* Maybe create and return the verification guard object for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return null.
* Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing.
* Creation starts the verification process. Otherwise return the sentinel VerificationGuard.
*/
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): Object = lock synchronized {
def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized {
if (hasOngoingTransaction(producerId))
null
VerificationGuard.SENTINEL
else
maybeCreateVerificationGuard(producerId, sequence, epoch)
}
/**
* Maybe create the VerificationStateEntry for the given producer ID -- always return the verification guard
* Maybe create the VerificationStateEntry for the given producer ID -- always return the VerificationGuard
*/
def maybeCreateVerificationGuard(producerId: Long,
sequence: Int,
epoch: Short): Object = lock synchronized {
epoch: Short): VerificationGuard = lock synchronized {
producerStateManager.maybeCreateVerificationStateEntry(producerId, sequence, epoch).verificationGuard
}
/**
* If an VerificationStateEntry is present for the given producer ID, return its verification guard, otherwise, return null.
* If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return the
* sentinel VerificationGuard.
*/
def verificationGuard(producerId: Long): Object = lock synchronized {
def verificationGuard(producerId: Long): VerificationGuard = lock synchronized {
val entry = producerStateManager.verificationStateEntry(producerId)
if (entry != null) entry.verificationGuard else null
if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL
}
/**
@@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
origin: AppendOrigin = AppendOrigin.CLIENT,
interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuard: Object = null): LogAppendInfo = {
verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER
append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false)
}
@@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets = false,
leaderEpoch = -1,
requestLocal = None,
verificationGuard = null,
verificationGuard = VerificationGuard.SENTINEL,
// disable to check the validation of record size since the record is already accepted by leader.
ignoreRecordSize = true)
}
@@ -763,7 +764,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
verificationGuard: Object,
verificationGuard: VerificationGuard,
ignoreRecordSize: Boolean): LogAppendInfo = {
// We want to ensure the partition metadata file is written to the log dir before any log data is written to disk.
// This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
@@ -1024,7 +1025,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
private def analyzeAndValidateProducerState(appendOffsetMetadata: LogOffsetMetadata,
records: MemoryRecords,
origin: AppendOrigin,
requestVerificationGuard: Object):
requestVerificationGuard: VerificationGuard):
(mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
@@ -1049,17 +1050,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// the partition. If we do not have an ongoing transaction or correct guard, return an error and do not append.
// There are two phases -- the first append to the log and subsequent appends.
//
// 1. First append: Verification starts with creating a verification guard object, sending a verification request to the transaction coordinator, and
// given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique verification guard for the transaction
// 1. First append: Verification starts with creating a VerificationGuard, sending a verification request to the transaction coordinator, and
// given a "verified" response, continuing the append path. (A non-verified response throws an error.) We create the unique VerificationGuard for the transaction
// to ensure there is no race between the transaction coordinator response and an abort marker getting written to the log. We need a unique guard because we could
// have a sequence of events where we start a transaction verification, have the transaction coordinator send a verified response, write an abort marker,
// start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique verification guard object, this sequence would not
// start a new transaction not aware of the partition, and receive the stale verification (ABA problem). With a unique VerificationGuard, this sequence would not
// result in appending to the log and would return an error. The guard is removed after the first append to the transaction and from then, we can rely on phase 2.
//
// 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a verification guard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}
@@ -1080,9 +1081,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
(updatedProducers, completedTxns.toList, None)
}
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: Object): Boolean = {
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
(requestVerificationGuard != verificationGuard(batch.producerId) || requestVerificationGuard == null)
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
}
/**
@@ -1991,7 +1992,7 @@ object UnifiedLog extends Logging {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
// Whether we wrote a control marker or a data batch, we can remove verification guard since either the transaction is complete or we have a first offset.
// Whether we wrote a control marker or a data batch, we can remove VerificationGuard since either the transaction is complete or we have a first offset.
if (batch.isTransactional)
producerStateManager.clearVerificationStateEntry(producerId)
completedTxn

core/src/main/scala/kafka/server/ReplicaManager.scala (15 changed lines)

@@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard}
import java.io.File
import java.nio.file.{Files, Paths}
@@ -735,7 +735,7 @@ class ReplicaManager(val config: KafkaConfig,
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
val verificationGuards: mutable.Map[TopicPartition, Object] = mutable.Map[TopicPartition, Object]()
val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
if (transactionalId == null || !config.transactionPartitionVerificationEnable)
(entriesPerPartition, Map.empty[TopicPartition, MemoryRecords], Map.empty[TopicPartition, Errors])
@@ -864,7 +864,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}
private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, Object],
private def partitionEntriesForVerification(verificationGuards: mutable.Map[TopicPartition, VerificationGuard],
entriesPerPartition: Map[TopicPartition, MemoryRecords],
verifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
unverifiedEntries: mutable.Map[TopicPartition, MemoryRecords],
@@ -877,10 +877,10 @@ class ReplicaManager(val config: KafkaConfig,
transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
if (transactionalBatches.nonEmpty) {
// We return verification guard if the partition needs to be verified. If no state is present, no need to verify.
// We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify.
val firstBatch = records.firstBatch
val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch)
if (verificationGuard != null) {
if (verificationGuard != VerificationGuard.SENTINEL) {
verificationGuards.put(topicPartition, verificationGuard)
unverifiedEntries.put(topicPartition, records)
} else
@@ -1156,7 +1156,7 @@ class ReplicaManager(val config: KafkaConfig,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short,
requestLocal: RequestLocal,
verificationGuards: Map[TopicPartition, Object]): Map[TopicPartition, LogAppendResult] = {
verificationGuards: Map[TopicPartition, VerificationGuard]): Map[TopicPartition, LogAppendResult] = {
val traceEnabled = isTraceEnabled
def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
@@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig,
} else {
try {
val partition = getPartitionOrException(topicPartition)
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null))
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal,
verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
val numAppendedMessages = info.numMessages
// update stats for successfully appended bytes and messages as bytesInRate and messageInRate

core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala (4 changed lines)

@@ -37,7 +37,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchIsolation, FetchParams, LogAppendInfo, LogConfig, LogDirFailureChannel, LogSegments, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
@@ -450,7 +450,7 @@ class PartitionLockTest extends Logging {
keepPartitionMetadataFile = true) {
override def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, origin: AppendOrigin,
interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: Object): LogAppendInfo = {
interBrokerProtocolVersion: MetadataVersion, requestLocal: RequestLocal, verificationGuard: VerificationGuard): LogAppendInfo = {
val appendInfo = super.appendAsLeader(records, leaderEpoch, origin, interBrokerProtocolVersion, requestLocal, verificationGuard)
appendSemaphore.acquire()
appendInfo

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala (18 changed lines)

@@ -56,7 +56,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, FetchIsolation, FetchParams, LogAppendInfo, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, VerificationGuard}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -3546,25 +3546,25 @@ class PartitionTest extends AbstractPartitionTest {
baseSequence = 3,
producerId = producerId)
// When verification guard is not there, we should not be able to append.
// When VerificationGuard is not there, we should not be able to append.
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching))
// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null verification object.
// Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard.
val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNotNull(verificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
// With the wrong verification guard, append should fail.
// With the wrong VerificationGuard, append should fail.
assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(),
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, Optional.of(new Object)))
origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, new VerificationGuard()))
// We should return the same verification object when we still need to verify. Append should proceed.
// We should return the same VerificationGuard when we still need to verify. Append should proceed.
val verificationGuard2 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertEquals(verificationGuard, verificationGuard2)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching, verificationGuard)
// We should no longer need a verification object. Future appends without verification guard will also succeed.
// We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed.
val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0)
assertNull(verificationGuard3)
assertEquals(VerificationGuard.SENTINEL, verificationGuard3)
partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)
}

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (37 changed lines)

@@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException}
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -3738,7 +3738,8 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
assertFalse(log.verificationGuard(producerId).verify(VerificationGuard.SENTINEL))
val idempotentRecords = MemoryRecords.withIdempotentRecords(
CompressionType.NONE,
@@ -3763,22 +3764,23 @@ class UnifiedLogTest {
)
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
assertNotNull(verificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
// Since we wrote idempotent records, we keep verification guard.
// Since we wrote idempotent records, we keep VerificationGuard.
assertEquals(verificationGuard, log.verificationGuard(producerId))
// Now write the transactional records
assertTrue(log.verificationGuard(producerId).verify(verificationGuard))
log.appendAsLeader(transactionalRecords, origin = appendOrigin, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
// Verification guard should be cleared now.
assertNull(log.verificationGuard(producerId))
// VerificationGuard should be cleared now.
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
// A subsequent maybeStartTransactionVerification will be empty since we are already verified.
assertNull(log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
assertEquals(VerificationGuard.SENTINEL, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch))
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
@@ -3788,15 +3790,16 @@ class UnifiedLogTest {
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
if (appendOrigin == AppendOrigin.CLIENT)
sequence = sequence + 1
// A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction.
val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
assertNotNull(newVerificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, newVerificationGuard)
assertNotEquals(verificationGuard, newVerificationGuard)
assertFalse(verificationGuard.verify(newVerificationGuard))
}
@Test
@@ -3809,7 +3812,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
assertNotNull(verificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
producerId,
@@ -3819,7 +3822,7 @@ class UnifiedLogTest {
log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
}
@Test
@@ -3832,7 +3835,7 @@ class UnifiedLogTest {
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
assertNotNull(verificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
producerStateManagerConfig.setTransactionVerificationEnabled(false)
@@ -3847,7 +3850,7 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
assertTrue(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
}
@Test
@@ -3872,14 +3875,14 @@ class UnifiedLogTest {
)
assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)
assertNotNull(verificationGuard)
assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
assertTrue(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
}
@Test
@@ -3892,7 +3895,7 @@ class UnifiedLogTest {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
assertFalse(log.hasOngoingTransaction(producerId))
assertNull(log.verificationGuard(producerId))
assertEquals(VerificationGuard.SENTINEL, log.verificationGuard(producerId))
val transactionalRecords = MemoryRecords.withTransactionalRecords(
CompressionType.NONE,

core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala (53 changed lines)

@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package unit.kafka.log
import org.apache.kafka.storage.internals.log.VerificationGuard
import org.apache.kafka.storage.internals.log.VerificationGuard.SENTINEL
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue}
import org.junit.jupiter.api.Test
class VerificationGuardTest {
@Test
def testEqualsAndHashCode(): Unit = {
val verificationGuard1 = new VerificationGuard
val verificationGuard2 = new VerificationGuard
assertNotEquals(verificationGuard1, verificationGuard2)
assertNotEquals(SENTINEL, verificationGuard1)
assertEquals(SENTINEL, SENTINEL)
assertNotEquals(verificationGuard1.hashCode, verificationGuard2.hashCode)
assertNotEquals(SENTINEL.hashCode, verificationGuard1.hashCode)
assertEquals(SENTINEL.hashCode, SENTINEL.hashCode)
}
@Test
def testVerify(): Unit = {
val verificationGuard1 = new VerificationGuard
val verificationGuard2 = new VerificationGuard
assertFalse(verificationGuard1.verify(verificationGuard2))
assertFalse(verificationGuard1.verify(SENTINEL))
assertFalse(SENTINEL.verify(verificationGuard1))
assertFalse(SENTINEL.verify(SENTINEL))
assertTrue(verificationGuard1.verify(verificationGuard1))
}
}

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (18 changed lines)

@@ -60,7 +60,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -2163,7 +2163,7 @@ class ReplicaManagerTest {
new SimpleRecord("message".getBytes))
appendRecords(replicaManager, tp0, idempotentRecords)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]())
assertNull(getVerificationGuard(replicaManager, tp0, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
// If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1,
@@ -2179,8 +2179,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(Seq(tp0)),
any[AddPartitionsToTxnManager.AppendCallback]()
)
assertNotNull(getVerificationGuard(replicaManager, tp0, producerId))
assertNull(getVerificationGuard(replicaManager, tp1, producerId))
assertNotEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp1, producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -2238,7 +2238,7 @@ class ReplicaManagerTest {
val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
callback2(Map.empty[TopicPartition, Errors].toMap)
assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -2297,7 +2297,7 @@ class ReplicaManagerTest {
)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
// Verification should succeed, but we expect to fail with OutOfOrderSequence and for the verification guard to remain.
// Verification should succeed, but we expect to fail with OutOfOrderSequence and for the VerificationGuard to remain.
val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue()
callback2(Map.empty[TopicPartition, Errors].toMap)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@@ -2424,7 +2424,7 @@ class ReplicaManagerTest {
val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
new SimpleRecord(s"message $sequence".getBytes))
appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId)
assertNull(getVerificationGuard(replicaManager, tp, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
// We should not add these partitions to the manager to verify.
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
@@ -2442,7 +2442,7 @@ class ReplicaManagerTest {
appendRecords(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any())
assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -2496,7 +2496,7 @@ class ReplicaManagerTest {
// This time we do not verify
appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any())
assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)

storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java (62 changed lines)

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.storage.internals.log;
import java.util.concurrent.atomic.AtomicLong;
public final class VerificationGuard {
// The sentinel VerificationGuard will be used as a default when no verification guard is provided.
// It can not be used to verify a transaction is ongoing and its value is always 0.
public static final VerificationGuard SENTINEL = new VerificationGuard(0);
private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L);
private final long value;
public VerificationGuard() {
value = INCREMENTING_ID.incrementAndGet();
}
private VerificationGuard(long value) {
this.value = value;
}
@Override
public String toString() {
return "VerificationGuard(value=" + value + ")";
}
@Override
public boolean equals(Object obj) {
if ((null == obj) || (obj.getClass() != this.getClass()))
return false;
VerificationGuard guard = (VerificationGuard) obj;
return value == guard.value();
}
@Override
public int hashCode() {
return Long.hashCode(value);
}
private long value() {
return value;
}
public boolean verify(VerificationGuard verifyingGuard) {
return verifyingGuard != SENTINEL && verifyingGuard.equals(this);
}
}
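
To connect this class back to the ABA scenario spelled out in the UnifiedLog.scala comments above, here is a hedged, self-contained sketch (a simplified model, not broker code; it only assumes the class defined in this file): once an abort marker clears the VerificationStateEntry, the log reports the sentinel guard, so a stale request still carrying the old guard fails verification instead of being appended.

import org.apache.kafka.storage.internals.log.VerificationGuard;

public class VerificationGuardAbaSketch {
    public static void main(String[] args) {
        // Phase 1: verification starts; a unique guard is created, stored with the
        // producer's VerificationStateEntry, and carried by the produce request.
        VerificationGuard stored = new VerificationGuard();
        VerificationGuard request = stored;

        // Normal path: the request presents the stored guard, so the append is allowed.
        System.out.println(stored.verify(request)); // true

        // ABA path: an abort marker clears the entry before the stale "verified" request
        // lands; with no entry present, the log reports the sentinel guard instead.
        stored = VerificationGuard.SENTINEL;

        // The stale request now fails verification and is rejected rather than being
        // appended as part of a hanging transaction.
        System.out.println(stored.verify(request)); // false
    }
}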

storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationStateEntry.java (10 changed lines)

@@ -18,10 +18,10 @@ package org.apache.kafka.storage.internals.log;
/**
* This class represents the verification state of a specific producer id.
* It contains a verification guard object that is used to uniquely identify the transaction we want to verify.
* It contains a VerificationGuard that is used to uniquely identify the transaction we want to verify.
* After verifying, we retain this object until we append to the log. This prevents any race conditions where the transaction
* may end via a control marker before we write to the log. This mechanism is used to prevent hanging transactions.
* We remove the verification guard object whenever we write data to the transaction or write an end marker for the transaction.
* We remove the VerificationGuard whenever we write data to the transaction or write an end marker for the transaction.
*
* We also store the lowest seen sequence to block a higher sequence from being written in the case of the lower sequence needing retries.
*
@@ -30,13 +30,13 @@ package org.apache.kafka.storage.internals.log;
public class VerificationStateEntry {
final private long timestamp;
final private Object verificationGuard;
final private VerificationGuard verificationGuard;
private int lowestSequence;
private short epoch;
public VerificationStateEntry(long timestamp, int sequence, short epoch) {
this.timestamp = timestamp;
this.verificationGuard = new Object();
this.verificationGuard = new VerificationGuard();
this.lowestSequence = sequence;
this.epoch = epoch;
}
@@ -45,7 +45,7 @@ public class VerificationStateEntry {
return timestamp;
}
public Object verificationGuard() {
public VerificationGuard verificationGuard() {
return verificationGuard;
}
