|
|
@ -119,6 +119,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet |
|
|
|
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads |
|
|
|
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads |
|
|
|
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric |
|
|
|
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric |
|
|
|
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") |
|
|
|
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") |
|
|
|
|
|
|
|
props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) |
|
|
|
|
|
|
|
props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) |
|
|
|
|
|
|
|
|
|
|
|
props ++= sslProperties1 |
|
|
|
props ++= sslProperties1 |
|
|
|
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal)) |
|
|
|
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureInternal)) |
|
|
@ -159,9 +161,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
def testKeyStoreDescribeUsingAdminClient(): Unit = { |
|
|
|
def testConfigDescribeUsingAdminClient(): Unit = { |
|
|
|
|
|
|
|
|
|
|
|
def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = { |
|
|
|
def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean, |
|
|
|
|
|
|
|
expectedProps: Properties): Unit = { |
|
|
|
if (isSensitive) { |
|
|
|
if (isSensitive) { |
|
|
|
assertTrue(s"Value is sensitive: $configName", configEntry.isSensitive) |
|
|
|
assertTrue(s"Value is sensitive: $configName", configEntry.isSensitive) |
|
|
|
assertNull(s"Sensitive value returned for $configName", configEntry.value) |
|
|
|
assertNull(s"Sensitive value returned for $configName", configEntry.value) |
|
|
@ -169,6 +172,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet |
|
|
|
assertFalse(s"Config is not sensitive: $configName", configEntry.isSensitive) |
|
|
|
assertFalse(s"Config is not sensitive: $configName", configEntry.isSensitive) |
|
|
|
assertEquals(expectedProps.getProperty(configName), configEntry.value) |
|
|
|
assertEquals(expectedProps.getProperty(configName), configEntry.value) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
assertEquals(s"isReadOnly incorrect for $configName: $configEntry", isReadOnly, configEntry.isReadOnly) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
def verifySynonym(configName: String, synonym: ConfigSynonym, isSensitive: Boolean, |
|
|
|
def verifySynonym(configName: String, synonym: ConfigSynonym, isSensitive: Boolean, |
|
|
@ -203,7 +207,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet |
|
|
|
KEYSTORE_PROPS.asScala.foreach { configName => |
|
|
|
KEYSTORE_PROPS.asScala.foreach { configName => |
|
|
|
val desc = configEntry(configDesc, s"$prefix$configName") |
|
|
|
val desc = configEntry(configDesc, s"$prefix$configName") |
|
|
|
val isSensitive = configName.contains("password") |
|
|
|
val isSensitive = configName.contains("password") |
|
|
|
verifyConfig(configName, desc, isSensitive, if (prefix.isEmpty) invalidSslProperties else sslProperties1) |
|
|
|
verifyConfig(configName, desc, isSensitive, isReadOnly = prefix.nonEmpty, if (prefix.isEmpty) invalidSslProperties else sslProperties1) |
|
|
|
val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None |
|
|
|
val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None |
|
|
|
verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue) |
|
|
|
verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue) |
|
|
|
} |
|
|
|
} |
|
|
@ -215,6 +219,37 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet |
|
|
|
val configDesc = describeConfig(adminClient) |
|
|
|
val configDesc = describeConfig(adminClient) |
|
|
|
verifySslConfig("listener.name.external.", sslProperties1, configDesc) |
|
|
|
verifySslConfig("listener.name.external.", sslProperties1, configDesc) |
|
|
|
verifySslConfig("", invalidSslProperties, configDesc) |
|
|
|
verifySslConfig("", invalidSslProperties, configDesc) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Verify a few log configs with and without synonyms |
|
|
|
|
|
|
|
val expectedProps = new Properties |
|
|
|
|
|
|
|
expectedProps.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1680000000") |
|
|
|
|
|
|
|
expectedProps.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "168") |
|
|
|
|
|
|
|
expectedProps.setProperty(KafkaConfig.LogRollTimeHoursProp, "168") |
|
|
|
|
|
|
|
expectedProps.setProperty(KafkaConfig.LogCleanerThreadsProp, "1") |
|
|
|
|
|
|
|
val logRetentionMs = configEntry(configDesc, KafkaConfig.LogRetentionTimeMillisProp) |
|
|
|
|
|
|
|
verifyConfig(KafkaConfig.LogRetentionTimeMillisProp, logRetentionMs, |
|
|
|
|
|
|
|
isSensitive = false, isReadOnly = false, expectedProps) |
|
|
|
|
|
|
|
val logRetentionHours = configEntry(configDesc, KafkaConfig.LogRetentionTimeHoursProp) |
|
|
|
|
|
|
|
verifyConfig(KafkaConfig.LogRetentionTimeHoursProp, logRetentionHours, |
|
|
|
|
|
|
|
isSensitive = false, isReadOnly = true, expectedProps) |
|
|
|
|
|
|
|
val logRollHours = configEntry(configDesc, KafkaConfig.LogRollTimeHoursProp) |
|
|
|
|
|
|
|
verifyConfig(KafkaConfig.LogRollTimeHoursProp, logRollHours, |
|
|
|
|
|
|
|
isSensitive = false, isReadOnly = true, expectedProps) |
|
|
|
|
|
|
|
val logCleanerThreads = configEntry(configDesc, KafkaConfig.LogCleanerThreadsProp) |
|
|
|
|
|
|
|
verifyConfig(KafkaConfig.LogCleanerThreadsProp, logCleanerThreads, |
|
|
|
|
|
|
|
isSensitive = false, isReadOnly = false, expectedProps) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def synonymsList(configEntry: ConfigEntry): List[(String, ConfigSource)] = |
|
|
|
|
|
|
|
configEntry.synonyms.asScala.map(s => (s.name, s.source)).toList |
|
|
|
|
|
|
|
assertEquals(List((KafkaConfig.LogRetentionTimeMillisProp, ConfigSource.STATIC_BROKER_CONFIG), |
|
|
|
|
|
|
|
(KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.STATIC_BROKER_CONFIG), |
|
|
|
|
|
|
|
(KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), |
|
|
|
|
|
|
|
synonymsList(logRetentionMs)) |
|
|
|
|
|
|
|
assertEquals(List((KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.STATIC_BROKER_CONFIG), |
|
|
|
|
|
|
|
(KafkaConfig.LogRetentionTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), |
|
|
|
|
|
|
|
synonymsList(logRetentionHours)) |
|
|
|
|
|
|
|
assertEquals(List((KafkaConfig.LogRollTimeHoursProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logRollHours)) |
|
|
|
|
|
|
|
assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads)) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|