Browse Source

MINOR: remove DelayedOperations.checkAndCompleteFetch (#9278)

Reviewers: Jun Rao <junrao@gmail.com>
pull/9231/head
Chia-Ping Tsai 4 years ago committed by GitHub
parent
commit
6bba661e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/src/main/scala/kafka/cluster/Partition.scala
  2. 128
      core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

4
core/src/main/scala/kafka/cluster/Partition.scala

@ -99,10 +99,6 @@ class DelayedOperations(topicPartition: TopicPartition,
deleteRecords.checkAndComplete(requestKey) deleteRecords.checkAndComplete(requestKey)
} }
def checkAndCompleteFetch(): Unit = {
fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}
def numDelayedDelete: Int = deleteRecords.numDelayed def numDelayedDelete: Int = deleteRecords.numDelayed
} }

128
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

@ -17,9 +17,8 @@
package kafka.cluster package kafka.cluster
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.{Optional, Properties} import java.util.Optional
import java.util.concurrent.{CountDownLatch, Executors, Semaphore, TimeUnit, TimeoutException} import java.util.concurrent.{CountDownLatch, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean
import com.yammer.metrics.core.Metric import com.yammer.metrics.core.Metric
import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.api.{ApiVersion, LeaderAndIsr}
@ -29,24 +28,23 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.server._ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException} import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.record._ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest}
import org.junit.Test import org.apache.kafka.common.utils.SystemTime
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.junit.Assert._ import org.junit.Assert._
import org.mockito.Mockito._ import org.junit.Test
import org.scalatest.Assertions.assertThrows
import org.mockito.ArgumentMatchers import org.mockito.ArgumentMatchers
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock import org.mockito.invocation.InvocationOnMock
import org.scalatest.Assertions.assertThrows
import unit.kafka.cluster.AbstractPartitionTest import unit.kafka.cluster.AbstractPartitionTest
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
class PartitionTest extends AbstractPartitionTest { class PartitionTest extends AbstractPartitionTest {
@ -725,8 +723,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, brokerId + 1).asJava val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas val isr = replicas
doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
@ -922,102 +918,6 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds) assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicaIds)
} }
/**
* Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
* Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
* need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
* (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
* read lock of one Partition while holding on to read lock of another Partition.
*/
@Test
def testDelayedFetchAfterAppendRecords(): Unit = {
val controllerEpoch = 0
val leaderEpoch = 5
val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicaIds
val logConfig = LogConfig(new Properties)
val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, () => logConfig) }
val partitions = ListBuffer.empty[Partition]
logs.foreach { log =>
val tp = log.topicPartition
val delayedOperations: DelayedOperations = mock(classOf[DelayedOperations])
val partition = new Partition(tp,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
stateStore,
delayedOperations,
metadataCache,
logManager)
when(delayedOperations.checkAndCompleteFetch())
.thenAnswer((invocation: InvocationOnMock) => {
// Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
val anotherPartition = (tp.partition + 1) % topicPartitions.size
val partition = partitions(anotherPartition)
partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
})
partition.setLog(log, isFutureLog = false)
val leaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch)
.setLeader(brokerId)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
.setZkVersion(1)
.setReplicas(replicaIds)
.setIsNew(true)
partition.makeLeader(leaderState, offsetCheckpoints)
partitions += partition
}
def createRecords(baseOffset: Long): MemoryRecords = {
val records = List(
new SimpleRecord("k1".getBytes, "v1".getBytes),
new SimpleRecord("k2".getBytes, "v2".getBytes))
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
baseOffset, time.milliseconds, 0)
records.foreach(builder.append)
builder.build()
}
val done = new AtomicBoolean()
val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
try {
// Invoke some operation that acquires leaderIsrUpdate write lock on one thread
executor.submit((() => {
while (!done.get) {
partitions.foreach(_.maybeShrinkIsr())
}
}): Runnable)
// Append records to partitions, one partition-per-thread
val futures = partitions.map { partition =>
executor.submit((() => {
(1 to 10000).foreach { _ =>
partition.appendRecordsToLeader(createRecords(baseOffset = 0),
origin = AppendOrigin.Client,
requiredAcks = 0)
}
}): Runnable)
}
futures.foreach(_.get(15, TimeUnit.SECONDS))
done.set(true)
} catch {
case e: TimeoutException =>
val allThreads = TestUtils.allThreadStackTraces()
fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
} finally {
executor.shutdownNow()
executor.awaitTermination(5, TimeUnit.SECONDS)
}
}
def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = { def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder( val builder = MemoryRecords.builder(
@ -1079,8 +979,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = replicas val isr = replicas
doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
@ -1138,8 +1036,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId) val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId).asJava val isr = List[Integer](brokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
"Expected become leader transition to succeed", "Expected become leader transition to succeed",
@ -1200,8 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId).asJava val isr = List[Integer](brokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
@ -1251,8 +1145,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId) val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava val isr = List[Integer](brokerId, remoteBrokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
@ -1305,8 +1197,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId) val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava val isr = List[Integer](brokerId, remoteBrokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
@ -1374,8 +1264,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId) val replicas = List(brokerId, remoteBrokerId)
val isr = List[Integer](brokerId, remoteBrokerId).asJava val isr = List[Integer](brokerId, remoteBrokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
@ -1429,8 +1317,6 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List[Integer](brokerId, remoteBrokerId).asJava val replicas = List[Integer](brokerId, remoteBrokerId).asJava
val isr = List[Integer](brokerId, remoteBrokerId).asJava val isr = List[Integer](brokerId, remoteBrokerId).asJava
doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",

Loading…
Cancel
Save