Browse Source

MINOR: Fix potential bug in LogConfig.getConfigValue and improve test coverage (#7159)

LogConfig.getConfigValue would throw a NoSuchElementException if any log
config was defined without a server default mapping.

Added a unit test for `getConfigValue` and a sanity test for
`toHtml`/`toRst`/`toEnrichedRst`, which were previously not exercised during
the test suite.

Reviewers: Jason Gustafson <jason@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>
pull/7161/head
Ismael Juma 5 years ago committed by GitHub
parent
commit
f70ece26d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  2. 18
      core/src/main/scala/kafka/log/LogConfig.scala
  3. 46
      core/src/test/scala/unit/kafka/log/LogConfigTest.scala

2
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -405,7 +405,7 @@ public class ProducerConfig extends AbstractConfig { @@ -405,7 +405,7 @@ public class ProducerConfig extends AbstractConfig {
}
public static ConfigDef configDef() {
return new ConfigDef(CONFIG);
return new ConfigDef(CONFIG);
}
public static void main(String[] args) {

18
core/src/main/scala/kafka/log/LogConfig.scala

@ -181,9 +181,17 @@ object LogConfig { @@ -181,9 +181,17 @@ object LogConfig {
"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " +
"all replicas for this topic."
private class LogConfigDef extends ConfigDef {
private[log] val ServerDefaultHeaderName = "Server Default Property"
// Package private for testing
private[log] class LogConfigDef(base: ConfigDef) extends ConfigDef(base) {
def this() = this(new ConfigDef)
private final val serverDefaultConfigNames = mutable.Map[String, String]()
base match {
case b: LogConfigDef => serverDefaultConfigNames ++= b.serverDefaultConfigNames
case _ =>
}
def define(name: String, defType: ConfigDef.Type, defaultValue: Any, validator: Validator,
importance: ConfigDef.Importance, doc: String, serverDefaultConfigName: String): LogConfigDef = {
@ -206,11 +214,12 @@ object LogConfig { @@ -206,11 +214,12 @@ object LogConfig {
this
}
override def headers = List("Name", "Description", "Type", "Default", "Valid Values", "Server Default Property", "Importance").asJava
override def headers = List("Name", "Description", "Type", "Default", "Valid Values", ServerDefaultHeaderName,
"Importance").asJava
override def getConfigValue(key: ConfigKey, headerName: String): String = {
headerName match {
case "Server Default Property" => serverDefaultConfigNames.get(key.name).get
case ServerDefaultHeaderName => serverDefaultConfigNames.getOrElse(key.name, null)
case _ => super.getConfigValue(key, headerName)
}
}
@ -218,6 +227,9 @@ object LogConfig { @@ -218,6 +227,9 @@ object LogConfig {
def serverConfigName(configName: String): Option[String] = serverDefaultConfigNames.get(configName)
}
// Package private for testing, return a copy since it's a mutable global variable
private[log] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
private val configDef: LogConfigDef = {
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._

46
core/src/test/scala/unit/kafka/log/LogConfigTest.scala

@ -19,9 +19,11 @@ package kafka.log @@ -19,9 +19,11 @@ package kafka.log
import java.util.Properties
import kafka.server.{ThrottledReplicaListValidator, KafkaConfig, KafkaServer}
import kafka.server.{KafkaConfig, KafkaServer, ThrottledReplicaListValidator}
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, TopicConfig}
import org.junit.{Assert, Test}
import org.junit.Assert._
import org.scalatest.Assertions._
@ -113,6 +115,46 @@ class LogConfigTest { @@ -113,6 +115,46 @@ class LogConfigTest {
assertFalse(isValid("100:0,10 : "))
}
/* Sanity check that toHtml produces one of the expected configs */
@Test
def testToHtml(): Unit = {
val html = LogConfig.configDefCopy.toHtmlTable
val expectedConfig = "<td>file.delete.delay.ms</td>"
assertTrue(s"Could not find `$expectedConfig` in:\n $html", html.contains(expectedConfig))
}
/* Sanity check that toEnrichedRst produces one of the expected configs */
@Test
def testToEnrichedRst(): Unit = {
val rst = LogConfig.configDefCopy.toEnrichedRst
val expectedConfig = "``file.delete.delay.ms``"
assertTrue(s"Could not find `$expectedConfig` in:\n $rst", rst.contains(expectedConfig))
}
/* Sanity check that toEnrichedRst produces one of the expected configs */
@Test
def testToRst(): Unit = {
val rst = LogConfig.configDefCopy.toRst
val expectedConfig = "``file.delete.delay.ms``"
assertTrue(s"Could not find `$expectedConfig` in:\n $rst", rst.contains(expectedConfig))
}
@Test
def testGetConfigValue(): Unit = {
// Add a config that doesn't set the `serverDefaultConfigName`
val configDef = LogConfig.configDefCopy
val configNameWithNoServerMapping = "log.foo"
configDef.define(configNameWithNoServerMapping, INT, 1, MEDIUM, s"$configNameWithNoServerMapping doc")
val deleteDelayKey = configDef.configKeys.get(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG)
val deleteDelayServerDefault = configDef.getConfigValue(deleteDelayKey, LogConfig.ServerDefaultHeaderName)
assertEquals(KafkaConfig.LogDeleteDelayMsProp, deleteDelayServerDefault)
val keyWithNoServerMapping = configDef.configKeys.get(configNameWithNoServerMapping)
val nullServerDefault = configDef.getConfigValue(keyWithNoServerMapping, LogConfig.ServerDefaultHeaderName)
assertNull(nullServerDefault)
}
private def isValid(configValue: String): Boolean = {
try {
ThrottledReplicaListValidator.ensureValidString("", configValue)

Loading…
Cancel
Save