From 015e224b3d3c8e7bc412686ff22d5e99324b1019 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Thu, 15 Feb 2018 09:26:34 +0000 Subject: [PATCH] MINOR: Support dynamic JAAS config for broker's LoginManager cache (#4568) Fix LoginManager caching when sasl.jaas.config is defined for broker and add unit tests. Reviewers: Jason Gustafson --- .../kafka/common/security/JaasContext.java | 28 ++-- .../security/authenticator/LoginManager.java | 34 +++-- .../network/SaslChannelBuilderTest.java | 2 +- .../ClientAuthenticationFailureTest.java | 1 + .../authenticator/LoginManagerTest.java | 129 ++++++++++++++++++ .../authenticator/SaslAuthenticatorTest.java | 1 + .../SaslServerAuthenticatorTest.java | 2 +- .../security/plain/PlainSaslServerTest.java | 2 +- 8 files changed, 177 insertions(+), 22 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java index 6afed556925..d72f00dc590 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java @@ -64,10 +64,10 @@ public class JaasContext { throw new IllegalArgumentException("mechanism should not be null for SERVER"); String globalContextName = GLOBAL_CONTEXT_NAME_SERVER; String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER; - Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG); - if (jaasConfigArgs == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null) + Password dynamicJaasConfig = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG); + if (dynamicJaasConfig == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null) LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG); - return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs); + return load(Type.SERVER, listenerContextName, globalContextName, dynamicJaasConfig); } /** @@ -80,20 +80,20 @@ public class JaasContext { */ public static JaasContext loadClientContext(Map configs) { String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT; - Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); - return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs); + Password dynamicJaasConfig = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); + return load(JaasContext.Type.CLIENT, null, globalContextName, dynamicJaasConfig); } static JaasContext load(JaasContext.Type contextType, String listenerContextName, - String globalContextName, Password jaasConfigArgs) { - if (jaasConfigArgs != null) { - JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value()); + String globalContextName, Password dynamicJaasConfig) { + if (dynamicJaasConfig != null) { + JaasConfig jaasConfig = new JaasConfig(globalContextName, dynamicJaasConfig.value()); AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName); if (contextModules == null || contextModules.length == 0) throw new IllegalArgumentException("JAAS config property does not contain any login modules"); else if (contextModules.length != 1) throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module"); - return new JaasContext(globalContextName, contextType, jaasConfig); + return new JaasContext(globalContextName, contextType, jaasConfig, dynamicJaasConfig); } else return defaultContext(contextType, listenerContextName, globalContextName); } @@ -133,7 +133,7 @@ public class JaasContext { throw new IllegalArgumentException(errorMessage); } - return new JaasContext(contextName, contextType, jaasConfig); + return new JaasContext(contextName, contextType, jaasConfig, null); } /** @@ -146,8 +146,9 @@ public class JaasContext { private final Type type; private final Configuration configuration; private final List configurationEntries; + private final Password dynamicJaasConfig; - public JaasContext(String name, Type type, Configuration configuration) { + public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) { this.name = name; this.type = type; this.configuration = configuration; @@ -155,6 +156,7 @@ public class JaasContext { if (entries == null) throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration."); this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries))); + this.dynamicJaasConfig = dynamicJaasConfig; } public String name() { @@ -173,6 +175,10 @@ public class JaasContext { return configurationEntries; } + public Password dynamicJaasConfig() { + return dynamicJaasConfig; + } + /** * Returns the configuration option for key from this context. * If login module name is specified, return option value only from that module. diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java index dc264c8a5fd..81dc063c743 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -46,8 +46,8 @@ public class LoginManager { private int refCount; private LoginManager(JaasContext jaasContext, boolean hasKerberos, Map configs, - Password jaasConfigValue) throws IOException, LoginException { - this.cacheKey = jaasConfigValue != null ? jaasConfigValue : jaasContext.name(); + Object cacheKey) throws IOException, LoginException { + this.cacheKey = cacheKey; login = hasKerberos ? new KerberosLogin() : new DefaultLogin(); login.configure(configs, jaasContext); login.login(); @@ -57,20 +57,33 @@ public class LoginManager { * Returns an instance of `LoginManager` and increases its reference count. * * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an - * existing `LoginManager` for the provided context type and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`, - * if available. + * existing `LoginManager` for the provided context type. If `jaasContext` was loaded from a dynamic config, + * login managers are reused for the same dynamic config value. For `jaasContext` loaded from static JAAS + * configuration, login managers are reused for static contexts with the same login context name. * * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more * complicated to do the latter without making the consumer API more complex. + * + * @param jaasContext Static or dynamic JAAS context. `jaasContext.dynamicJaasConfig()` is non-null for dynamic context. + * For static contexts, this may contain multiple login modules if the context type is SERVER. + * For CLIENT static contexts and dynamic contexts of CLIENT and SERVER, 'jaasContext` contains + * only one login module. + * @param saslMechanism SASL mechanism for which login manager is being acquired. For dynamic contexts, the single + * login module in `jaasContext` corresponds to this SASL mechanism. Hence `Login` class is + * chosen based on this mechanism. + * @param hasKerberos Boolean flag that indicates if Kerberos is enabled for the server listener or client. Since + * static broker configuration may contain multiple login modules in a login context, KerberosLogin + * must be used if Kerberos is enabled on the listener, even if `saslMechanism` is not GSSAPI. + * @param configs Config options used to configure `Login` if a new login manager is created. + * */ public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos, Map configs) throws IOException, LoginException { synchronized (LoginManager.class) { - // SASL_JAAS_CONFIG is only supported by clients LoginManager loginManager; - Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG); - if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) { + Password jaasConfigValue = jaasContext.dynamicJaasConfig(); + if (jaasConfigValue != null) { loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue); if (loginManager == null) { loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue); @@ -79,7 +92,7 @@ public class LoginManager { } else { loginManager = STATIC_INSTANCES.get(jaasContext.name()); if (loginManager == null) { - loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue); + loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasContext.name()); STATIC_INSTANCES.put(jaasContext.name(), loginManager); } } @@ -95,6 +108,11 @@ public class LoginManager { return login.serviceName(); } + // Only for testing + Object cacheKey() { + return cacheKey; + } + private LoginManager acquire() { ++refCount; LOGGER.trace("{} acquired", this); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java index 6072bf54fa7..26cc544cd6d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java @@ -71,7 +71,7 @@ public class SaslChannelBuilderTest { private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); - JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); + JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null); Map jaasContexts = Collections.singletonMap("PLAIN", jaasContext); return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"), false, "PLAIN", true, null, null); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index 7c028c4db61..c64597b7e8b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -56,6 +56,7 @@ public class ClientAuthenticationFailureTest { @Before public void setup() throws Exception { + LoginManager.closeAll(); SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; saslServerConfigs = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java new file mode 100644 index 00000000000..8be72fb5b8c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertEquals; + +public class LoginManagerTest { + + private Password dynamicPlainContext; + private Password dynamicDigestContext; + + @Before + public void setUp() { + dynamicPlainContext = new Password(PlainLoginModule.class.getName() + + " required user=\"plainuser\" password=\"plain-secret\";"); + dynamicDigestContext = new Password(TestDigestLoginModule.class.getName() + + " required user=\"digestuser\" password=\"digest-secret\";"); + TestJaasConfig.createConfiguration("SCRAM-SHA-256", + Collections.singletonList("SCRAM-SHA-256")); + } + + @After + public void tearDown() { + LoginManager.closeAll(); + } + + @Test + public void testClientLoginManager() throws Exception { + Map configs = Collections.singletonMap("sasl.jaas.config", dynamicPlainContext); + JaasContext dynamicContext = JaasContext.loadClientContext(configs); + JaasContext staticContext = JaasContext.loadClientContext(Collections.emptyMap()); + + LoginManager dynamicLogin = LoginManager.acquireLoginManager(dynamicContext, "PLAIN", + false, configs); + assertEquals(dynamicPlainContext, dynamicLogin.cacheKey()); + LoginManager staticLogin = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", + false, configs); + assertNotSame(dynamicLogin, staticLogin); + assertEquals("KafkaClient", staticLogin.cacheKey()); + + assertSame(dynamicLogin, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", + false, configs)); + assertSame(staticLogin, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", + false, configs)); + + verifyLoginManagerRelease(dynamicLogin, 2, dynamicContext, configs); + verifyLoginManagerRelease(staticLogin, 2, staticContext, configs); + } + + @Test + public void testServerLoginManager() throws Exception { + Map configs = new HashMap<>(); + configs.put("plain.sasl.jaas.config", dynamicPlainContext); + configs.put("digest-md5.sasl.jaas.config", dynamicDigestContext); + ListenerName listenerName = new ListenerName("listener1"); + JaasContext plainJaasContext = JaasContext.loadServerContext(listenerName, "PLAIN", configs); + JaasContext digestJaasContext = JaasContext.loadServerContext(listenerName, "DIGEST-MD5", configs); + JaasContext scramJaasContext = JaasContext.loadServerContext(listenerName, "SCRAM-SHA-256", configs); + + LoginManager dynamicPlainLogin = LoginManager.acquireLoginManager(plainJaasContext, "PLAIN", + false, configs); + assertEquals(dynamicPlainContext, dynamicPlainLogin.cacheKey()); + LoginManager dynamicDigestLogin = LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5", + false, configs); + assertNotSame(dynamicPlainLogin, dynamicDigestLogin); + assertEquals(dynamicDigestContext, dynamicDigestLogin.cacheKey()); + LoginManager staticScramLogin = LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256", + false, configs); + assertNotSame(dynamicPlainLogin, staticScramLogin); + assertEquals("KafkaServer", staticScramLogin.cacheKey()); + + assertSame(dynamicPlainLogin, LoginManager.acquireLoginManager(plainJaasContext, "PLAIN", + false, configs)); + assertSame(dynamicDigestLogin, LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5", + false, configs)); + assertSame(staticScramLogin, LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256", + false, configs)); + + verifyLoginManagerRelease(dynamicPlainLogin, 2, plainJaasContext, configs); + verifyLoginManagerRelease(dynamicDigestLogin, 2, digestJaasContext, configs); + verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, configs); + } + + private void verifyLoginManagerRelease(LoginManager loginManager, int acquireCount, JaasContext jaasContext, + Map configs) throws Exception { + + // Release all except one reference and verify that the loginManager is still cached + for (int i = 0; i < acquireCount - 1; i++) + loginManager.release(); + assertSame(loginManager, LoginManager.acquireLoginManager(jaasContext, "PLAIN", + false, configs)); + + // Release all references and verify that new LoginManager is created on next acquire + for (int i = 0; i < 2; i++) // release all references + loginManager.release(); + LoginManager newLoginManager = LoginManager.acquireLoginManager(jaasContext, "PLAIN", + false, configs); + assertNotSame(loginManager, newLoginManager); + newLoginManager.release(); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index ef2a07578da..b8edc612f69 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -104,6 +104,7 @@ public class SaslAuthenticatorTest { @Before public void setup() throws Exception { + LoginManager.closeAll(); serverCertStores = new CertStores(true, "localhost"); clientCertStores = new CertStores(false, "localhost"); saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index 72c296943e7..17d31bda72c 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -110,7 +110,7 @@ public class SaslServerAuthenticatorTest { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap()); Map jaasContexts = Collections.singletonMap(mechanism, - new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig)); + new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null)); Map subjects = Collections.singletonMap(mechanism, new Subject()); return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(), new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames())); diff --git a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java index 4196db6ac0f..86baf3e07af 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java @@ -45,7 +45,7 @@ public class PlainSaslServerTest { options.put("user_" + USER_A, PASSWORD_A); options.put("user_" + USER_B, PASSWORD_B); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), options); - JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); + JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null); saslServer = new PlainSaslServer(jaasContext); }