diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index c81ce6c5e35..97760ba142f 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -23,7 +23,10 @@ import kafka.utils.Logging import kafka.utils.ZkUtils import scala.collection._ +import scala.collection.JavaConverters._ import kafka.admin.AdminUtils +import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.security.scram.ScramMechanism import org.apache.kafka.common.utils.Time /** @@ -142,7 +145,10 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, val fullSanitizedEntityName = entityPath.substring(index + 1) val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName) - logger.info(s"Processing override for entityPath: $entityPath with config: $entityConfig") + val loggableConfig = entityConfig.asScala.map { + case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v) + } + logger.info(s"Processing override for entityPath: $entityPath with config: $loggableConfig") configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig) } diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index 86db4074f94..0bc4e507137 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -21,6 +21,7 @@ import kafka.utils.JaasTestUtils import kafka.admin.ConfigCommand import kafka.utils.ZkUtils import scala.collection.JavaConverters._ +import org.junit.Before class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256" @@ -33,16 +34,23 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def configureSecurityBeforeServersStart() { super.configureSecurityBeforeServersStart() zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath) - - def configCommandArgs(username: String, password: String) : Array[String] = { - val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]") - Array("--zookeeper", zkConnect, - "--alter", "--add-config", credentials.mkString(","), - "--entity-type", "users", - "--entity-name", username) - } + // Create broker credentials before starting brokers ConfigCommand.main(configCommandArgs(kafkaPrincipal, kafkaPassword)) + } + + @Before + override def setUp() { + super.setUp() + // Create client credentials after starting brokers so that dynamic credential creation is also tested ConfigCommand.main(configCommandArgs(clientPrincipal, clientPassword)) ConfigCommand.main(configCommandArgs(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2)) } + + private def configCommandArgs(username: String, password: String) : Array[String] = { + val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]") + Array("--zookeeper", zkConnect, + "--alter", "--add-config", credentials.mkString(","), + "--entity-type", "users", + "--entity-name", username) + } }