Browse Source

MINOR: Add system configuration to zk security exception messages (#7280)

This patch ensures that relevant system configurations are included in exception messages when zk security validation fails.

Reviewers: Vikas Singh <soondenana@users.noreply.github.com>,  José Armando García Sancio <jsancio@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
pull/7294/head
Jason Gustafson 5 years ago committed by GitHub
parent
commit
3b8d7a661c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
  2. 3
      core/src/main/scala/kafka/server/KafkaServer.scala

41
clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java

@ -16,12 +16,12 @@ @@ -16,12 +16,12 @@
*/
package org.apache.kafka.common.security;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.Configuration;
public final class JaasUtils {
private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
@ -31,27 +31,48 @@ public final class JaasUtils { @@ -31,27 +31,48 @@ public final class JaasUtils {
public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client";
public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
private static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client";
private static final String DEFAULT_ZK_SASL_CLIENT = "true";
private JaasUtils() {}
public static String zkSecuritySysConfigString() {
String loginConfig = System.getProperty(JAVA_LOGIN_CONFIG_PARAM);
String clientEnabled = System.getProperty(ZK_SASL_CLIENT, "default:" + DEFAULT_ZK_SASL_CLIENT);
String contextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "default:" + DEFAULT_ZK_LOGIN_CONTEXT_NAME);
return "[" +
JAVA_LOGIN_CONFIG_PARAM + "=" + loginConfig +
", " +
ZK_SASL_CLIENT + "=" + clientEnabled +
", " +
ZK_LOGIN_CONTEXT_NAME_KEY + "=" + contextName +
"]";
}
public static boolean isZkSecurityEnabled() {
boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");
boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, DEFAULT_ZK_SASL_CLIENT));
String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, DEFAULT_ZK_LOGIN_CONTEXT_NAME);
boolean isSecurityEnabled;
LOG.debug("Checking login config for Zookeeper JAAS context {}", zkSecuritySysConfigString());
boolean foundLoginConfigEntry;
try {
Configuration loginConf = Configuration.getConfiguration();
isSecurityEnabled = loginConf.getAppConfigurationEntry(zkLoginContextName) != null;
foundLoginConfigEntry = loginConf.getAppConfigurationEntry(zkLoginContextName) != null;
} catch (Exception e) {
throw new KafkaException("Exception while loading Zookeeper JAAS login context '" + zkLoginContextName + "'", e);
throw new KafkaException("Exception while loading Zookeeper JAAS login context " +
zkSecuritySysConfigString(), e);
}
if (isSecurityEnabled && !zkSaslEnabled) {
if (foundLoginConfigEntry && !zkSaslEnabled) {
LOG.error("JAAS configuration is present, but system property " +
ZK_SASL_CLIENT + " is set to false, which disables " +
"SASL in the ZooKeeper client");
throw new KafkaException("Exception while determining if ZooKeeper is secure");
throw new KafkaException("Exception while determining if ZooKeeper is secure " +
zkSecuritySysConfigString());
}
return isSecurityEnabled;
return foundLoginConfigEntry;
}
}

3
core/src/main/scala/kafka/server/KafkaServer.scala

@ -384,7 +384,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP @@ -384,7 +384,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled()
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the verification of the JAAS login file failed.")
throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the " +
s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")
// make sure chroot path exists
chrootOption.foreach { chroot =>

Loading…
Cancel
Save