diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index fc0852f7874..f98133c5a30 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -99,10 +99,6 @@ class DelayedOperations(topicPartition: TopicPartition,
     deleteRecords.checkAndComplete(requestKey)
   }
 
-  def checkAndCompleteFetch(): Unit = {
-    fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
-  }
-
   def numDelayedDelete: Int = deleteRecords.numDelayed
 }
 
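The hunk above deletes `DelayedOperations.checkAndCompleteFetch()`, the hook `Partition` used to complete delayed fetch operations from inside its own locking scope. The diff does not show where completion happens afterwards, but the removal implies the pattern sketched below: collect the follow-up work while holding the lock and run it only after the lock is released. All names here (`PartitionLike`, `appendAsLeader`) are hypothetical stand-ins, not the actual Kafka call path.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical sketch: instead of invoking delayed-operation completion
// while a partition lock is held, the append path hands back a deferred
// action that the caller runs after the lock is released.
class PartitionLike {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  // Appends under the read lock, but defers delayed-fetch completion.
  def appendAsLeader(records: Seq[String]): () => Unit = {
    val readLock = leaderIsrUpdateLock.readLock()
    readLock.lock()
    try {
      // ... write records to the local log while holding the read lock ...
      () => println(s"completing delayed fetches after ${records.size} records")
    } finally {
      readLock.unlock()
    }
  }
}

object AppendCaller extends App {
  val partition = new PartitionLike
  val completeDelayedFetches = partition.appendAsLeader(Seq("k1=v1", "k2=v2"))
  // Runs with no partition lock held, so it can safely take another
  // partition's read lock without forming the lock cycle that the
  // deleted test below describes.
  completeDelayedFetches()
}
```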
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index dc73b4db2f6..00ac2c1dcd7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -17,9 +17,8 @@ package kafka.cluster
 
 import java.nio.ByteBuffer
-import java.util.{Optional, Properties}
-import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException}
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.Optional
+import java.util.concurrent.{CountDownLatch, Semaphore}
 
 import com.yammer.metrics.core.Metric
 import kafka.api.{ApiVersion, LeaderAndIsr}
@@ -29,24 +28,23 @@ import kafka.metrics.KafkaYammerMetrics
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils._
-import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
-import org.apache.kafka.common.utils.SystemTime
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
-import org.junit.Test
+import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.junit.Assert._
-import org.mockito.Mockito._
-import org.scalatest.Assertions.assertThrows
+import org.junit.Test
 import org.mockito.ArgumentMatchers
+import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
+import org.scalatest.Assertions.assertThrows
 import unit.kafka.cluster.AbstractPartitionTest
 
 import scala.jdk.CollectionConverters._
-import scala.collection.mutable.ListBuffer
 
 class PartitionTest extends AbstractPartitionTest {
@@ -725,8 +723,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List[Integer](brokerId, brokerId + 1).asJava
     val isr = replicas
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed",
@@ -922,102 +918,6 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds)
   }
 
-  /**
-   * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
-   * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
-   * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
-   * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
-   * read lock of one Partition while holding on to read lock of another Partition.
-   */
-  @Test
-  def testDelayedFetchAfterAppendRecords(): Unit = {
-    val controllerEpoch = 0
-    val leaderEpoch = 5
-    val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
-    val isr = replicaIds
-    val logConfig = LogConfig(new Properties)
-
-    val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
-    val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, () => logConfig) }
-    val partitions = ListBuffer.empty[Partition]
-
-    logs.foreach { log =>
-      val tp = log.topicPartition
-      val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
-      val partition = new Partition(tp,
-        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
-        interBrokerProtocolVersion = ApiVersion.latestVersion,
-        localBrokerId = brokerId,
-        time,
-        stateStore,
-        delayedOperations,
-        metadataCache,
-        logManager)
-
-      when(delayedOperations.checkAndCompleteFetch())
-        .thenAnswer((invocation: InvocationOnMock) => {
-          // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
-          val anotherPartition = (tp.partition + 1) % topicPartitions.size
-          val partition = partitions(anotherPartition)
-          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
-        })
-
-      partition.setLog(log, isFutureLog = false)
-      val leaderState = new LeaderAndIsrPartitionState()
-        .setControllerEpoch(controllerEpoch)
-        .setLeader(brokerId)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(isr)
-        .setZkVersion(1)
-        .setReplicas(replicaIds)
-        .setIsNew(true)
-      partition.makeLeader(leaderState, offsetCheckpoints)
-      partitions += partition
-    }
-
-    def createRecords(baseOffset: Long): MemoryRecords = {
-      val records = List(
-        new SimpleRecord("k1".getBytes, "v1".getBytes),
-        new SimpleRecord("k2".getBytes, "v2".getBytes))
-      val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
-      val builder = MemoryRecords.builder(
-        buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
-        baseOffset, time.milliseconds, 0)
-      records.foreach(builder.append)
-      builder.build()
-    }
-
-    val done = new AtomicBoolean()
-    val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
-    try {
-      // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
-      executor.submit((() => {
-        while (!done.get) {
-          partitions.foreach(_.maybeShrinkIsr())
-        }
-      }): Runnable)
-      // Append records to partitions, one partition-per-thread
-      val futures = partitions.map { partition =>
-        executor.submit((() => {
-          (1 to 10000).foreach { _ =>
-            partition.appendRecordsToLeader(createRecords(baseOffset = 0),
-              origin = AppendOrigin.Client,
-              requiredAcks = 0)
-          }
-        }): Runnable)
-      }
-      futures.foreach(_.get(15, TimeUnit.SECONDS))
-      done.set(true)
-    } catch {
-      case e: TimeoutException =>
-        val allThreads = TestUtils.allThreadStackTraces()
-        fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
-    } finally {
-      executor.shutdownNow()
-      executor.awaitTermination(5, TimeUnit.SECONDS)
-    }
-  }
-
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(
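The deleted `testDelayedFetchAfterAppendRecords` was a stress test for the deadlock its ScalaDoc describes: a request handler completing a delayed fetch takes another partition's `leaderIsrUpdate` read lock while still holding its own, and writers queued for the write locks (e.g. an ISR shrink) then block both readers. With the in-lock `checkAndCompleteFetch()` path gone, that scenario is presumably no longer reachable from `Partition`, so the test goes with it. The demo below is a self-contained illustration of that lock cycle using plain `ReentrantReadWriteLock`s and hypothetical names, not Kafka code; it uses fair locks (so a queued writer reliably blocks later readers) and a timed `tryLock` so it reports the stall instead of actually hanging.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock

// Two handler threads each hold one "partition's" read lock and then need
// the other's. Once a writer queues up on each write lock, neither handler's
// second read acquisition can proceed: that is the deadlock the deleted test
// guarded against.
object DelayedFetchDeadlockDemo {
  def main(args: Array[String]): Unit = {
    val lockA = new ReentrantReadWriteLock(true) // fair mode: queued writers
    val lockB = new ReentrantReadWriteLock(true) // block later read attempts
    val readersIn = new CountDownLatch(2)

    def handler(first: ReentrantReadWriteLock,
                second: ReentrantReadWriteLock,
                name: String): Thread = new Thread(() => {
      first.readLock().lock() // like appendRecordsToLeader on one partition
      try {
        readersIn.countDown()
        readersIn.await()
        Thread.sleep(200) // let the writers enqueue behind the read locks
        // Like completing a delayed fetch that touches the *other* partition:
        if (second.readLock().tryLock(1, TimeUnit.SECONDS)) second.readLock().unlock()
        else println(s"$name blocked behind a queued writer: the real code would deadlock here")
      } finally first.readLock().unlock()
    }, name)

    def writer(lock: ReentrantReadWriteLock): Thread = new Thread(() => {
      lock.writeLock().lock() // like maybeShrinkIsr() taking the write lock
      lock.writeLock().unlock()
    })

    val h1 = handler(lockA, lockB, "handler-1")
    val h2 = handler(lockB, lockA, "handler-2")
    h1.start(); h2.start()
    readersIn.await()
    val writers = Seq(writer(lockA), writer(lockB))
    writers.foreach(_.start())
    (Seq(h1, h2) ++ writers).foreach(_.join())
  }
}
```

In the demo the timeout breaks the cycle; in the pre-change broker code nothing would, which is why completion had to happen after the append path released its lock.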
@@ -1079,8 +979,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List[Integer](brokerId, remoteBrokerId).asJava
     val isr = replicas
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     val initializeTimeMs = time.milliseconds()
@@ -1138,8 +1036,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
       "Expected become leader transition to succeed",
@@ -1200,8 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List[Integer](brokerId, remoteBrokerId).asJava
     val isr = List[Integer](brokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
 
     assertTrue("Expected become leader transition to succeed", partition.makeLeader(
@@ -1251,8 +1145,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
@@ -1305,8 +1197,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
@@ -1374,8 +1264,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List(brokerId, remoteBrokerId)
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue(
@@ -1429,8 +1317,6 @@ class PartitionTest extends AbstractPartitionTest {
     val replicas = List[Integer](brokerId, remoteBrokerId).asJava
     val isr = List[Integer](brokerId, remoteBrokerId).asJava
 
-    doNothing().when(delayedOperations).checkAndCompleteFetch()
-
     val initializeTimeMs = time.milliseconds()
     partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
     assertTrue("Expected become leader transition to succeed",
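The remaining hunks are mechanical: each test that stubbed the removed method drops its `doNothing().when(delayedOperations).checkAndCompleteFetch()` line, since Mockito cannot stub a method that no longer compiles. For reference, a minimal sketch of the stubbing pattern being deleted, with a hypothetical `FetchOps` trait standing in for the Kafka-internal `DelayedOperations`:

```scala
import org.mockito.Mockito._

// Hypothetical stand-in for the Kafka-internal DelayedOperations class.
trait FetchOps { def checkAndCompleteFetch(): Unit }

object StubRemovalExample extends App {
  val ops = mock(classOf[FetchOps])
  // The pattern each deleted test line used: an explicit no-op stub for a
  // Unit-returning method. Mockito mocks already do nothing for void
  // methods, so the stub mainly documented that the call was expected.
  doNothing().when(ops).checkAndCompleteFetch()
  ops.checkAndCompleteFetch() // no-op, recorded by the mock
  verify(ops).checkAndCompleteFetch()
}
```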