Browse Source

KAFKA-15566: Fix flaky tests in FetchRequestTest.scala in KRaft mode (#14573)

Fixed some of the failing tests in FetchRequestTest.

testFetchWithPartitionsWithIdError and testCreateIncrementalFetchWithPartitionsInErrorV12 fail with the following error when enabled with KRaft mode. These tests only fail sometimes when running locally but consistently failed when running in the Jenkins Pipeline.

Tests will call the utility function TestUtils.waitUntilLeaderIsKnown after creating the topic partitions so that they wait for the logs to be created on the leader before sending fetch requests.

Enabled all tests except checkLastFetchedEpochValidation with KRaft mode.
Looking at the build history in Jenkins, all the other tests except these 2 tests and checkLastFetchedEpochValidation were passing when they were enabled with KRaft mode. Therefore enabled them with KRaft mode again but left checkLastFetchedEpochValidation to be investigated further.

Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>
pull/14015/merge
Gantigmaa Selenge 11 months ago committed by GitHub
parent
commit
486d5f6c64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      core/src/test/scala/unit/kafka/server/FetchRequestTest.scala

31
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala

@ -43,7 +43,7 @@ import scala.util.Random @@ -43,7 +43,7 @@ import scala.util.Random
class FetchRequestTest extends BaseFetchRequestTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testBrokerRespectsPartitionsOrderAndSizeLimits(quorum: String): Unit = {
initProducer()
@ -146,7 +146,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -146,7 +146,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testFetchRequestV4WithReadCommitted(quorum: String): Unit = {
initProducer()
val maxPartitionBytes = 200
@ -165,7 +165,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -165,7 +165,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testFetchRequestToNonReplica(quorum: String): Unit = {
val topic = "topic"
val partition = 0
@ -250,13 +250,13 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -250,13 +250,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testCurrentEpochValidation(quorum: String): Unit = {
checkCurrentEpochValidation(ApiKeys.FETCH.latestVersion())
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testCurrentEpochValidationV12(quorum: String): Unit = {
checkCurrentEpochValidation(12)
}
@ -300,13 +300,13 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -300,13 +300,13 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testEpochValidationWithinFetchSession(quorum: String): Unit = {
checkEpochValidationWithinFetchSession(ApiKeys.FETCH.latestVersion())
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testEpochValidationWithinFetchSessionV12(quorum: String): Unit = {
checkEpochValidationWithinFetchSession(12)
}
@ -368,7 +368,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -368,7 +368,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
* channel is muted in the server. If buffers are not released this will result in OOM.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testDownConversionWithConnectionFailure(quorum: String): Unit = {
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
val topicIds = getTopicIds().asJava
@ -436,7 +436,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -436,7 +436,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
* some records have to be dropped during the conversion.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testDownConversionFromBatchedToUnbatchedRespectsOffset(quorum: String): Unit = {
// Increase linger so that we have control over the batches created
producer = TestUtils.createProducer(bootstrapServers(),
@ -518,7 +518,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -518,7 +518,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
* This tests using FetchRequests that don't use topic IDs
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testCreateIncrementalFetchWithPartitionsInErrorV12(quorum: String): Unit = {
def createConsumerFetchRequest(topicPartitions: Seq[TopicPartition],
metadata: JFetchMetadata,
@ -533,6 +533,8 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -533,6 +533,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
// topicNames can be empty because we are using old requests
val topicNames = Map[Uuid, String]().asJava
createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
TestUtils.waitUntilLeaderIsKnown(brokers, foo0)
TestUtils.waitUntilLeaderIsKnown(brokers, foo1)
val bar0 = new TopicPartition("bar", 0)
val req1 = createConsumerFetchRequest(List(foo0, foo1, bar0), JFetchMetadata.INITIAL, Nil)
val resp1 = sendFetchRequest(0, req1)
@ -557,6 +559,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -557,6 +559,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertTrue(responseData2.containsKey(bar0))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, responseData2.get(bar0).errorCode)
createTopicWithAssignment("bar", Map(0 -> List(0, 1)))
TestUtils.waitUntilLeaderIsKnown(brokers, bar0)
val req3 = createConsumerFetchRequest(Nil, new JFetchMetadata(resp1.sessionId(), 2), Nil)
val resp3 = sendFetchRequest(0, req3)
assertEquals(Errors.NONE, resp3.error())
@ -578,7 +581,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -578,7 +581,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
* Test that when a Fetch Request receives an unknown topic ID, it returns a top level error.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testFetchWithPartitionsWithIdError(quorum: String): Unit = {
def createConsumerFetchRequest(fetchData: util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
metadata: JFetchMetadata,
@ -592,6 +595,8 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -592,6 +595,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
createTopicWithAssignment("foo", Map(0 -> List(0, 1), 1 -> List(0, 2)))
TestUtils.waitUntilLeaderIsKnown(brokers, foo0)
TestUtils.waitUntilLeaderIsKnown(brokers, foo1)
val topicIds = getTopicIds()
val topicIdsWithUnknown = topicIds ++ Map("bar" -> Uuid.randomUuid())
val bar0 = new TopicPartition("bar", 0)
@ -621,7 +626,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -621,7 +626,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testZStdCompressedTopic(quorum: String): Unit = {
// ZSTD compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.ZSTD.name)
@ -669,7 +674,7 @@ class FetchRequestTest extends BaseFetchRequestTest { @@ -669,7 +674,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk"))
@ValueSource(strings = Array("zk","kraft"))
def testZStdCompressedRecords(quorum: String): Unit = {
// Producer compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.PRODUCER.name)

Loading…
Cancel
Save