diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java index c2e7fbdc74a..593f104e300 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.common.security; -import javax.security.auth.login.Configuration; - import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.login.Configuration; + public final class JaasUtils { private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class); public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config"; @@ -31,27 +31,48 @@ public final class JaasUtils { public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client"; public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig"; + private static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client"; + private static final String DEFAULT_ZK_SASL_CLIENT = "true"; + private JaasUtils() {} + public static String zkSecuritySysConfigString() { + String loginConfig = System.getProperty(JAVA_LOGIN_CONFIG_PARAM); + String clientEnabled = System.getProperty(ZK_SASL_CLIENT, "default:" + DEFAULT_ZK_SASL_CLIENT); + String contextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "default:" + DEFAULT_ZK_LOGIN_CONTEXT_NAME); + return "[" + + JAVA_LOGIN_CONFIG_PARAM + "=" + loginConfig + + ", " + + ZK_SASL_CLIENT + "=" + clientEnabled + + ", " + + ZK_LOGIN_CONTEXT_NAME_KEY + "=" + contextName + + "]"; + } + public static boolean isZkSecurityEnabled() { - boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true")); - String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client"); + boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, DEFAULT_ZK_SASL_CLIENT)); + String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, DEFAULT_ZK_LOGIN_CONTEXT_NAME); - boolean isSecurityEnabled; + LOG.debug("Checking login config for Zookeeper JAAS context {}", zkSecuritySysConfigString()); + + boolean foundLoginConfigEntry; try { Configuration loginConf = Configuration.getConfiguration(); - isSecurityEnabled = loginConf.getAppConfigurationEntry(zkLoginContextName) != null; + foundLoginConfigEntry = loginConf.getAppConfigurationEntry(zkLoginContextName) != null; } catch (Exception e) { - throw new KafkaException("Exception while loading Zookeeper JAAS login context '" + zkLoginContextName + "'", e); + throw new KafkaException("Exception while loading Zookeeper JAAS login context " + + zkSecuritySysConfigString(), e); } - if (isSecurityEnabled && !zkSaslEnabled) { + + if (foundLoginConfigEntry && !zkSaslEnabled) { LOG.error("JAAS configuration is present, but system property " + ZK_SASL_CLIENT + " is set to false, which disables " + "SASL in the ZooKeeper client"); - throw new KafkaException("Exception while determining if ZooKeeper is secure"); + throw new KafkaException("Exception while determining if ZooKeeper is secure " + + zkSecuritySysConfigString()); } - return isSecurityEnabled; + return foundLoginConfigEntry; } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 353b66a32d1..10677150aa4 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -384,7 +384,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled() if (secureAclsEnabled && !isZkSecurityEnabled) - throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the verification of the JAAS login file failed.") + throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the " + + s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}") // make sure chroot path exists chrootOption.foreach { chroot =>