diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 3d5451542e6..28eb72e1be6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -16,11 +16,11 @@ import java.io.Closeable; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.network.ChannelBuilders; -import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.config.ConfigException; @@ -73,14 +73,15 @@ public class ClientUtils { } /** - * @param configs client/server configs + * @param config client configs * @return configured ChannelBuilder based on the configs. */ - public static ChannelBuilder createChannelBuilder(Map configs) { - SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); + public static ChannelBuilder createChannelBuilder(AbstractConfig config) { + SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) throw new ConfigException("Invalid SecurityProtocol " + securityProtocol); - String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM); - return ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, configs, clientSaslMechanism, true); + String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM); + return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null, + clientSaslMechanism, true); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2936f0f8f03..e21c1965113 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -648,7 +648,7 @@ public class KafkaConsumer implements Consumer { List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); String metricGrpPrefix = "consumer"; - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); NetworkClient netClient = new NetworkClient( new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), this.metadata, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c604daa86a5..4415c64f1a2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -298,7 +298,7 @@ public class KafkaProducer implements Producer { List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), this.metadata, diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 096047f3957..84047e09d8e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -46,6 +46,8 @@ public class AbstractConfig { /* the parsed values */ private final Map values; + private final ConfigDef definition; + @SuppressWarnings("unchecked") public AbstractConfig(ConfigDef definition, Map originals, boolean doLog) { /* check that all the keys are really strings */ @@ -55,6 +57,7 @@ public class AbstractConfig { this.originals = (Map) originals; this.values = definition.parse(this.originals); this.used = Collections.synchronizedSet(new HashSet()); + this.definition = definition; if (doLog) logAll(); } @@ -63,12 +66,6 @@ public class AbstractConfig { this(definition, originals, true); } - public AbstractConfig(Map parsedConfig) { - this.values = parsedConfig; - this.originals = new HashMap<>(); - this.used = Collections.synchronizedSet(new HashSet()); - } - protected Object get(String key) { if (!values.containsKey(key)) throw new ConfigException(String.format("Unknown configuration '%s'", key)); @@ -152,7 +149,7 @@ public class AbstractConfig { * @return a Map containing the settings with the prefix */ public Map originalsWithPrefix(String prefix) { - Map result = new RecordingMap<>(prefix); + Map result = new RecordingMap<>(prefix, false); for (Map.Entry entry : originals.entrySet()) { if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) result.put(entry.getKey().substring(prefix.length()), entry.getValue()); @@ -160,6 +157,25 @@ public class AbstractConfig { return result; } + /** + * Put all keys that do not start with {@code prefix} and their parsed values in the result map and then + * put all the remaining keys with the prefix stripped and their parsed values in the result map. + * + * This is useful if one wants to allow prefixed configs to override default ones. + */ + public Map valuesWithPrefixOverride(String prefix) { + Map result = new RecordingMap<>(values(), prefix, true); + for (Map.Entry entry : originals.entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { + String keyWithNoPrefix = entry.getKey().substring(prefix.length()); + ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix); + if (configKey != null) + result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true)); + } + } + return result; + } + public Map values() { return new RecordingMap<>(values); } @@ -264,34 +280,40 @@ public class AbstractConfig { private class RecordingMap extends HashMap { private final String prefix; + private final boolean withIgnoreFallback; RecordingMap() { - this(""); + this("", false); } - RecordingMap(String prefix) { + RecordingMap(String prefix, boolean withIgnoreFallback) { this.prefix = prefix; + this.withIgnoreFallback = withIgnoreFallback; } RecordingMap(Map m) { - this(m, ""); + this(m, "", false); } - RecordingMap(Map m, String prefix) { + RecordingMap(Map m, String prefix, boolean withIgnoreFallback) { super(m); this.prefix = prefix; + this.withIgnoreFallback = withIgnoreFallback; } @Override public V get(Object key) { if (key instanceof String) { + String stringKey = (String) key; String keyWithPrefix; if (prefix.isEmpty()) { - keyWithPrefix = (String) key; + keyWithPrefix = stringKey; } else { - keyWithPrefix = prefix + key; + keyWithPrefix = prefix + stringKey; } ignore(keyWithPrefix); + if (withIgnoreFallback) + ignore(stringKey); } return super.get(key); } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 89feb9aba5e..169461428d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -426,26 +426,28 @@ public class ConfigDef { } // parse all known keys Map values = new HashMap<>(); - for (ConfigKey key : configKeys.values()) { - Object value; - // props map contains setting - assign ConfigKey value - if (props.containsKey(key.name)) { - value = parseType(key.name, props.get(key.name), key.type); - // props map doesn't contain setting, the key is required because no default value specified - its an error - } else if (key.defaultValue == NO_DEFAULT_VALUE) { - throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value."); - } else { - // otherwise assign setting its default value - value = key.defaultValue; - } - if (key.validator != null) { - key.validator.ensureValid(key.name, value); - } - values.put(key.name, value); - } + for (ConfigKey key : configKeys.values()) + values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name))); return values; } + Object parseValue(ConfigKey key, Object value, boolean isSet) { + Object parsedValue; + if (isSet) { + parsedValue = parseType(key.name, value, key.type); + // props map doesn't contain setting, the key is required because no default value specified - its an error + } else if (key.defaultValue == NO_DEFAULT_VALUE) { + throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value."); + } else { + // otherwise assign setting its default value + parsedValue = key.defaultValue; + } + if (key.validator != null) { + key.validator.ensureValid(key.name, parsedValue); + } + return parsedValue; + } + /** * Validate the current configuration values with the configuration definition. * @param props the current configuration values @@ -1166,4 +1168,4 @@ public class ConfigDef { }; } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 5a1486c1d64..02fb5e80d4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -13,8 +13,10 @@ package org.apache.kafka.common.network; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder; import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.authenticator.CredentialCache; @@ -28,8 +30,9 @@ public class ChannelBuilders { /** * @param securityProtocol the securityProtocol - * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise - * @param configs client configs + * @param contextType the contextType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise + * @param config client config + * @param listenerName the listenerName if contextType is SERVER or null otherwise * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise * @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL * inter-broker connections with inter-broker protocol version < 0.10 @@ -37,41 +40,52 @@ public class ChannelBuilders { * @throws IllegalArgumentException if `mode` invariants described above is not maintained */ public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol, - LoginType loginType, - Map configs, + JaasContext.Type contextType, + AbstractConfig config, + ListenerName listenerName, String clientSaslMechanism, boolean saslHandshakeRequestEnable) { if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) { - if (loginType == null) - throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); + if (contextType == null) + throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); if (clientSaslMechanism == null) throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`"); } - return create(securityProtocol, Mode.CLIENT, loginType, configs, clientSaslMechanism, saslHandshakeRequestEnable, null); + return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism, + saslHandshakeRequestEnable, null); } /** + * @param listenerName the listenerName * @param securityProtocol the securityProtocol - * @param configs server configs + * @param config server config * @param credentialCache Credential cache for SASL/SCRAM if SCRAM is enabled * @return the configured `ChannelBuilder` */ - public static ChannelBuilder serverChannelBuilder(SecurityProtocol securityProtocol, - Map configs, - CredentialCache credentialCache) { - return create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true, credentialCache); + public static ChannelBuilder serverChannelBuilder(ListenerName listenerName, + SecurityProtocol securityProtocol, + AbstractConfig config, + CredentialCache credentialCache) { + return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null, + true, credentialCache); } private static ChannelBuilder create(SecurityProtocol securityProtocol, - Mode mode, - LoginType loginType, - Map configs, - String clientSaslMechanism, - boolean saslHandshakeRequestEnable, - CredentialCache credentialCache) { - ChannelBuilder channelBuilder; + Mode mode, + JaasContext.Type contextType, + AbstractConfig config, + ListenerName listenerName, + String clientSaslMechanism, + boolean saslHandshakeRequestEnable, + CredentialCache credentialCache) { + Map configs; + if (listenerName == null) + configs = config.values(); + else + configs = config.valuesWithPrefixOverride(listenerName.configPrefix()); + ChannelBuilder channelBuilder; switch (securityProtocol) { case SSL: requireNonNullMode(mode, securityProtocol); @@ -80,7 +94,9 @@ public class ChannelBuilders { case SASL_SSL: case SASL_PLAINTEXT: requireNonNullMode(mode, securityProtocol); - channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); + JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs); + channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, + clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); break; case PLAINTEXT: case TRACE: diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java index b37651437ac..fad2ee27330 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java @@ -23,6 +23,8 @@ import java.util.Objects; public final class ListenerName { + private static final String CONFIG_STATIC_PREFIX = "listener.name"; + /** * Create an instance with the security protocol name as the value. */ @@ -65,4 +67,8 @@ public final class ListenerName { public String toString() { return "ListenerName(" + value + ")"; } + + public String configPrefix() { + return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + "."; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/LoginType.java b/clients/src/main/java/org/apache/kafka/common/network/LoginType.java deleted file mode 100644 index a3a2b279e6b..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/network/LoginType.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.network; - -import org.apache.kafka.common.security.JaasUtils; - -/** - * The type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and - * producer). It provides the login context name which defines the section of the JAAS configuration file to be used - * for login. - */ -public enum LoginType { - CLIENT(JaasUtils.LOGIN_CONTEXT_CLIENT), - SERVER(JaasUtils.LOGIN_CONTEXT_SERVER); - - private final String contextName; - - LoginType(String contextName) { - this.contextName = contextName; - } - - public String contextName() { - return contextName; - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Mode.java b/clients/src/main/java/org/apache/kafka/common/network/Mode.java index 4d8ef3b601a..59ef712da9b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Mode.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Mode.java @@ -16,4 +16,7 @@ */ package org.apache.kafka.common.network; +/** + * Connection mode for SSL and SASL connections. + */ public enum Mode { CLIENT, SERVER } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index b556f3818f1..060f83308f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -13,15 +13,15 @@ package org.apache.kafka.common.network; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.List; import java.util.Map; -import javax.security.auth.login.Configuration; - import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.authenticator.LoginManager; @@ -39,20 +39,20 @@ public class SaslChannelBuilder implements ChannelBuilder { private final SecurityProtocol securityProtocol; private final String clientSaslMechanism; private final Mode mode; - private final LoginType loginType; + private final JaasContext jaasContext; private final boolean handshakeRequestEnable; private final CredentialCache credentialCache; - private Configuration jaasConfig; private LoginManager loginManager; private SslFactory sslFactory; private Map configs; private KerberosShortNamer kerberosShortNamer; - public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol, - String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache) { + public SaslChannelBuilder(Mode mode, JaasContext jaasContext, SecurityProtocol securityProtocol, + String clientSaslMechanism, + boolean handshakeRequestEnable, CredentialCache credentialCache) { this.mode = mode; - this.loginType = loginType; + this.jaasContext = jaasContext; this.securityProtocol = securityProtocol; this.handshakeRequestEnable = handshakeRequestEnable; this.clientSaslMechanism = clientSaslMechanism; @@ -73,7 +73,7 @@ public class SaslChannelBuilder implements ChannelBuilder { if (hasKerberos) { String defaultRealm; try { - defaultRealm = JaasUtils.defaultKerberosRealm(); + defaultRealm = defaultKerberosRealm(); } catch (Exception ke) { defaultRealm = ""; } @@ -82,8 +82,7 @@ public class SaslChannelBuilder implements ChannelBuilder { if (principalToLocalRules != null) kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); } - this.jaasConfig = JaasUtils.jaasConfig(loginType, configs); - this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs, jaasConfig); + this.loginManager = LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs); if (this.securityProtocol == SecurityProtocol.SASL_SSL) { // Disable SSL client authentication as we are using SASL authentication @@ -101,8 +100,9 @@ public class SaslChannelBuilder implements ChannelBuilder { TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); Authenticator authenticator; if (mode == Mode.SERVER) - authenticator = new SaslServerAuthenticator(id, jaasConfig, loginManager.subject(), kerberosShortNamer, - socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize, credentialCache); + authenticator = new SaslServerAuthenticator(id, jaasContext, loginManager.subject(), + kerberosShortNamer, socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize, + credentialCache); else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable); @@ -128,4 +128,26 @@ public class SaslChannelBuilder implements ChannelBuilder { return new PlaintextTransportLayer(key); } } + + private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException, + IllegalArgumentException, IllegalAccessException, InvocationTargetException { + + //TODO Find a way to avoid using these proprietary classes as access to Java 9 will block access by default + //due to the Jigsaw module system + + Object kerbConf; + Class classRef; + Method getInstanceMethod; + Method getDefaultRealmMethod; + if (System.getProperty("java.vendor").contains("IBM")) { + classRef = Class.forName("com.ibm.security.krb5.internal.Config"); + } else { + classRef = Class.forName("sun.security.krb5.Config"); + } + getInstanceMethod = classRef.getMethod("getInstance", new Class[0]); + kerbConf = getInstanceMethod.invoke(classRef, new Object[0]); + getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm", + new Class[0]); + return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java b/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java index 2128c6136a1..d900be29780 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java @@ -32,7 +32,6 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.LoginType; /** * JAAS configuration parser that constructs a JAAS configuration object with a single @@ -51,7 +50,7 @@ class JaasConfig extends Configuration { private final String loginContextName; private final List configEntries; - public JaasConfig(LoginType loginType, String jaasConfigParams) { + public JaasConfig(String loginContextName, String jaasConfigParams) { StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(jaasConfigParams)); tokenizer.slashSlashComments(true); tokenizer.slashStarComments(true); @@ -67,7 +66,7 @@ class JaasConfig extends Configuration { if (configEntries.isEmpty()) throw new IllegalArgumentException("Login module not specified in JAAS config"); - this.loginContextName = loginType.contextName(); + this.loginContextName = loginContextName; } catch (IOException e) { throw new KafkaException("Unexpected exception while parsing JAAS config"); @@ -120,4 +119,4 @@ class JaasConfig extends Configuration { throw new IllegalArgumentException("JAAS config entry not terminated by semi-colon"); return new AppConfigurationEntry(loginModule, controlFlag, options); } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java new file mode 100644 index 00000000000..6abeef62b57 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.ListenerName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class JaasContext { + + private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class); + + private static final String GLOBAL_CONTEXT_NAME_SERVER = "KafkaServer"; + private static final String GLOBAL_CONTEXT_NAME_CLIENT = "KafkaClient"; + + /** + * Returns an instance of this class. + * + * For contextType SERVER, the context will contain the default Configuration and the context name will be one of: + * + * 1. Lowercased listener name followed by a period and the string `KafkaServer` + * 2. The string `KafkaServer` + * + * If both are valid entries in the JAAS configuration, the first option is chosen. + * + * For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified, + * the configuration object is created by parsing the property value. Otherwise, the default Configuration + * is returned. The context name is always `KafkaClient`. + * + * @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if + * listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT. + */ + public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName, + Map configs) { + String listenerContextName; + String globalContextName; + switch (contextType) { + case CLIENT: + if (listenerName != null) + throw new IllegalArgumentException("listenerName should be null for CLIENT"); + globalContextName = GLOBAL_CONTEXT_NAME_CLIENT; + listenerContextName = null; + break; + case SERVER: + if (listenerName == null) + throw new IllegalArgumentException("listenerName should not be null for SERVER"); + globalContextName = GLOBAL_CONTEXT_NAME_SERVER; + listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER; + break; + default: + throw new IllegalArgumentException("Unexpected context type " + contextType); + } + return load(contextType, listenerContextName, globalContextName, configs); + } + + static JaasContext load(JaasContext.Type contextType, String listenerContextName, + String globalContextName, Map configs) { + Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); + if (jaasConfigArgs != null) { + if (contextType == JaasContext.Type.SERVER) + throw new IllegalArgumentException("JAAS config property not supported for server"); + else { + JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value()); + AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName); + int numModules = clientModules == null ? 0 : clientModules.length; + if (numModules != 1) + throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module"); + return new JaasContext(globalContextName, contextType, jaasConfig); + } + } else + return defaultContext(contextType, listenerContextName, globalContextName); + } + + private static JaasContext defaultContext(JaasContext.Type contextType, String listenerContextName, + String globalContextName) { + String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); + if (jaasConfigFile == null) { + if (contextType == Type.CLIENT) { + LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" + + SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration."); + } else { + LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS " + + "configuration."); + } + } + + Configuration jaasConfig = Configuration.getConfiguration(); + + AppConfigurationEntry[] configEntries = null; + String contextName = globalContextName; + + if (listenerContextName != null) { + configEntries = jaasConfig.getAppConfigurationEntry(listenerContextName); + if (configEntries != null) + contextName = listenerContextName; + } + + if (configEntries == null) + configEntries = jaasConfig.getAppConfigurationEntry(globalContextName); + + if (configEntries == null) { + String listenerNameText = listenerContextName == null ? "" : " or '" + listenerContextName + "'"; + String errorMessage = "Could not find a '" + globalContextName + "'" + listenerNameText + " entry in the JAAS " + + "configuration. System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + + (jaasConfigFile == null ? "not set" : jaasConfigFile); + throw new IllegalArgumentException(errorMessage); + } + + return new JaasContext(contextName, contextType, jaasConfig); + } + + /** + * The type of the SASL login context, it should be SERVER for the broker and CLIENT for the clients (consumer, producer, + * etc.). This is used to validate behaviour (e.g. some functionality is only available in the broker or clients). + */ + public enum Type { CLIENT, SERVER; } + + private final String name; + private final Type type; + private final Configuration configuration; + private final List configurationEntries; + + public JaasContext(String name, Type type, Configuration configuration) { + this.name = name; + this.type = type; + this.configuration = configuration; + AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name); + if (entries == null) + throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration."); + this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries))); + } + + public String name() { + return name; + } + + public Type type() { + return type; + } + + public Configuration configuration() { + return configuration; + } + + public List configurationEntries() { + return configurationEntries; + } + + /** + * Returns the configuration option for key from this context. + * If login module name is specified, return option value only from that module. + */ + public String configEntryOption(String key, String loginModuleName) { + for (AppConfigurationEntry entry : configurationEntries) { + if (loginModuleName != null && !loginModuleName.equals(entry.getLoginModuleName())) + continue; + Object val = entry.getOptions().get(key); + if (val != null) + return (String) val; + } + return null; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java index e326156023b..ca6b7f082af 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java @@ -17,16 +17,8 @@ package org.apache.kafka.common.security; import javax.security.auth.login.Configuration; -import javax.security.auth.login.AppConfigurationEntry; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Map; -import java.io.IOException; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.network.LoginType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,114 +26,11 @@ public class JaasUtils { private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class); public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config"; - public static final String LOGIN_CONTEXT_SERVER = "KafkaServer"; - public static final String LOGIN_CONTEXT_CLIENT = "KafkaClient"; public static final String SERVICE_NAME = "serviceName"; public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client"; public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig"; - /** - * Returns a JAAS Configuration object. For loginType SERVER, default Configuration - * is returned. For loginType CLIENT, if JAAS configuration property - * {@link SaslConfigs#SASL_JAAS_CONFIG} is specified, the configuration object - * is created by parsing the property value. Otherwise, the default Configuration - * is returned. - * @throws IllegalArgumentException if JAAS configuration property is specified - * for loginType SERVER - */ - public static Configuration jaasConfig(LoginType loginType, Map configs) { - Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); - if (jaasConfigArgs != null) { - if (loginType == LoginType.SERVER) - throw new IllegalArgumentException("JAAS config property not supported for server"); - else { - JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value()); - AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName()); - int numModules = clientModules == null ? 0 : clientModules.length; - if (numModules != 1) - throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module"); - return jaasConfig; - } - } else - return defaultJaasConfig(loginType); - } - - private static Configuration defaultJaasConfig(LoginType loginType) { - String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); - if (jaasConfigFile == null) { - LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" + - SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration."); - } - - Configuration jaasConfig = Configuration.getConfiguration(); - - String loginContextName = loginType.contextName(); - AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName); - if (configEntries == null) { - String errorMessage; - errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" + - JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile); - throw new IllegalArgumentException(errorMessage); - } - return jaasConfig; - } - - /** - * Returns the configuration option for key from the server login context - * of the default JAAS configuration. If login module name is specified, return option value - * only from that module. - */ - public static String defaultServerJaasConfigOption(String key, String loginModuleName) throws IOException { - return jaasConfigOption(Configuration.getConfiguration(), LoginType.SERVER.contextName(), key, loginModuleName); - } - - /** - * Returns the configuration option for key from the login context - * loginContextName of the specified JAAS configuration. - * If login module name is specified, return option value only from that module. - */ - public static String jaasConfigOption(Configuration jaasConfig, String loginContextName, String key, String loginModuleName) throws IOException { - AppConfigurationEntry[] configurationEntries = jaasConfig.getAppConfigurationEntry(loginContextName); - if (configurationEntries == null) { - String errorMessage = "Could not find a '" + loginContextName + "' entry in this JAAS configuration."; - throw new IOException(errorMessage); - } - - for (AppConfigurationEntry entry: configurationEntries) { - if (loginModuleName != null && !loginModuleName.equals(entry.getLoginModuleName())) - continue; - Object val = entry.getOptions().get(key); - if (val != null) - return (String) val; - } - return null; - } - - public static String defaultKerberosRealm() - throws ClassNotFoundException, NoSuchMethodException, - IllegalArgumentException, IllegalAccessException, - InvocationTargetException { - - //TODO Find a way to avoid using these proprietary classes as access to Java 9 will block access by default - //due to the Jigsaw module system - - Object kerbConf; - Class classRef; - Method getInstanceMethod; - Method getDefaultRealmMethod; - if (System.getProperty("java.vendor").contains("IBM")) { - classRef = Class.forName("com.ibm.security.krb5.internal.Config"); - } else { - classRef = Class.forName("sun.security.krb5.Config"); - } - getInstanceMethod = classRef.getMethod("getInstance", new Class[0]); - kerbConf = getInstanceMethod.invoke(classRef, new Object[0]); - getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm", - new Class[0]); - return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]); - } - public static boolean isZkSecurityEnabled() { boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true")); String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client"); diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java index fbbeb9e8bc7..0ea935c58a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.KafkaException; /** DefaultPrincipalBuilder which return transportLayer's peer Principal **/ - public class DefaultPrincipalBuilder implements PrincipalBuilder { public void configure(Map configs) {} diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java index 2f831c094d4..bc550943b63 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java @@ -18,10 +18,11 @@ package org.apache.kafka.common.security.auth; +import org.apache.kafka.common.security.JaasContext; + import java.util.Map; import javax.security.auth.Subject; -import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; @@ -33,7 +34,7 @@ public interface Login { /** * Configures this login instance. */ - void configure(Map configs, Configuration jaasConfig, String loginContextName); + void configure(Map configs, JaasContext jaasContext); /** * Performs login for each login module specified for the login context of this instance. diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java index e1bbbce429b..1b9953cb509 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.security.authenticator; -import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; import javax.security.sasl.RealmCallback; @@ -29,6 +28,7 @@ import javax.security.auth.callback.PasswordCallback; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.Subject; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.Login; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,20 +41,17 @@ import java.util.Map; public abstract class AbstractLogin implements Login { private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class); - private Configuration jaasConfig; - private String loginContextName; + private JaasContext jaasContext; private LoginContext loginContext; - @Override - public void configure(Map configs, Configuration jaasConfig, String loginContextName) { - this.jaasConfig = jaasConfig; - this.loginContextName = loginContextName; + public void configure(Map configs, JaasContext jaasContext) { + this.jaasContext = jaasContext; } @Override public LoginContext login() throws LoginException { - loginContext = new LoginContext(loginContextName, null, new LoginCallbackHandler(), jaasConfig); + loginContext = new LoginContext(jaasContext.name(), null, new LoginCallbackHandler(), jaasContext.configuration()); loginContext.login(); log.info("Successfully logged in."); return loginContext; @@ -65,8 +62,8 @@ public abstract class AbstractLogin implements Login { return loginContext.getSubject(); } - protected Configuration jaasConfig() { - return jaasConfig; + protected JaasContext jaasContext() { + return jaasContext; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index a28afae180c..55b561cc803 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -19,39 +19,36 @@ package org.apache.kafka.common.security.authenticator; import javax.security.auth.Subject; -import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginException; import java.io.IOException; import java.util.ArrayList; -import java.util.EnumMap; import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.Login; import org.apache.kafka.common.security.kerberos.KerberosLogin; public class LoginManager { // static configs (broker or client) - private static final EnumMap LOGIN_TYPE_INSTANCES = new EnumMap<>(LoginType.class); + private static final Map STATIC_INSTANCES = new HashMap<>(); // dynamic configs (client-only) - private static final Map JAAS_CONF_INSTANCES = new HashMap<>(); + private static final Map DYNAMIC_INSTANCES = new HashMap<>(); private final Login login; private final Object cacheKey; private int refCount; - private LoginManager(LoginType loginType, boolean hasKerberos, Map configs, Configuration jaasConfig, + private LoginManager(JaasContext jaasContext, boolean hasKerberos, Map configs, Password jaasConfigValue) throws IOException, LoginException { - this.cacheKey = jaasConfigValue != null ? jaasConfigValue : loginType; - String loginContext = loginType.contextName(); + this.cacheKey = jaasConfigValue != null ? jaasConfigValue : jaasContext.name(); login = hasKerberos ? new KerberosLogin() : new DefaultLogin(); - login.configure(configs, jaasConfig, loginContext); + login.configure(configs, jaasContext); login.login(); } @@ -59,35 +56,30 @@ public class LoginManager { * Returns an instance of `LoginManager` and increases its reference count. * * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an - * existing `LoginManager` for the provided `loginType` and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`, if available. + * existing `LoginManager` for the provided context type and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`, + * if available. * * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more * complicated to do the latter without making the consumer API more complex. - * - * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients - * (i.e. consumer and producer) - * @param hasKerberos - * @param configs configuration as key/value pairs - * @param jaasConfig JAAS Configuration object */ - public static LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map configs, - Configuration jaasConfig) throws IOException, LoginException { + public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos, + Map configs) throws IOException, LoginException { synchronized (LoginManager.class) { // SASL_JAAS_CONFIG is only supported by clients LoginManager loginManager; Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); - if (loginType == LoginType.CLIENT && jaasConfigValue != null) { - loginManager = JAAS_CONF_INSTANCES.get(jaasConfigValue); + if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) { + loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue); if (loginManager == null) { - loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue); - JAAS_CONF_INSTANCES.put(jaasConfigValue, loginManager); + loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue); + DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager); } } else { - loginManager = LOGIN_TYPE_INSTANCES.get(loginType); + loginManager = STATIC_INSTANCES.get(jaasContext.name()); if (loginManager == null) { - loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue); - LOGIN_TYPE_INSTANCES.put(loginType, loginManager); + loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue); + STATIC_INSTANCES.put(jaasContext.name(), loginManager); } } return loginManager.acquire(); @@ -116,9 +108,9 @@ public class LoginManager { throw new IllegalStateException("release called on LoginManager with refCount == 0"); else if (refCount == 1) { if (cacheKey instanceof Password) { - JAAS_CONF_INSTANCES.remove(cacheKey); + DYNAMIC_INSTANCES.remove(cacheKey); } else { - LOGIN_TYPE_INSTANCES.remove(cacheKey); + STATIC_INSTANCES.remove(cacheKey); } login.close(); } @@ -129,10 +121,10 @@ public class LoginManager { /* Should only be used in tests. */ public static void closeAll() { synchronized (LoginManager.class) { - for (LoginType key : new ArrayList<>(LOGIN_TYPE_INSTANCES.keySet())) - LOGIN_TYPE_INSTANCES.remove(key).login.close(); - for (Password key : new ArrayList<>(JAAS_CONF_INSTANCES.keySet())) - JAAS_CONF_INSTANCES.remove(key).login.close(); + for (String key : new ArrayList<>(STATIC_INSTANCES.keySet())) + STATIC_INSTANCES.remove(key).login.close(); + for (Password key : new ArrayList<>(DYNAMIC_INSTANCES.keySet())) + DYNAMIC_INSTANCES.remove(key).login.close(); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 8e0b8dbfa3d..3a38e24b94b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -91,4 +91,4 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { @Override public void close() { } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 069e12f3e94..07792d26674 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; @@ -57,7 +58,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.Subject; -import javax.security.auth.login.Configuration; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; @@ -81,7 +81,7 @@ public class SaslServerAuthenticator implements Authenticator { } private final String node; - private final Configuration jaasConfig; + private final JaasContext jaasContext; private final Subject subject; private final KerberosShortNamer kerberosNamer; private final int maxReceiveSize; @@ -105,11 +105,11 @@ public class SaslServerAuthenticator implements Authenticator { private NetworkReceive netInBuffer; private Send netOutBuffer; - public SaslServerAuthenticator(String node, Configuration jaasConfig, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException { + public SaslServerAuthenticator(String node, JaasContext jaasContext, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException { if (subject == null) throw new IllegalArgumentException("subject cannot be null"); this.node = node; - this.jaasConfig = jaasConfig; + this.jaasContext = jaasContext; this.subject = subject; this.kerberosNamer = kerberosNameParser; this.maxReceiveSize = maxReceiveSize; @@ -129,7 +129,7 @@ public class SaslServerAuthenticator implements Authenticator { private void createSaslServer(String mechanism) throws IOException { this.saslMechanism = mechanism; if (!ScramMechanism.isScram(mechanism)) - callbackHandler = new SaslServerCallbackHandler(jaasConfig, kerberosNamer); + callbackHandler = new SaslServerCallbackHandler(jaasContext, kerberosNamer); else callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class)); callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism); diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java index c01f01d6a26..94083fb8b11 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java @@ -21,6 +21,7 @@ package org.apache.kafka.common.security.authenticator; import java.io.IOException; import java.util.Map; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.slf4j.Logger; @@ -29,14 +30,11 @@ import org.slf4j.LoggerFactory; import javax.security.auth.Subject; import javax.security.auth.callback.Callback; import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.JaasUtils; /** * Callback handler for Sasl servers. The callbacks required for all the SASL @@ -47,11 +45,10 @@ import org.apache.kafka.common.security.JaasUtils; public class SaslServerCallbackHandler implements AuthCallbackHandler { private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class); private final KerberosShortNamer kerberosShortNamer; + private final JaasContext jaasContext; - public SaslServerCallbackHandler(Configuration configuration, KerberosShortNamer kerberosNameParser) throws IOException { - AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_SERVER); - if (configurationEntries == null) - throw new IOException("Could not find a 'KafkaServer' entry in this configuration: Kafka Server cannot start."); + public SaslServerCallbackHandler(JaasContext jaasContext, KerberosShortNamer kerberosNameParser) throws IOException { + this.jaasContext = jaasContext; this.kerberosShortNamer = kerberosNameParser; } @@ -59,6 +56,10 @@ public class SaslServerCallbackHandler implements AuthCallbackHandler { public void configure(Map configs, Mode mode, Subject subject, String saslMechanism) { } + public JaasContext jaasContext() { + return jaasContext; + } + @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { for (Callback callback : callbacks) { diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java index 32a09290367..23d163cf621 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -20,13 +20,12 @@ package org.apache.kafka.common.security.kerberos; import javax.security.auth.kerberos.KerberosPrincipal; import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.Subject; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.security.authenticator.AbstractLogin; import org.apache.kafka.common.config.SaslConfigs; @@ -36,8 +35,8 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Date; +import java.util.List; import java.util.Random; import java.util.Set; import java.util.Map; @@ -56,7 +55,6 @@ public class KerberosLogin extends AbstractLogin { private boolean isKrbTicket; private boolean isUsingTicketCache; - private String loginContextName; private String principal; // LoginThread will sleep until 80% of time from last refresh to @@ -82,26 +80,19 @@ public class KerberosLogin extends AbstractLogin { private String serviceName; private long lastLogin; - /** - * Login constructor. The constructor starts the thread used - * to periodically re-login to the Kerberos Ticket Granting Server. - * @param loginContextName - * name of section in JAAS file that will be use to login. - * Passed as first param to javax.security.auth.login.LoginContext(). - * @param configs configure Login with the given key-value pairs. - * @throws javax.security.auth.login.LoginException - * Thrown if authentication fails. - */ - public void configure(Map configs, Configuration jaasConfig, final String loginContextName) { - super.configure(configs, jaasConfig, loginContextName); - this.loginContextName = loginContextName; + public void configure(Map configs, JaasContext jaasContext) { + super.configure(configs, jaasContext); this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD); - this.serviceName = getServiceName(jaasConfig, configs, loginContextName); + this.serviceName = getServiceName(configs, jaasContext); } + /** + * Performs login for each login module specified for the login context of this instance and starts the thread used + * to periodically re-login to the Kerberos Ticket Granting Server. + */ @Override public LoginContext login() throws LoginException { @@ -110,13 +101,13 @@ public class KerberosLogin extends AbstractLogin { subject = loginContext.getSubject(); isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty(); - AppConfigurationEntry[] entries = jaasConfig().getAppConfigurationEntry(loginContextName); - if (entries.length == 0) { + List entries = jaasContext().configurationEntries(); + if (entries.isEmpty()) { isUsingTicketCache = false; principal = null; } else { // there will only be a single entry - AppConfigurationEntry entry = entries[0]; + AppConfigurationEntry entry = entries.get(0); if (entry.getOptions().get("useTicketCache") != null) { String val = (String) entry.getOptions().get("useTicketCache"); isUsingTicketCache = val.equals("true"); @@ -292,13 +283,8 @@ public class KerberosLogin extends AbstractLogin { return serviceName; } - private String getServiceName(Configuration jaasConfig, Map configs, String loginContext) { - String jaasServiceName; - try { - jaasServiceName = JaasUtils.jaasConfigOption(jaasConfig, loginContext, JaasUtils.SERVICE_NAME, null); - } catch (IOException e) { - throw new KafkaException("JAAS configuration entry not found", e); - } + private static String getServiceName(Map configs, JaasContext jaasContext) { + String jaasServiceName = jaasContext.configEntryOption(JaasUtils.SERVICE_NAME, null); String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME); if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) { String message = String.format("Conflicting serviceName values found in JAAS and Kafka configs " + @@ -377,7 +363,7 @@ public class KerberosLogin extends AbstractLogin { loginContext.logout(); //login and also update the subject field of this instance to //have the new credentials (pass it to the LoginContext constructor) - loginContext = new LoginContext(loginContextName, subject, null, jaasConfig()); + loginContext = new LoginContext(jaasContext().name(), subject, null, jaasContext().configuration()); log.info("Initiating re-login for {}", principal); loginContext.login(); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java index 09280578ffd..1ad02237be3 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.security.plain; -import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.Map; @@ -29,7 +28,8 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServerFactory; -import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler; /** * Simple SaslServer implementation for SASL/PLAIN. In order to make this implementation @@ -49,10 +49,13 @@ public class PlainSaslServer implements SaslServer { public static final String PLAIN_MECHANISM = "PLAIN"; private static final String JAAS_USER_PREFIX = "user_"; + private final JaasContext jaasContext; + private boolean complete; private String authorizationID; - public PlainSaslServer(CallbackHandler callbackHandler) { + public PlainSaslServer(JaasContext jaasContext) { + this.jaasContext = jaasContext; } @Override @@ -91,13 +94,10 @@ public class PlainSaslServer implements SaslServer { if (authorizationID.isEmpty()) authorizationID = username; - try { - String expectedPassword = JaasUtils.defaultServerJaasConfigOption(JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); - if (!password.equals(expectedPassword)) { - throw new SaslException("Authentication failed: Invalid username or password"); - } - } catch (IOException e) { - throw new SaslException("Authentication failed: Invalid JAAS configuration", e); + String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username, + PlainLoginModule.class.getName()); + if (!password.equals(expectedPassword)) { + throw new SaslException("Authentication failed: Invalid username or password"); } complete = true; return new byte[0]; @@ -151,10 +151,13 @@ public class PlainSaslServer implements SaslServer { public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException { - if (!PLAIN_MECHANISM.equals(mechanism)) { + if (!PLAIN_MECHANISM.equals(mechanism)) throw new SaslException(String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism)); - } - return new PlainSaslServer(cbh); + + if (!(cbh instanceof SaslServerCallbackHandler)) + throw new SaslException("CallbackHandler must be of type SaslServerCallbackHandler, but it is: " + cbh.getClass()); + + return new PlainSaslServer(((SaslServerCallbackHandler) cbh).jaasContext()); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java index 46bfe5792e0..002489c5983 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java @@ -57,4 +57,4 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { @Override public void close() { } -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index d483ef0591d..00c604e2219 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.FakeMetricsReporter; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.security.TestSecurityConfig; import org.junit.Test; import java.util.Arrays; @@ -27,6 +28,7 @@ import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; @@ -61,6 +63,62 @@ public class AbstractConfigTest { assertEquals(expected, originalsWithPrefix); } + @Test + public void testValuesWithPrefixOverride() { + String prefix = "prefix."; + Properties props = new Properties(); + props.put("sasl.mechanism", "PLAIN"); + props.put("prefix.sasl.mechanism", "GSSAPI"); + props.put("prefix.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2"); + props.put("prefix.ssl.truststore.location", "my location"); + props.put("sasl.kerberos.service.name", "service name"); + props.put("ssl.keymanager.algorithm", "algorithm"); + TestSecurityConfig config = new TestSecurityConfig(props); + Map valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix); + + // prefix overrides global + assertTrue(config.unused().contains("prefix.sasl.mechanism")); + assertTrue(config.unused().contains("sasl.mechanism")); + assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism")); + assertFalse(config.unused().contains("sasl.mechanism")); + assertFalse(config.unused().contains("prefix.sasl.mechanism")); + + // prefix overrides default + assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); + assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd")); + assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd")); + + // prefix override with no default + assertTrue(config.unused().contains("prefix.ssl.truststore.location")); + assertFalse(config.unused().contains("ssl.truststore.location")); + assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location")); + assertFalse(config.unused().contains("ssl.truststore.location")); + assertFalse(config.unused().contains("prefix.ssl.truststore.location")); + + // global overrides default + assertTrue(config.unused().contains("ssl.keymanager.algorithm")); + assertEquals("algorithm", valuesWithPrefixOverride.get("ssl.keymanager.algorithm")); + assertFalse(config.unused().contains("ssl.keymanager.algorithm")); + + // global with no default + assertTrue(config.unused().contains("sasl.kerberos.service.name")); + assertEquals("service name", valuesWithPrefixOverride.get("sasl.kerberos.service.name")); + assertFalse(config.unused().contains("sasl.kerberos.service.name")); + + // unset with default + assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin")); + assertEquals(SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, + valuesWithPrefixOverride.get("sasl.kerberos.min.time.before.relogin")); + assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin")); + + // unset with no default + assertFalse(config.unused().contains("ssl.key.password")); + assertNull(valuesWithPrefixOverride.get("ssl.key.password")); + assertFalse(config.unused().contains("ssl.key.password")); + } + @Test public void testUnused() { Properties props = new Properties(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java index 969055d0ef8..78c08d5b3f5 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java @@ -14,11 +14,11 @@ package org.apache.kafka.common.network; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; @@ -30,8 +30,9 @@ import org.apache.kafka.test.TestUtils; */ public class NetworkTestUtils { - public static NioEchoServer createEchoServer(SecurityProtocol securityProtocol, Map serverConfigs) throws Exception { - NioEchoServer server = new NioEchoServer(securityProtocol, serverConfigs, "localhost"); + public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, + AbstractConfig serverConfig) throws Exception { + NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost"); server.start(); return server; } @@ -81,6 +82,6 @@ public class NetworkTestUtils { break; } } - assertTrue(closed); + assertTrue("Channel was not closed by timeout", closed); } } diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index 0bff0ee2195..fb00e9ca020 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -23,8 +23,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.Map; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.authenticator.CredentialCache; @@ -47,7 +47,9 @@ public class NioEchoServer extends Thread { private volatile WritableByteChannel outputChannel; private final CredentialCache credentialCache; - public NioEchoServer(SecurityProtocol securityProtocol, Map configs, String serverHost) throws Exception { + public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost) throws Exception { + super("echoserver"); + setDaemon(true); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0)); @@ -57,10 +59,8 @@ public class NioEchoServer extends Thread { this.credentialCache = new CredentialCache(); if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames()); - ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(securityProtocol, configs, credentialCache); + ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); - setName("echoserver"); - setDaemon(true); acceptorThread = new AcceptorThread(); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 01d8a25b21d..3bc1b50b31c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -14,8 +14,6 @@ package org.apache.kafka.common.network; import static org.junit.Assert.fail; -import java.util.Arrays; -import java.util.Map; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; @@ -24,19 +22,22 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.apache.kafka.common.config.types.Password; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -83,7 +84,7 @@ public class SslTransportLayerTest { @Test public void testValidEndpointIdentification() throws Exception { String node = "0"; - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -105,7 +106,7 @@ public class SslTransportLayerTest { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -121,7 +122,9 @@ public class SslTransportLayerTest { public void testEndpointIdentificationDisabled() throws Exception { String node = "0"; String serverHost = InetAddress.getLocalHost().getHostAddress(); - server = new NioEchoServer(SecurityProtocol.SSL, sslServerConfigs, serverHost); + SecurityProtocol securityProtocol = SecurityProtocol.SSL; + server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol, + new TestSecurityConfig(sslServerConfigs), serverHost); server.start(); sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); createSelector(sslClientConfigs); @@ -139,13 +142,53 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequiredValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.checkClientConnection(selector, node, 100, 10); } + + /** + * Tests that disabling client authentication as a listener override has the desired effect. + */ + @Test + public void testListenerConfigOverride() throws Exception { + String node = "0"; + ListenerName clientListenerName = new ListenerName("client"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(clientListenerName.configPrefix() + SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + + // `client` listener is not configured at this point, so client auth should be required + server = createEchoServer(SecurityProtocol.SSL); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + + // Connect with client auth should work fine + createSelector(sslClientConfigs); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + selector.close(); + + // Remove client auth, so connection should fail + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); + createSelector(sslClientConfigs); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.waitForChannelClose(selector, node); + selector.close(); + server.close(); + + // Listener-specific config should be used and client auth should be disabled + server = createEchoServer(clientListenerName, SecurityProtocol.SSL); + addr = new InetSocketAddress("localhost", server.port()); + + // Connect without client auth should work fine now + createSelector(sslClientConfigs); + selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.checkClientConnection(selector, node, 100, 10); + } /** * Tests that server does not accept connections from clients with an untrusted certificate @@ -156,7 +199,7 @@ public class SslTransportLayerTest { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -172,7 +215,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequiredNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); @@ -193,7 +236,7 @@ public class SslTransportLayerTest { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -209,7 +252,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationDisabledNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); @@ -229,7 +272,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequestedValidProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -245,7 +288,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequestedNotProvided() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); @@ -310,7 +353,7 @@ public class SslTransportLayerTest { public void testInvalidKeyPassword() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid")); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -325,7 +368,7 @@ public class SslTransportLayerTest { public void testUnsupportedTLSVersion() throws Exception { String node = "0"; sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1")); createSelector(sslClientConfigs); @@ -343,7 +386,7 @@ public class SslTransportLayerTest { String node = "0"; String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites(); sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0])); - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); createSelector(sslClientConfigs); @@ -359,7 +402,7 @@ public class SslTransportLayerTest { @Test public void testNetReadBufferResize() throws Exception { String node = "0"; - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs, 10, null, null); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -373,7 +416,7 @@ public class SslTransportLayerTest { @Test public void testNetWriteBufferResize() throws Exception { String node = "0"; - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs, null, 10, null); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -387,7 +430,7 @@ public class SslTransportLayerTest { @Test public void testApplicationBufferResize() throws Exception { String node = "0"; - server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs, null, null, 10); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -407,7 +450,7 @@ public class SslTransportLayerTest { private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception { String node = "0"; - server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs); + server = createEchoServer(securityProtocol); clientChannelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -441,7 +484,8 @@ public class SslTransportLayerTest { createSelector(sslClientConfigs, null, null, null); } - private void createSelector(Map sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) { + private void createSelector(Map sslClientConfigs, final Integer netReadBufSize, + final Integer netWriteBufSize, final Integer appBufSize) { this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) { @@ -460,6 +504,14 @@ public class SslTransportLayerTest { this.channelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); } + + private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception { + return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(sslServerConfigs)); + } + + private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception { + return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); + } /** * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java similarity index 70% rename from clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java rename to clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java index 10ec39052dd..6040aa2e664 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,7 +36,7 @@ import static org.junit.Assert.fail; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.network.ListenerName; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -44,7 +45,7 @@ import org.junit.Test; * Tests parsing of {@link SaslConfigs#SASL_JAAS_CONFIG} property and verifies that the format * and parsing are consistent with JAAS configuration files loaded by the JRE. */ -public class JaasUtilsTest { +public class JaasContextTest { private File jaasConfigFile; @@ -129,8 +130,9 @@ public class JaasUtilsTest { } String jaasConfigProp = builder.toString(); - Configuration configuration = new JaasConfig(LoginType.CLIENT, jaasConfigProp); - AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(LoginType.CLIENT.contextName()); + String clientContextName = "CLIENT"; + Configuration configuration = new JaasConfig(clientContextName, jaasConfigProp); + AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(clientContextName); assertEquals(moduleCount, dynamicEntries.length); for (int i = 0; i < moduleCount; i++) { @@ -138,8 +140,9 @@ public class JaasUtilsTest { checkEntry(entry, "test.Module" + i, LoginModuleControlFlag.REQUIRED, moduleOptions.get(i)); } - writeConfiguration(LoginType.SERVER, jaasConfigProp); - AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(LoginType.SERVER.contextName()); + String serverContextName = "SERVER"; + writeConfiguration(serverContextName, jaasConfigProp); + AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(serverContextName); for (int i = 0; i < moduleCount; i++) { AppConfigurationEntry staticEntry = staticEntries[i]; checkEntry(staticEntry, dynamicEntries[i].getLoginModuleName(), LoginModuleControlFlag.REQUIRED, dynamicEntries[i].getOptions()); @@ -179,14 +182,60 @@ public class JaasUtilsTest { checkConfiguration(config, "test.testNumericOptionWithQuotes", LoginModuleControlFlag.REQUIRED, options); } - private AppConfigurationEntry configurationEntry(LoginType loginType, String jaasConfigProp) { + @Test + public void testLoadForServerWithListenerNameOverride() throws IOException { + writeConfiguration(Arrays.asList( + "KafkaServer { test.LoginModuleDefault required; };", + "plaintext.KafkaServer { test.LoginModuleOverride requisite; };" + )); + JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), + Collections.emptyMap()); + assertEquals("plaintext.KafkaServer", context.name()); + assertEquals(JaasContext.Type.SERVER, context.type()); + assertEquals(1, context.configurationEntries().size()); + checkEntry(context.configurationEntries().get(0), "test.LoginModuleOverride", + LoginModuleControlFlag.REQUISITE, Collections.emptyMap()); + } + + @Test + public void testLoadForServerWithListenerNameAndFallback() throws IOException { + writeConfiguration(Arrays.asList( + "KafkaServer { test.LoginModule required; };", + "other.KafkaServer { test.LoginModuleOther requisite; };" + )); + JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), + Collections.emptyMap()); + assertEquals("KafkaServer", context.name()); + assertEquals(JaasContext.Type.SERVER, context.type()); + assertEquals(1, context.configurationEntries().size()); + checkEntry(context.configurationEntries().get(0), "test.LoginModule", LoginModuleControlFlag.REQUIRED, + Collections.emptyMap()); + } + + @Test(expected = IllegalArgumentException.class) + public void testLoadForServerWithWrongListenerName() throws IOException { + writeConfiguration("Server", "test.LoginModule required;"); + JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"), + Collections.emptyMap()); + } + + /** + * ListenerName can only be used with Type.SERVER. + */ + @Test(expected = IllegalArgumentException.class) + public void testLoadForClientWithListenerName() { + JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"), + Collections.emptyMap()); + } + + private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) { Map configs = new HashMap<>(); if (jaasConfigProp != null) configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp)); - Configuration configuration = JaasUtils.jaasConfig(loginType, configs); - AppConfigurationEntry[] entry = configuration.getAppConfigurationEntry(loginType.contextName()); - assertEquals(1, entry.length); - return entry[0]; + JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs); + List entries = context.configurationEntries(); + assertEquals(1, entries.size()); + return entries.get(0); } private String controlFlag(LoginModuleControlFlag loginModuleControlFlag) { @@ -210,8 +259,12 @@ public class JaasUtilsTest { return builder.toString(); } - private void writeConfiguration(LoginType loginType, String jaasConfigProp) throws IOException { - List lines = Arrays.asList(loginType.contextName() + " { ", jaasConfigProp, "};"); + private void writeConfiguration(String contextName, String jaasConfigProp) throws IOException { + List lines = Arrays.asList(contextName + " { ", jaasConfigProp, "};"); + writeConfiguration(lines); + } + + private void writeConfiguration(List lines) throws IOException { Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8); Configuration.setConfiguration(null); } @@ -228,25 +281,25 @@ public class JaasUtilsTest { } private void checkConfiguration(String jaasConfigProp, String loginModule, LoginModuleControlFlag controlFlag, Map options) throws Exception { - AppConfigurationEntry dynamicEntry = configurationEntry(LoginType.CLIENT, jaasConfigProp); + AppConfigurationEntry dynamicEntry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp); checkEntry(dynamicEntry, loginModule, controlFlag, options); - assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(LoginType.CLIENT.contextName())); + assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(JaasContext.Type.CLIENT.name())); - writeConfiguration(LoginType.SERVER, jaasConfigProp); - AppConfigurationEntry staticEntry = configurationEntry(LoginType.SERVER, null); + writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp); + AppConfigurationEntry staticEntry = configurationEntry(JaasContext.Type.SERVER, null); checkEntry(staticEntry, loginModule, controlFlag, options); } private void checkInvalidConfiguration(String jaasConfigProp) throws IOException { try { - writeConfiguration(LoginType.SERVER, jaasConfigProp); - AppConfigurationEntry entry = configurationEntry(LoginType.SERVER, null); + writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp); + AppConfigurationEntry entry = configurationEntry(JaasContext.Type.SERVER, null); fail("Invalid JAAS configuration file didn't throw exception, entry=" + entry); } catch (SecurityException e) { // Expected exception } try { - AppConfigurationEntry entry = configurationEntry(LoginType.CLIENT, jaasConfigProp); + AppConfigurationEntry entry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp); fail("Invalid JAAS configuration property didn't throw exception, entry=" + entry); } catch (IllegalArgumentException e) { // Expected exception diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java new file mode 100644 index 00000000000..8c1c0380301 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.security; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; + +import java.util.Map; + +public class TestSecurityConfig extends AbstractConfig { + private static final ConfigDef CONFIG = new ConfigDef() + .define(SslConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, null, Importance.MEDIUM, + SslConfigs.SSL_CLIENT_AUTH_DOC) + .define(SaslConfigs.SASL_ENABLED_MECHANISMS, Type.LIST, SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, + Importance.MEDIUM, SaslConfigs.SASL_ENABLED_MECHANISMS_DOC) + .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, + Importance.MEDIUM, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .withClientSslSupport() + .withClientSaslSupport(); + + public TestSecurityConfig(Map originals) { + super(CONFIG, originals, false); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index ac9beb4fde0..bc967af100c 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; -import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkTestUtils; import org.apache.kafka.common.network.NioEchoServer; @@ -38,7 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; -import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.ScramFormatter; @@ -63,7 +64,6 @@ import java.util.Random; import javax.security.auth.login.Configuration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -106,7 +106,7 @@ public class SaslAuthenticatorTest { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createAndCheckClientConnection(securityProtocol, node); } @@ -119,7 +119,7 @@ public class SaslAuthenticatorTest { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createAndCheckClientConnection(securityProtocol, node); } @@ -133,7 +133,7 @@ public class SaslAuthenticatorTest { TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword"); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); } @@ -148,7 +148,7 @@ public class SaslAuthenticatorTest { TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); } @@ -163,7 +163,7 @@ public class SaslAuthenticatorTest { jaasConfig.setPlainClientOptions(null, "mypassword"); SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createSelector(securityProtocol, saslClientConfigs); InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); try { @@ -184,7 +184,7 @@ public class SaslAuthenticatorTest { jaasConfig.setPlainClientOptions("myuser", null); SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createSelector(securityProtocol, saslClientConfigs); InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); try { @@ -205,7 +205,7 @@ public class SaslAuthenticatorTest { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createAndCheckClientConnection(securityProtocol, node); } @@ -217,7 +217,7 @@ public class SaslAuthenticatorTest { public void testMultipleServerMechanisms() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); String node1 = "1"; @@ -230,15 +230,12 @@ public class SaslAuthenticatorTest { InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.checkClientConnection(selector, node2, 100, 10); - selector.close(); String node3 = "3"; saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); createSelector(securityProtocol, saslClientConfigs); selector.connect(node3, new InetSocketAddress("127.0.0.1", server.port()), BUFFER_SIZE, BUFFER_SIZE); NetworkTestUtils.checkClientConnection(selector, node3, 100, 10); - selector.close(); - selector = null; } /** @@ -249,7 +246,7 @@ public class SaslAuthenticatorTest { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); createAndCheckClientConnection(securityProtocol, "0"); } @@ -262,7 +259,7 @@ public class SaslAuthenticatorTest { public void testValidSaslScramMechanisms() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames())); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); for (String mechanism : ScramMechanism.mechanismNames()) { @@ -281,10 +278,10 @@ public class SaslAuthenticatorTest { Map options = new HashMap<>(); options.put("username", TestJaasConfig.USERNAME); options.put("password", "invalidpassword"); - jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); + jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); String node = "0"; - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); @@ -300,10 +297,10 @@ public class SaslAuthenticatorTest { Map options = new HashMap<>(); options.put("username", "unknownUser"); options.put("password", TestJaasConfig.PASSWORD); - jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); + jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); String node = "0"; - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); @@ -317,7 +314,7 @@ public class SaslAuthenticatorTest { public void testUserCredentialsUnavailableForScramMechanism() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames())); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME); @@ -325,7 +322,6 @@ public class SaslAuthenticatorTest { saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256"); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); - selector.close(); saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); createAndCheckClientConnection(securityProtocol, "2"); @@ -344,9 +340,9 @@ public class SaslAuthenticatorTest { Map options = new HashMap<>(); options.put("username", username); options.put("password", password); - jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); + jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); updateScramCredentialCache(username, password); createAndCheckClientConnection(securityProtocol, "0"); } @@ -386,7 +382,7 @@ public class SaslAuthenticatorTest { public void testApiVersionsRequestWithUnsupportedVersion() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send ApiVersionsRequest with unsupported version and validate error response. String node = "1"; @@ -418,7 +414,7 @@ public class SaslAuthenticatorTest { public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send ApiVersionsRequest and validate error response. String node1 = "invalid1"; @@ -442,7 +438,7 @@ public class SaslAuthenticatorTest { public void testInvalidSaslPacket() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send invalid SASL packet after valid handshake request String node1 = "invalid1"; @@ -481,7 +477,7 @@ public class SaslAuthenticatorTest { public void testInvalidApiVersionsRequestSequence() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send handshake request followed by ApiVersionsRequest String node1 = "invalid1"; @@ -508,7 +504,7 @@ public class SaslAuthenticatorTest { public void testPacketSizeTooBig() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send SASL packet with large size after valid handshake request String node1 = "invalid1"; @@ -548,7 +544,7 @@ public class SaslAuthenticatorTest { public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Send metadata request before Kafka SASL handshake request String node1 = "invalid1"; @@ -586,10 +582,10 @@ public class SaslAuthenticatorTest { @Test public void testInvalidLoginModule() throws Exception { TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions()); + jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions()); SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); try { createSelector(securityProtocol, saslClientConfigs); fail("SASL/PLAIN channel created without valid login module"); @@ -608,7 +604,7 @@ public class SaslAuthenticatorTest { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); } @@ -623,7 +619,7 @@ public class SaslAuthenticatorTest { configureMechanisms("PLAIN", Arrays.asList("PLAIN")); saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID"); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); createClientConnection(securityProtocol, node); NetworkTestUtils.waitForChannelClose(selector, node); } @@ -642,10 +638,11 @@ public class SaslAuthenticatorTest { serverOptions.put("user_user1", "user1-secret"); serverOptions.put("user_user2", "user2-secret"); TestJaasConfig staticJaasConfig = new TestJaasConfig(); - staticJaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), serverOptions); + staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), + serverOptions); staticJaasConfig.setPlainClientOptions("user1", "invalidpassword"); Configuration.setConfiguration(staticJaasConfig); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Check that client using static Jaas config does not connect since password is invalid createAndCheckClientConnectionFailure(securityProtocol, "1"); @@ -669,11 +666,52 @@ public class SaslAuthenticatorTest { try { createClientConnection(securityProtocol, "1"); fail("Connection created with multiple login modules in sasl.jaas.config"); - } catch (KafkaException e) { - assertTrue("Unexpected exception " + e, e.getCause() instanceof IllegalArgumentException); + } catch (IllegalArgumentException e) { + // Expected } } + @Test + public void testJaasConfigurationForListener() throws Exception { + SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; + saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN")); + + TestJaasConfig staticJaasConfig = new TestJaasConfig(); + + Map globalServerOptions = new HashMap<>(); + globalServerOptions.put("user_global1", "gsecret1"); + globalServerOptions.put("user_global2", "gsecret2"); + staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), + globalServerOptions); + + Map clientListenerServerOptions = new HashMap<>(); + clientListenerServerOptions.put("user_client1", "csecret1"); + clientListenerServerOptions.put("user_client2", "csecret2"); + String clientJaasEntryName = "client." + TestJaasConfig.LOGIN_CONTEXT_SERVER; + staticJaasConfig.createOrUpdateEntry(clientJaasEntryName, PlainLoginModule.class.getName(), clientListenerServerOptions); + Configuration.setConfiguration(staticJaasConfig); + + // Listener-specific credentials + server = createEchoServer(new ListenerName("client"), securityProtocol); + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1")); + createAndCheckClientConnection(securityProtocol, "1"); + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1")); + createAndCheckClientConnectionFailure(securityProtocol, "2"); + server.close(); + + // Global credentials as there is no listener-specific JAAS entry + server = createEchoServer(new ListenerName("other"), securityProtocol); + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1")); + createAndCheckClientConnection(securityProtocol, "3"); + saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, + TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1")); + createAndCheckClientConnectionFailure(securityProtocol, "4"); + } + /** * Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator * prior to SASL handshake flow and that subsequent authentication succeeds @@ -700,7 +738,7 @@ public class SaslAuthenticatorTest { */ private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception { configureMechanisms("PLAIN", Arrays.asList("PLAIN")); - server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs); + server = createEchoServer(securityProtocol); // Create non-SASL connection to manually authenticate after ApiVersionsRequest String node = "1"; @@ -748,11 +786,26 @@ public class SaslAuthenticatorTest { } private void createSelector(SecurityProtocol securityProtocol, Map clientConfigs) { + if (selector != null) { + selector.close(); + selector = null; + } + String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM); - this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, clientConfigs, saslMechanism, true); + this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, + new TestSecurityConfig(clientConfigs), null, saslMechanism, true); this.selector = NetworkTestUtils.createSelector(channelBuilder); } + private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception { + return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol); + } + + private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception { + return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, + new TestSecurityConfig(saslServerConfigs)); + } + private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception { createSelector(securityProtocol, saslClientConfigs); InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port()); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java index a27b87a786f..fb73d6958fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java @@ -22,13 +22,15 @@ import javax.security.auth.login.Configuration; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.security.plain.PlainLoginModule; import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.security.scram.ScramMechanism; public class TestJaasConfig extends Configuration { + static final String LOGIN_CONTEXT_CLIENT = "KafkaClient"; + static final String LOGIN_CONTEXT_SERVER = "KafkaServer"; + static final String USERNAME = "myuser"; static final String PASSWORD = "mypassword"; @@ -36,9 +38,9 @@ public class TestJaasConfig extends Configuration { public static TestJaasConfig createConfiguration(String clientMechanism, List serverMechanisms) { TestJaasConfig config = new TestJaasConfig(); - config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions()); + config.createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions()); for (String mechanism : serverMechanisms) { - config.addEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism)); + config.addEntry(LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism)); } Configuration.setConfiguration(config); return config; @@ -54,7 +56,7 @@ public class TestJaasConfig extends Configuration { options.put("username", clientUsername); if (clientPassword != null) options.put("password", clientPassword); - createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options); + createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options); } public void createOrUpdateEntry(String name, String loginModule, Map options) { diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 91e921fcee3..3c172941f3b 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -167,7 +167,7 @@ public class TestSslUtils { } private static Map createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword, - File trustStoreFile, Password trustStorePassword) { + File trustStoreFile, Password trustStorePassword) { Map sslConfigs = new HashMap<>(); sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index ac13472ea5d..f61eaa287ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -90,7 +90,7 @@ public class WorkerGroupMember { List addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), 0); String metricGrpPrefix = "connect"; - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); NetworkClient netClient = new NetworkClient( new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), this.metadata, diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 680c5e16f43..13b6571eb38 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -226,7 +226,7 @@ object AdminClient { val time = Time.SYSTEM val metrics = new Metrics(time) val metadata = new Metadata - val channelBuilder = ClientUtils.createChannelBuilder(config.values()) + val channelBuilder = ClientUtils.createChannelBuilder(config) val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index d9280348c94..d8e6a9504d6 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -26,11 +26,12 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, LoginType, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests import org.apache.kafka.common.requests.{UpdateMetadataRequest, _} import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint +import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{Node, TopicPartition} @@ -94,8 +95,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( config.interBrokerSecurityProtocol, - LoginType.SERVER, - config.values, + JaasContext.Type.SERVER, + config, + config.interBrokerListenerName, config.saslMechanismInterBrokerProtocol, config.saslInterBrokerHandshakeRequestEnable ) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c0353d5299c..b9bf3e44cbd 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -34,7 +34,7 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, LoginType, Mode, Selectable, Selector => KSelector} +import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode, Selectable, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -150,7 +150,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time config.connectionsMaxIdleMs, listenerName, securityProtocol, - config.values, + config, metrics, credentialProvider ) @@ -379,7 +379,7 @@ private[kafka] class Processor(val id: Int, connectionsMaxIdleMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol, - channelConfigs: java.util.Map[String, _], + config: KafkaConfig, metrics: Metrics, credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { @@ -419,7 +419,7 @@ private[kafka] class Processor(val id: Int, "socket-server", metricTags, false, - ChannelBuilders.serverChannelBuilder(securityProtocol, channelConfigs, credentialProvider.credentialCache)) + ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache)) override def run() { startupComplete() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b5075f94728..dcbd3b4004c 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import com.yammer.metrics.core.Gauge import kafka.admin.AdminUtils import kafka.api.KAFKA_0_9_0 -import kafka.cluster.{Broker, EndPoint} +import kafka.cluster.Broker import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException} import kafka.controller.{ControllerStats, KafkaController} import kafka.coordinator.GroupCoordinator @@ -37,13 +37,13 @@ import kafka.security.CredentialProvider import kafka.security.auth.Authorizer import kafka.utils._ import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient} import org.apache.kafka.common.internals.ClusterResourceListeners import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ -import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} -import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.security.{JaasContext, JaasUtils} import org.apache.kafka.common.utils.{AppInfoParser, Time} import org.apache.kafka.common.{ClusterResource, Node} @@ -360,8 +360,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( config.interBrokerSecurityProtocol, - LoginType.SERVER, - config.values, + JaasContext.Type.SERVER, + config, + config.interBrokerListenerName, config.saslMechanismInterBrokerProtocol, config.saslInterBrokerHandshakeRequestEnable) val selector = new Selector( diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index d6663fa31c9..df640eb9832 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -27,13 +27,14 @@ import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_ import kafka.common.KafkaStorageException import ReplicaFetcherThread._ import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient} -import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.network.{ChannelBuilders, Mode, NetworkReceive, Selectable, Selector} import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse} import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.utils.Time import scala.collection.Map @@ -78,8 +79,9 @@ class ReplicaFetcherThread(name: String, private val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( brokerConfig.interBrokerSecurityProtocol, - LoginType.SERVER, - brokerConfig.values, + JaasContext.Type.SERVER, + brokerConfig, + brokerConfig.interBrokerListenerName, brokerConfig.saslMechanismInterBrokerProtocol, brokerConfig.saslInterBrokerHandshakeRequestEnable ) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 92088f8210c..10baa42828c 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -48,6 +48,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) + cfgs.foreach { config => + config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") + config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) + config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) + config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") + } cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index 826eb5ce8ba..dd91627747d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -37,14 +37,15 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @Before override def setUp { - startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) + startSasl(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both) super.setUp } // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests - override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanism: Option[String]) { + override protected def setJaasConfiguration(mode: SaslSetupMode, serverEntryName: String, + serverMechanisms: List[String], clientMechanism: Option[String]) { // create static config with client login context with credentials for JaasTestUtils 'client2' - super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanism) + super.setJaasConfiguration(mode, serverEntryName, kafkaServerSaslMechanisms, clientMechanism) // set dynamic properties with credentials for JaasTestUtils 'client1' val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism) producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index 125d4318ebe..ddf9578478e 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -13,13 +13,20 @@ package kafka.api import java.io.File +import java.util.Locale + import org.apache.kafka.common.protocol.SecurityProtocol import kafka.server.KafkaConfig +import kafka.utils.JaasTestUtils +import org.apache.kafka.common.network.ListenerName class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness { override protected val zkSaslEnabled = true + override protected def listenerName = new ListenerName("CLIENT") override protected val kafkaClientSaslMechanism = "PLAIN" override protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + override protected val kafkaServerJaasEntryName = + s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 36b9d41e058..29aea610ae5 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -46,7 +46,8 @@ trait SaslSetup { private var serverKeytabFile: Option[File] = null private var clientKeytabFile: Option[File] = null - def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { + def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], + mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) @@ -63,16 +64,19 @@ trait SaslSetup { this.clientKeytabFile = None this.serverKeytabFile = None } - setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) + setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } - protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { + protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String, + kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { val jaasFile = mode match { case ZkSasl => JaasTestUtils.writeZkFile() - case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) - case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, kafkaServerSaslMechanisms, + kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, kafkaServerSaslMechanisms, + kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) } // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) @@ -104,5 +108,7 @@ trait SaslSetup { props } - def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + def jaasClientLoginModule(clientSaslMechanism: String): String = + JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + } diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 97faa36e7fa..445a59caa63 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -12,20 +12,22 @@ */ package kafka.api +import kafka.utils.JaasTestUtils import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before} trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { protected val zkSaslEnabled: Boolean + protected val kafkaServerJaasEntryName = JaasTestUtils.KafkaServerContextName protected val kafkaClientSaslMechanism = "GSSAPI" protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) @Before override def setUp() { if (zkSaslEnabled) - startSasl(Both, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) + startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName) else - startSasl(KafkaSasl, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) + startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName) super.setUp } diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 064e7835081..4eca6e218c2 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @Before override def setUp { - startSasl(ZkSasl, List.empty, None) + startSasl(List.empty, None, ZkSasl) super.setUp } } diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala index 5bd64147b2a..37db174c95c 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala @@ -28,6 +28,7 @@ import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert.assertEquals @@ -61,6 +62,13 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL") props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")) + // set listener-specific configs and set an invalid path for the global config to verify that the overrides work + Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName => + props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + } + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path") + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) } @@ -109,7 +117,7 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness producers.foreach { case (listenerName, producer) => val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, s"value$i".getBytes)) - producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS)) + producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) val consumer = consumers(listenerName) consumer.subscribe(Collections.singleton(listenerName.value)) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index bfaff0bb343..ff319143a94 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -89,7 +89,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { configureSecurityBeforeServersStart() servers = configs.map(TestUtils.createServer(_)).toBuffer - brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol) + brokerList = TestUtils.bootstrapServers(servers, listenerName) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b8e3a8a4e5c..37bc23875fc 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite { override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, - config.connectionsMaxIdleMs, listenerName, protocol, config.values, metrics, credentialProvider) { + config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) { override protected[network] def sendResponse(response: RequestChannel.Response) { conn.close() super.sendResponse(response) diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 0949eb793b4..7b90abf86ee 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -99,7 +99,7 @@ object JaasTestUtils { private val ZkUserPassword = "fpjsecret" private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" - private val KafkaServerContextName = "KafkaServer" + val KafkaServerContextName = "KafkaServer" val KafkaServerPrincipalUnqualifiedName = "kafka" private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM" private val KafkaClientContextName = "KafkaClient" @@ -128,16 +128,22 @@ object JaasTestUtils { jaasFile.getCanonicalPath } - def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: List[String], + kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], + clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), + kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections) jaasFile.getCanonicalPath } - def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: List[String], + kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], + clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), + kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections ++ zkSections) jaasFile.getCanonicalPath } @@ -151,7 +157,7 @@ object JaasTestUtils { new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) - private def kafkaServerSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + private def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { val modules = mechanisms.map { case "GSSAPI" => Krb5LoginModule( @@ -174,7 +180,7 @@ object JaasTestUtils { debug = false).toJaasModule case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) } - new JaasSection(KafkaServerContextName, modules) + new JaasSection(contextName, modules) } // consider refactoring if more mechanisms are added diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c530e07604c..f132f9e7870 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -166,7 +166,7 @@ object TestUtils extends Logging { def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): String = { servers.map { s => val listener = s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse( - sys.error(s"Could not find listener with name $listenerName")) + sys.error(s"Could not find listener with name ${listenerName.value}")) formatAddress(listener.host, s.boundPort(listenerName)) }.mkString(",") } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index afde63f87b4..ede0b458464 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -100,7 +100,7 @@ public class StreamsKafkaClient { reporters.add(new JmxReporter("kafka.admin")); final Metrics metrics = new Metrics(metricConfig, reporters, time); - final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig.values()); + final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig); final Selector selector = new Selector(streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "kafka-client", channelBuilder);