@ -16,7 +16,7 @@
@@ -16,7 +16,7 @@
*/
package kafka.server
import kafka.utils.TestUtils
import kafka.utils. { TestInfoUtils , TestUtils }
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.protocol. { ApiKeys , Errors }
@ -26,7 +26,8 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSeriali
@@ -26,7 +26,8 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSeriali
import org.apache.kafka.common. { IsolationLevel , TopicIdPartition , TopicPartition , Uuid }
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.io.DataInputStream
import java.util
@ -41,8 +42,9 @@ import scala.util.Random
@@ -41,8 +42,9 @@ import scala.util.Random
*/
class FetchRequestTest extends BaseFetchRequestTest {
@Test
def testBrokerRespectsPartitionsOrderAndSizeLimits ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testBrokerRespectsPartitionsOrderAndSizeLimits ( quorum : String ) : Unit = {
initProducer ( )
val messagesPerPartition = 9
@ -60,7 +62,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -60,7 +62,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val topicNames = topicIds . asScala . map ( _ . swap ) . asJava
produceData ( topicPartitions , messagesPerPartition )
val leaderId = serv ers. head . config . brokerId
val leaderId = brok ers. head . config . brokerId
val partitionsForLeader = topicPartitionToLeader . toVector . collect {
case ( tp , partitionLeaderId ) if partitionLeaderId == leaderId => tp
}
@ -143,8 +145,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -143,8 +145,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
evaluateResponse4 ( fetchResponse4V12 , 12 )
}
@Test
def testFetchRequestV4WithReadCommitted ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testFetchRequestV4WithReadCommitted ( quorum : String ) : Unit = {
initProducer ( )
val maxPartitionBytes = 200
val ( topicPartition , leaderId ) = createTopics ( numTopics = 1 , numPartitions = 1 ) . head
@ -161,18 +164,19 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -161,18 +164,19 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertTrue ( records ( partitionData ) . map ( _ . sizeInBytes ) . sum > 0 )
}
@Test
def testFetchRequestToNonReplica ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testFetchRequestToNonReplica ( quorum : String ) : Unit = {
val topic = "topic"
val partition = 0
val topicPartition = new TopicPartition ( topic , partition )
// Create a single - partition topic and find a broker which is not the leader
val partitionToLeader = TestUtils . createTopic ( zkClient , topic , numPartitions = 1 , 1 , servers )
val partitionToLeader = createTopic ( topic )
val topicIds = getTopicIds ( ) . asJava
val topicNames = topicIds . asScala . map ( _ . swap ) . asJava
val leader = partitionToLeader ( partition )
val nonReplicaOpt = serv ers. find ( _ . config . brokerId != leader )
val nonReplicaOpt = brok ers. find ( _ . config . brokerId != leader )
assertTrue ( nonReplicaOpt . isDefined )
val nonReplicaId = nonReplicaOpt . get . config . brokerId
@ -191,22 +195,24 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -191,22 +195,24 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals ( Errors . NOT_LEADER_OR_FOLLOWER . code , oldPartitionData . errorCode )
}
@Test
def testLastFetchedEpochValidation ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testLastFetchedEpochValidation ( quorum : String ) : Unit = {
checkLastFetchedEpochValidation ( ApiKeys . FETCH . latestVersion ( ) )
}
@Test
def testLastFetchedEpochValidationV12 ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testLastFetchedEpochValidationV12 ( quorum : String ) : Unit = {
checkLastFetchedEpochValidation ( 12 )
}
private def checkLastFetchedEpochValidation ( version : Short ) : Unit = {
val topic = "topic"
val topicPartition = new TopicPartition ( topic , 0 )
val partitionToLeader = TestUtils . createTopic ( zkClient , topic , numPartitions = 1 , replicationFactor = 3 , servers )
val partitionToLeader = createTopic ( topic , replicationFactor = 3 )
val firstLeaderId = partitionToLeader ( topicPartition . partition )
val firstLeaderEpoch = TestUtils . findLeaderEpoch ( firstLeaderId , topicPartition , serv ers)
val firstLeaderEpoch = TestUtils . findLeaderEpoch ( firstLeaderId , topicPartition , brok ers)
initProducer ( )
@ -216,8 +222,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -216,8 +222,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
// Force a leader change
killBroker ( firstLeaderId )
// Write some more data in epoch 1
val secondLeaderId = TestUtils . awaitLeaderChange ( serv ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , serv ers)
val secondLeaderId = TestUtils . awaitLeaderChange ( brok ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , brok ers)
val secondEpochResponses = produceData ( Seq ( topicPartition ) , 100 )
val secondEpochEndOffset = secondEpochResponses . lastOption . get . offset + 1
val topicIds = getTopicIds ( ) . asJava
@ -243,20 +249,22 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -243,20 +249,22 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals ( firstEpochEndOffset , divergingEpoch . endOffset )
}
@Test
def testCurrentEpochValidation ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testCurrentEpochValidation ( quorum : String ) : Unit = {
checkCurrentEpochValidation ( ApiKeys . FETCH . latestVersion ( ) )
}
@Test
def testCurrentEpochValidationV12 ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testCurrentEpochValidationV12 ( quorum : String ) : Unit = {
checkCurrentEpochValidation ( 12 )
}
private def checkCurrentEpochValidation ( version : Short ) : Unit = {
val topic = "topic"
val topicPartition = new TopicPartition ( topic , 0 )
val partitionToLeader = TestUtils . createTopic ( zkClient , topic , numPartitions = 1 , replicationFactor = 3 , servers )
val partitionToLeader = createTopic ( topic , replicationFactor = 3 )
val firstLeaderId = partitionToLeader ( topicPartition . partition )
def assertResponseErrorForEpoch ( error : Errors , brokerId : Int , leaderEpoch : Optional [ Integer ] ) : Unit = {
@ -276,46 +284,48 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -276,46 +284,48 @@ class FetchRequestTest extends BaseFetchRequestTest {
killBroker ( firstLeaderId )
// Check leader error codes
val secondLeaderId = TestUtils . awaitLeaderChange ( serv ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , serv ers)
val secondLeaderId = TestUtils . awaitLeaderChange ( brok ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , brok ers)
assertResponseErrorForEpoch ( Errors . NONE , secondLeaderId , Optional . empty ( ) )
assertResponseErrorForEpoch ( Errors . NONE , secondLeaderId , Optional . of ( secondLeaderEpoch ) )
assertResponseErrorForEpoch ( Errors . FENCED_LEADER_EPOCH , secondLeaderId , Optional . of ( secondLeaderEpoch - 1 ) )
assertResponseErrorForEpoch ( Errors . UNKNOWN_LEADER_EPOCH , secondLeaderId , Optional . of ( secondLeaderEpoch + 1 ) )
// Check follower error codes
val followerId = TestUtils . findFollowerId ( topicPartition , serv ers)
val followerId = TestUtils . findFollowerId ( topicPartition , brok ers)
assertResponseErrorForEpoch ( Errors . NONE , followerId , Optional . empty ( ) )
assertResponseErrorForEpoch ( Errors . NONE , followerId , Optional . of ( secondLeaderEpoch ) )
assertResponseErrorForEpoch ( Errors . UNKNOWN_LEADER_EPOCH , followerId , Optional . of ( secondLeaderEpoch + 1 ) )
assertResponseErrorForEpoch ( Errors . FENCED_LEADER_EPOCH , followerId , Optional . of ( secondLeaderEpoch - 1 ) )
}
@Test
def testEpochValidationWithinFetchSession ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testEpochValidationWithinFetchSession ( quorum : String ) : Unit = {
checkEpochValidationWithinFetchSession ( ApiKeys . FETCH . latestVersion ( ) )
}
@Test
def testEpochValidationWithinFetchSessionV12 ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testEpochValidationWithinFetchSessionV12 ( quorum : String ) : Unit = {
checkEpochValidationWithinFetchSession ( 12 )
}
private def checkEpochValidationWithinFetchSession ( version : Short ) : Unit = {
val topic = "topic"
val topicPartition = new TopicPartition ( topic , 0 )
val partitionToLeader = TestUtils . createTopic ( zkClient , topic , numPartitions = 1 , replicationFactor = 3 , servers )
val partitionToLeader = createTopic ( topic , replicationFactor = 3 )
val firstLeaderId = partitionToLeader ( topicPartition . partition )
// We need a leader change in order to check epoch fencing since the first epoch is 0 and
// - 1 is treated as having no epoch at all
killBroker ( firstLeaderId )
val secondLeaderId = TestUtils . awaitLeaderChange ( serv ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , serv ers)
val secondLeaderId = TestUtils . awaitLeaderChange ( brok ers, topicPartition , firstLeaderId )
val secondLeaderEpoch = TestUtils . findLeaderEpoch ( secondLeaderId , topicPartition , brok ers)
verifyFetchSessionErrors ( topicPartition , secondLeaderEpoch , secondLeaderId , version )
val followerId = TestUtils . findFollowerId ( topicPartition , serv ers)
val followerId = TestUtils . findFollowerId ( topicPartition , brok ers)
verifyFetchSessionErrors ( topicPartition , secondLeaderEpoch , followerId , version )
}
@ -357,8 +367,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -357,8 +367,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* in the server . The client closes its connection after reading partial data when the
* channel is muted in the server . If buffers are not released this will result in OOM .
*/
@Test
def testDownConversionWithConnectionFailure ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testDownConversionWithConnectionFailure ( quorum : String ) : Unit = {
val ( topicPartition , leaderId ) = createTopics ( numTopics = 1 , numPartitions = 1 ) . head
val topicIds = getTopicIds ( ) . asJava
val topicNames = topicIds . asScala . map ( _ . swap ) . asJava
@ -424,8 +435,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -424,8 +435,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* record batch to multiple v0 / v1 record batches with size 1. If the fetch offset points to inside the record batch ,
* some records have to be dropped during the conversion .
*/
@Test
def testDownConversionFromBatchedToUnbatchedRespectsOffset ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testDownConversionFromBatchedToUnbatchedRespectsOffset ( quorum : String ) : Unit = {
// Increase linger so that we have control over the batches created
producer = TestUtils . createProducer ( bootstrapServers ( ) ,
retries = 5 ,
@ -505,8 +517,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -505,8 +517,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* those partitions are returned in all incremental fetch requests .
* This tests using FetchRequests that don 't use topic IDs
*/
@Test
def testCreateIncrementalFetchWithPartitionsInErrorV12 ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testCreateIncrementalFetchWithPartitionsInErrorV12 ( quorum : String ) : Unit = {
def createConsumerFetchRequest ( topicPartitions : Seq [ TopicPartition ] ,
metadata : JFetchMetadata ,
toForget : Seq [ TopicIdPartition ] ) : FetchRequest =
@ -564,8 +577,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -564,8 +577,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
/* *
* Test that when a Fetch Request receives an unknown topic ID , it returns a top level error .
*/
@Test
def testFetchWithPartitionsWithIdError ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testFetchWithPartitionsWithIdError ( quorum : String ) : Unit = {
def createConsumerFetchRequest ( fetchData : util.LinkedHashMap [ TopicPartition , FetchRequest . PartitionData ] ,
metadata : JFetchMetadata ,
toForget : Seq [ TopicIdPartition ] ) : FetchRequest = {
@ -606,8 +620,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -606,8 +620,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals ( Errors . UNKNOWN_TOPIC_ID . code , responseData1 . get ( bar0 ) . errorCode )
}
@Test
def testZStdCompressedTopic ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testZStdCompressedTopic ( quorum : String ) : Unit = {
// ZSTD compressed topic
val topicConfig = Map ( TopicConfig . COMPRESSION_TYPE_CONFIG -> BrokerCompressionType . ZSTD . name )
val ( topicPartition , leaderId ) = createTopics ( numTopics = 1 , numPartitions = 1 , configs = topicConfig ) . head
@ -653,8 +668,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
@@ -653,8 +668,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals ( 3 , records ( data2 ) . size )
}
@Test
def testZStdCompressedRecords ( ) : Unit = {
@ParameterizedTest ( name = TestInfoUtils . TestWithParameterizedQuorumName )
@ValueSource ( strings = Array ( "zk" , "kraft" ) )
def testZStdCompressedRecords ( quorum : String ) : Unit = {
// Producer compressed topic
val topicConfig = Map ( TopicConfig . COMPRESSION_TYPE_CONFIG -> BrokerCompressionType . PRODUCER . name )
val ( topicPartition , leaderId ) = createTopics ( numTopics = 1 , numPartitions = 1 , configs = topicConfig ) . head