Browse Source

MINOR: Support dynamic JAAS config for broker's LoginManager cache (#4568)

Fix LoginManager caching when sasl.jaas.config is defined for broker and add unit tests.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/4565/merge
Rajini Sivaram 7 years ago committed by GitHub
parent
commit
015e224b3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
  2. 34
      clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
  3. 2
      clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
  4. 1
      clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
  5. 129
      clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java
  6. 1
      clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
  7. 2
      clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
  8. 2
      clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java

28
clients/src/main/java/org/apache/kafka/common/security/JaasContext.java

@ -64,10 +64,10 @@ public class JaasContext { @@ -64,10 +64,10 @@ public class JaasContext {
throw new IllegalArgumentException("mechanism should not be null for SERVER");
String globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
if (jaasConfigArgs == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null)
Password dynamicJaasConfig = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
if (dynamicJaasConfig == null && configs.get(SaslConfigs.SASL_JAAS_CONFIG) != null)
LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG);
return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs);
return load(Type.SERVER, listenerContextName, globalContextName, dynamicJaasConfig);
}
/**
@ -80,20 +80,20 @@ public class JaasContext { @@ -80,20 +80,20 @@ public class JaasContext {
*/
public static JaasContext loadClientContext(Map<String, ?> configs) {
String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs);
Password dynamicJaasConfig = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
return load(JaasContext.Type.CLIENT, null, globalContextName, dynamicJaasConfig);
}
static JaasContext load(JaasContext.Type contextType, String listenerContextName,
String globalContextName, Password jaasConfigArgs) {
if (jaasConfigArgs != null) {
JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
String globalContextName, Password dynamicJaasConfig) {
if (dynamicJaasConfig != null) {
JaasConfig jaasConfig = new JaasConfig(globalContextName, dynamicJaasConfig.value());
AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
if (contextModules == null || contextModules.length == 0)
throw new IllegalArgumentException("JAAS config property does not contain any login modules");
else if (contextModules.length != 1)
throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
return new JaasContext(globalContextName, contextType, jaasConfig);
return new JaasContext(globalContextName, contextType, jaasConfig, dynamicJaasConfig);
} else
return defaultContext(contextType, listenerContextName, globalContextName);
}
@ -133,7 +133,7 @@ public class JaasContext { @@ -133,7 +133,7 @@ public class JaasContext {
throw new IllegalArgumentException(errorMessage);
}
return new JaasContext(contextName, contextType, jaasConfig);
return new JaasContext(contextName, contextType, jaasConfig, null);
}
/**
@ -146,8 +146,9 @@ public class JaasContext { @@ -146,8 +146,9 @@ public class JaasContext {
private final Type type;
private final Configuration configuration;
private final List<AppConfigurationEntry> configurationEntries;
private final Password dynamicJaasConfig;
public JaasContext(String name, Type type, Configuration configuration) {
public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
this.name = name;
this.type = type;
this.configuration = configuration;
@ -155,6 +156,7 @@ public class JaasContext { @@ -155,6 +156,7 @@ public class JaasContext {
if (entries == null)
throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
this.dynamicJaasConfig = dynamicJaasConfig;
}
public String name() {
@ -173,6 +175,10 @@ public class JaasContext { @@ -173,6 +175,10 @@ public class JaasContext {
return configurationEntries;
}
public Password dynamicJaasConfig() {
return dynamicJaasConfig;
}
/**
* Returns the configuration option for <code>key</code> from this context.
* If login module name is specified, return option value only from that module.

34
clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java

@ -46,8 +46,8 @@ public class LoginManager { @@ -46,8 +46,8 @@ public class LoginManager {
private int refCount;
private LoginManager(JaasContext jaasContext, boolean hasKerberos, Map<String, ?> configs,
Password jaasConfigValue) throws IOException, LoginException {
this.cacheKey = jaasConfigValue != null ? jaasConfigValue : jaasContext.name();
Object cacheKey) throws IOException, LoginException {
this.cacheKey = cacheKey;
login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
login.configure(configs, jaasContext);
login.login();
@ -57,20 +57,33 @@ public class LoginManager { @@ -57,20 +57,33 @@ public class LoginManager {
* Returns an instance of `LoginManager` and increases its reference count.
*
* `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
* existing `LoginManager` for the provided context type and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`,
* if available.
* existing `LoginManager` for the provided context type. If `jaasContext` was loaded from a dynamic config,
* login managers are reused for the same dynamic config value. For `jaasContext` loaded from static JAAS
* configuration, login managers are reused for static contexts with the same login context name.
*
* This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
* shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
* complicated to do the latter without making the consumer API more complex.
*
* @param jaasContext Static or dynamic JAAS context. `jaasContext.dynamicJaasConfig()` is non-null for dynamic context.
* For static contexts, this may contain multiple login modules if the context type is SERVER.
* For CLIENT static contexts and dynamic contexts of CLIENT and SERVER, 'jaasContext` contains
* only one login module.
* @param saslMechanism SASL mechanism for which login manager is being acquired. For dynamic contexts, the single
* login module in `jaasContext` corresponds to this SASL mechanism. Hence `Login` class is
* chosen based on this mechanism.
* @param hasKerberos Boolean flag that indicates if Kerberos is enabled for the server listener or client. Since
* static broker configuration may contain multiple login modules in a login context, KerberosLogin
* must be used if Kerberos is enabled on the listener, even if `saslMechanism` is not GSSAPI.
* @param configs Config options used to configure `Login` if a new login manager is created.
*
*/
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos,
Map<String, ?> configs) throws IOException, LoginException {
synchronized (LoginManager.class) {
// SASL_JAAS_CONFIG is only supported by clients
LoginManager loginManager;
Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
Password jaasConfigValue = jaasContext.dynamicJaasConfig();
if (jaasConfigValue != null) {
loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue);
@ -79,7 +92,7 @@ public class LoginManager { @@ -79,7 +92,7 @@ public class LoginManager {
} else {
loginManager = STATIC_INSTANCES.get(jaasContext.name());
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasContext.name());
STATIC_INSTANCES.put(jaasContext.name(), loginManager);
}
}
@ -95,6 +108,11 @@ public class LoginManager { @@ -95,6 +108,11 @@ public class LoginManager {
return login.serviceName();
}
// Only for testing
Object cacheKey() {
return cacheKey;
}
private LoginManager acquire() {
++refCount;
LOGGER.trace("{} acquired", this);

2
clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java

@ -71,7 +71,7 @@ public class SaslChannelBuilderTest { @@ -71,7 +71,7 @@ public class SaslChannelBuilderTest {
private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
Map<String, JaasContext> jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
false, "PLAIN", true, null, null);

1
clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java

@ -56,6 +56,7 @@ public class ClientAuthenticationFailureTest { @@ -56,6 +56,7 @@ public class ClientAuthenticationFailureTest {
@Before
public void setup() throws Exception {
LoginManager.closeAll();
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
saslServerConfigs = new HashMap<>();

129
clients/src/test/java/org/apache/kafka/common/security/authenticator/LoginManagerTest.java

@ -0,0 +1,129 @@ @@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
public class LoginManagerTest {
private Password dynamicPlainContext;
private Password dynamicDigestContext;
@Before
public void setUp() {
dynamicPlainContext = new Password(PlainLoginModule.class.getName() +
" required user=\"plainuser\" password=\"plain-secret\";");
dynamicDigestContext = new Password(TestDigestLoginModule.class.getName() +
" required user=\"digestuser\" password=\"digest-secret\";");
TestJaasConfig.createConfiguration("SCRAM-SHA-256",
Collections.singletonList("SCRAM-SHA-256"));
}
@After
public void tearDown() {
LoginManager.closeAll();
}
@Test
public void testClientLoginManager() throws Exception {
Map<String, ?> configs = Collections.singletonMap("sasl.jaas.config", dynamicPlainContext);
JaasContext dynamicContext = JaasContext.loadClientContext(configs);
JaasContext staticContext = JaasContext.loadClientContext(Collections.<String, Object>emptyMap());
LoginManager dynamicLogin = LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
false, configs);
assertEquals(dynamicPlainContext, dynamicLogin.cacheKey());
LoginManager staticLogin = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256",
false, configs);
assertNotSame(dynamicLogin, staticLogin);
assertEquals("KafkaClient", staticLogin.cacheKey());
assertSame(dynamicLogin, LoginManager.acquireLoginManager(dynamicContext, "PLAIN",
false, configs));
assertSame(staticLogin, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256",
false, configs));
verifyLoginManagerRelease(dynamicLogin, 2, dynamicContext, configs);
verifyLoginManagerRelease(staticLogin, 2, staticContext, configs);
}
@Test
public void testServerLoginManager() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("plain.sasl.jaas.config", dynamicPlainContext);
configs.put("digest-md5.sasl.jaas.config", dynamicDigestContext);
ListenerName listenerName = new ListenerName("listener1");
JaasContext plainJaasContext = JaasContext.loadServerContext(listenerName, "PLAIN", configs);
JaasContext digestJaasContext = JaasContext.loadServerContext(listenerName, "DIGEST-MD5", configs);
JaasContext scramJaasContext = JaasContext.loadServerContext(listenerName, "SCRAM-SHA-256", configs);
LoginManager dynamicPlainLogin = LoginManager.acquireLoginManager(plainJaasContext, "PLAIN",
false, configs);
assertEquals(dynamicPlainContext, dynamicPlainLogin.cacheKey());
LoginManager dynamicDigestLogin = LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5",
false, configs);
assertNotSame(dynamicPlainLogin, dynamicDigestLogin);
assertEquals(dynamicDigestContext, dynamicDigestLogin.cacheKey());
LoginManager staticScramLogin = LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256",
false, configs);
assertNotSame(dynamicPlainLogin, staticScramLogin);
assertEquals("KafkaServer", staticScramLogin.cacheKey());
assertSame(dynamicPlainLogin, LoginManager.acquireLoginManager(plainJaasContext, "PLAIN",
false, configs));
assertSame(dynamicDigestLogin, LoginManager.acquireLoginManager(digestJaasContext, "DIGEST-MD5",
false, configs));
assertSame(staticScramLogin, LoginManager.acquireLoginManager(scramJaasContext, "SCRAM-SHA-256",
false, configs));
verifyLoginManagerRelease(dynamicPlainLogin, 2, plainJaasContext, configs);
verifyLoginManagerRelease(dynamicDigestLogin, 2, digestJaasContext, configs);
verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, configs);
}
private void verifyLoginManagerRelease(LoginManager loginManager, int acquireCount, JaasContext jaasContext,
Map<String, ?> configs) throws Exception {
// Release all except one reference and verify that the loginManager is still cached
for (int i = 0; i < acquireCount - 1; i++)
loginManager.release();
assertSame(loginManager, LoginManager.acquireLoginManager(jaasContext, "PLAIN",
false, configs));
// Release all references and verify that new LoginManager is created on next acquire
for (int i = 0; i < 2; i++) // release all references
loginManager.release();
LoginManager newLoginManager = LoginManager.acquireLoginManager(jaasContext, "PLAIN",
false, configs);
assertNotSame(loginManager, newLoginManager);
newLoginManager.release();
}
}

1
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java

@ -104,6 +104,7 @@ public class SaslAuthenticatorTest { @@ -104,6 +104,7 @@ public class SaslAuthenticatorTest {
@Before
public void setup() throws Exception {
LoginManager.closeAll();
serverCertStores = new CertStores(true, "localhost");
clientCertStores = new CertStores(false, "localhost");
saslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);

2
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java

@ -110,7 +110,7 @@ public class SaslServerAuthenticatorTest { @@ -110,7 +110,7 @@ public class SaslServerAuthenticatorTest {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig));
new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null));
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(),
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));

2
clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java

@ -45,7 +45,7 @@ public class PlainSaslServerTest { @@ -45,7 +45,7 @@ public class PlainSaslServerTest {
options.put("user_" + USER_A, PASSWORD_A);
options.put("user_" + USER_B, PASSWORD_B);
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), options);
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null);
saslServer = new PlainSaslServer(jaasContext);
}

Loading…
Cancel
Save