
KAFKA-9101: Create a fetch.max.bytes configuration for the broker (#7595)

Create a fetch.max.bytes configuration for the broker as described by KIP-541.

Reviewers: Gwen Shapira <gwen@confluent.io>
Colin Patrick McCabe, 5 years ago, committed by GitHub
commit 1b08fceb7a
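KIP-541 adds a broker-side cap on fetch response sizes, complementing the consumer-side fetch.max.bytes setting. As a rough sketch of how the new property might be set, here is the same pattern the tests in this commit use to override it on an embedded broker (the property name resolves from the KafkaConfig.FetchMaxBytes constant added below, i.e. "fetch.max.bytes"; the 16 MiB value is purely illustrative):

    import java.util.Properties

    val brokerProps = new Properties()
    // Cap every fetch response this broker sends at 16 MiB (illustrative value;
    // the default added by this commit is 55 * 1024 * 1024 bytes).
    brokerProps.put(KafkaConfig.FetchMaxBytes, (16 * 1024 * 1024).toString)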
Changed files:
  1. core/src/main/scala/kafka/server/KafkaApis.scala (6 lines changed)
  2. core/src/main/scala/kafka/server/KafkaConfig.scala (15 lines changed)
  3. core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala (135 lines changed)
  4. core/src/test/scala/unit/kafka/server/FetchRequestTest.scala (4 lines changed)

core/src/main/scala/kafka/server/KafkaApis.scala (6 lines changed)

@@ -795,6 +795,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
+    val fetchMaxBytes = Math.min(fetchRequest.maxBytes, config.fetchMaxBytes)
+    val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
     if (interesting.isEmpty)
       processResponseCallback(Seq.empty)
     else {
@@ -802,8 +804,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       replicaManager.fetchMessages(
         fetchRequest.maxWait.toLong,
         fetchRequest.replicaId,
-        fetchRequest.minBytes,
-        fetchRequest.maxBytes,
+        fetchMinBytes,
+        fetchMaxBytes,
         versionId <= 2,
         interesting,
         replicationQuota(fetchRequest),
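The two added lines clamp both bounds of the fetch before it reaches ReplicaManager: the client's requested maxBytes is capped by the broker's fetch.max.bytes, and minBytes is then capped by the effective maximum, so a request can never wait for more data than it is allowed to receive. A self-contained sketch of that arithmetic (hypothetical standalone code, not the broker implementation itself):

    object FetchClampSketch {
      // Mirror of the clamping above: the broker config caps the client's request.
      def clamp(requestMinBytes: Int, requestMaxBytes: Int, brokerFetchMaxBytes: Int): (Int, Int) = {
        val fetchMaxBytes = Math.min(requestMaxBytes, brokerFetchMaxBytes)
        val fetchMinBytes = Math.min(requestMinBytes, fetchMaxBytes)
        (fetchMinBytes, fetchMaxBytes)
      }

      def main(args: Array[String]): Unit = {
        // A client asking for up to 100 MiB against the 55 MiB broker default:
        println(clamp(1, 100 * 1024 * 1024, 55 * 1024 * 1024)) // prints (1,57671680)
      }
    }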

core/src/main/scala/kafka/server/KafkaConfig.scala (15 lines changed)

@@ -182,8 +182,9 @@ object Defaults {
   val TransactionsAbortTimedOutTransactionsCleanupIntervalMS = TransactionStateManager.DefaultAbortTimedOutTransactionsIntervalMs
   val TransactionsRemoveExpiredTransactionsCleanupIntervalMS = TransactionStateManager.DefaultRemoveExpiredTransactionalIdsIntervalMs
 
-  /** ********* Fetch Session Configuration **************/
+  /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = 1000
+  val FetchMaxBytes = 55 * 1024 * 1024
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -405,8 +406,9 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionCleanupIntervalMsProp = "transaction.abort.timed.out.transaction.cleanup.interval.ms"
   val TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp = "transaction.remove.expired.transaction.cleanup.interval.ms"
 
-  /** ********* Fetch Session Configuration **************/
+  /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlots = "max.incremental.fetch.session.cache.slots"
+  val FetchMaxBytes = "fetch.max.bytes"
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
@@ -745,8 +747,9 @@ object KafkaConfig {
   val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
   val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
 
-  /** ********* Fetch Session Configuration **************/
+  /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
+  val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
 
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
@@ -1010,8 +1013,9 @@ object KafkaConfig {
       .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc)
       .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc)
 
-      /** ********* Fetch Session Configuration **************/
+      /** ********* Fetch Configuration **************/
       .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MaxIncrementalFetchSessionCacheSlots, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc)
+      .define(FetchMaxBytes, INT, Defaults.FetchMaxBytes, atLeast(1024), MEDIUM, FetchMaxBytesDoc)
 
       /** ********* Kafka Metrics Configuration ***********/
       .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
@@ -1358,8 +1362,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
   val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
 
-  /** ********* Fetch Session Configuration **************/
+  /** ********* Fetch Configuration **************/
   val maxIncrementalFetchSessionCacheSlots = getInt(KafkaConfig.MaxIncrementalFetchSessionCacheSlots)
+  val fetchMaxBytes = getInt(KafkaConfig.FetchMaxBytes)
 
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   def compressionType = getString(KafkaConfig.CompressionTypeProp)
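Because the definition above uses atLeast(1024), a broker configured with fetch.max.bytes below 1024 should fail config validation at startup rather than run with a uselessly small cap. A sketch of what that rejection might look like, assuming the minimal required properties of this era of Kafka (e.g. zookeeper.connect) are otherwise supplied:

    import java.util.Properties
    import org.apache.kafka.common.config.ConfigException

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // minimal required broker setting here
    props.put(KafkaConfig.FetchMaxBytes, "512")      // below the 1024-byte floor

    try {
      KafkaConfig.fromProps(props)
    } catch {
      // Expect the atLeast(1024) validator to reject the value.
      case e: ConfigException => println(s"rejected as expected: ${e.getMessage}")
    }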

core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala (135 lines changed, new file)

@@ -0,0 +1,135 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server

import java.util.{Optional, Properties}

import kafka.log.LogConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.junit.{Assert, Test}

import scala.collection.JavaConverters._

/**
 * This test verifies that the KIP-541 broker-level FetchMaxBytes configuration is honored.
 */
class FetchRequestMaxBytesTest extends BaseRequestTest {
  override def brokerCount: Int = 1

  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
  private val testTopic = "testTopic"
  private val testTopicPartition = new TopicPartition(testTopic, 0)
  private val messages = IndexedSeq(
    multiByteArray(1),
    multiByteArray(500),
    multiByteArray(1040),
    multiByteArray(500),
    multiByteArray(50))

  private def multiByteArray(length: Int): Array[Byte] = {
    val array = new Array[Byte](length)
    array.indices.foreach(i => array(i) = (i % 5).toByte)
    array
  }

  private def oneByteArray(value: Byte): Array[Byte] = {
    val array = new Array[Byte](1)
    array(0) = value
    array
  }

  override def setUp(): Unit = {
    super.setUp()
    producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers))
  }

  override def tearDown(): Unit = {
    if (producer != null)
      producer.close()
    super.tearDown()
  }

  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
    super.brokerPropertyOverrides(properties)
    properties.put(KafkaConfig.FetchMaxBytes, "1024")
  }

  private def createTopics(): Unit = {
    val topicConfig = new Properties
    topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString)
    createTopic(testTopic,
                numPartitions = 1,
                replicationFactor = 1,
                topicConfig = topicConfig)
    // Produce several messages as single batches.
    messages.indices.foreach(i => {
      val record = new ProducerRecord(testTopic, 0, oneByteArray(i.toByte), messages(i))
      val future = producer.send(record)
      producer.flush()
      future.get()
    })
  }

  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = {
    val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
    FetchResponse.parse(response, request.version)
  }

  /**
   * Tests that each of our fetch requests respects FetchMaxBytes.
   *
   * Note that when a single batch is larger than FetchMaxBytes, it will be
   * returned in full even if this is larger than FetchMaxBytes. See KIP-74.
   */
  @Test
  def testConsumeMultipleRecords(): Unit = {
    createTopics()

    expectNextRecords(IndexedSeq(messages(0), messages(1)), 0)
    expectNextRecords(IndexedSeq(messages(2)), 2)
    expectNextRecords(IndexedSeq(messages(3), messages(4)), 3)
  }

  private def expectNextRecords(expected: IndexedSeq[Array[Byte]],
                                fetchOffset: Long): Unit = {
    val response = sendFetchRequest(0,
      FetchRequest.Builder.forConsumer(Int.MaxValue, 0,
        Map(testTopicPartition ->
          new PartitionData(fetchOffset, 0, Integer.MAX_VALUE, Optional.empty())).asJava).build(3))
    val records = response.responseData().get(testTopicPartition).records.records()
    Assert.assertNotNull(records)
    val recordsList = records.asScala.toList
    Assert.assertEquals(expected.size, recordsList.size)
    recordsList.zipWithIndex.foreach {
      case (record, i) => {
        val buffer = record.value().duplicate()
        val array = new Array[Byte](buffer.remaining())
        buffer.get(array)
        Assert.assertArrayEquals(s"expectNextRecords unexpected element ${i}",
          expected(i), array)
      }
    }
  }
}
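Worth noting in the test above: messages(2) is 1040 bytes, larger than the 1024-byte fetch.max.bytes this test configures, yet the fetch at offset 2 expects it back whole and alone. That is exactly the KIP-74 behavior the doc comment describes: a single batch larger than the cap is still returned in full, so consumers can always make progress.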

core/src/test/scala/unit/kafka/server/FetchRequestTest.scala (4 lines changed)

@@ -45,6 +45,10 @@ class FetchRequestTest extends BaseRequestTest {
   private var producer: KafkaProducer[String, String] = null
 
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.FetchMaxBytes, Int.MaxValue.toString)
+  }
+
   override def tearDown(): Unit = {
     if (producer != null)
       producer.close()
