Browse Source

KAFKA-4636; Per listener security settings overrides (KIP-103)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #2406 from ijuma/kafka-4636-per-listener-security-settings
pull/2337/merge
Ismael Juma 8 years ago
parent
commit
ca0c071c10
  1. 15
      clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
  2. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  3. 2
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  4. 48
      clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  5. 38
      clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
  6. 56
      clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
  7. 6
      clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
  8. 39
      clients/src/main/java/org/apache/kafka/common/network/LoginType.java
  9. 3
      clients/src/main/java/org/apache/kafka/common/network/Mode.java
  10. 48
      clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
  11. 7
      clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java
  12. 190
      clients/src/main/java/org/apache/kafka/common/security/JaasContext.java
  13. 111
      clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java
  14. 1
      clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
  15. 5
      clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
  16. 17
      clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
  17. 54
      clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
  18. 2
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
  19. 10
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
  20. 15
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
  21. 44
      clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
  22. 29
      clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
  23. 2
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
  24. 58
      clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
  25. 9
      clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
  26. 10
      clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
  27. 94
      clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
  28. 93
      clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
  29. 38
      clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
  30. 133
      clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
  31. 10
      clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java
  32. 2
      clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
  33. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
  34. 2
      core/src/main/scala/kafka/admin/AdminClient.scala
  35. 8
      core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  36. 8
      core/src/main/scala/kafka/network/SocketServer.scala
  37. 13
      core/src/main/scala/kafka/server/KafkaServer.scala
  38. 8
      core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  39. 6
      core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
  40. 7
      core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
  41. 7
      core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
  42. 18
      core/src/test/scala/integration/kafka/api/SaslSetup.scala
  43. 6
      core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
  44. 2
      core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
  45. 10
      core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
  46. 2
      core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
  47. 2
      core/src/test/scala/unit/kafka/network/SocketServerTest.scala
  48. 20
      core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
  49. 2
      core/src/test/scala/unit/kafka/utils/TestUtils.scala
  50. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java

15
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

@ -16,11 +16,11 @@ import java.io.Closeable; @@ -16,11 +16,11 @@ import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.config.ConfigException;
@ -73,14 +73,15 @@ public class ClientUtils { @@ -73,14 +73,15 @@ public class ClientUtils {
}
/**
* @param configs client/server configs
* @param config client configs
* @return configured ChannelBuilder based on the configs.
*/
public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) {
SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
public static ChannelBuilder createChannelBuilder(AbstractConfig config) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, configs, clientSaslMechanism, true);
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
clientSaslMechanism, true);
}
}

2
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -648,7 +648,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -648,7 +648,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,

2
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -298,7 +298,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -298,7 +298,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,

48
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

@ -46,6 +46,8 @@ public class AbstractConfig { @@ -46,6 +46,8 @@ public class AbstractConfig {
/* the parsed values */
private final Map<String, Object> values;
private final ConfigDef definition;
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
/* check that all the keys are really strings */
@ -55,6 +57,7 @@ public class AbstractConfig { @@ -55,6 +57,7 @@ public class AbstractConfig {
this.originals = (Map<String, ?>) originals;
this.values = definition.parse(this.originals);
this.used = Collections.synchronizedSet(new HashSet<String>());
this.definition = definition;
if (doLog)
logAll();
}
@ -63,12 +66,6 @@ public class AbstractConfig { @@ -63,12 +66,6 @@ public class AbstractConfig {
this(definition, originals, true);
}
public AbstractConfig(Map<String, Object> parsedConfig) {
this.values = parsedConfig;
this.originals = new HashMap<>();
this.used = Collections.synchronizedSet(new HashSet<String>());
}
protected Object get(String key) {
if (!values.containsKey(key))
throw new ConfigException(String.format("Unknown configuration '%s'", key));
@ -152,7 +149,7 @@ public class AbstractConfig { @@ -152,7 +149,7 @@ public class AbstractConfig {
* @return a Map containing the settings with the prefix
*/
public Map<String, Object> originalsWithPrefix(String prefix) {
Map<String, Object> result = new RecordingMap<>(prefix);
Map<String, Object> result = new RecordingMap<>(prefix, false);
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
@ -160,6 +157,25 @@ public class AbstractConfig { @@ -160,6 +157,25 @@ public class AbstractConfig {
return result;
}
/**
* Put all keys that do not start with {@code prefix} and their parsed values in the result map and then
* put all the remaining keys with the prefix stripped and their parsed values in the result map.
*
* This is useful if one wants to allow prefixed configs to override default ones.
*/
public Map<String, Object> valuesWithPrefixOverride(String prefix) {
Map<String, Object> result = new RecordingMap<>(values(), prefix, true);
for (Map.Entry<String, ?> entry : originals.entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
String keyWithNoPrefix = entry.getKey().substring(prefix.length());
ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix);
if (configKey != null)
result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
}
}
return result;
}
public Map<String, ?> values() {
return new RecordingMap<>(values);
}
@ -264,34 +280,40 @@ public class AbstractConfig { @@ -264,34 +280,40 @@ public class AbstractConfig {
private class RecordingMap<V> extends HashMap<String, V> {
private final String prefix;
private final boolean withIgnoreFallback;
RecordingMap() {
this("");
this("", false);
}
RecordingMap(String prefix) {
RecordingMap(String prefix, boolean withIgnoreFallback) {
this.prefix = prefix;
this.withIgnoreFallback = withIgnoreFallback;
}
RecordingMap(Map<String, ? extends V> m) {
this(m, "");
this(m, "", false);
}
RecordingMap(Map<String, ? extends V> m, String prefix) {
RecordingMap(Map<String, ? extends V> m, String prefix, boolean withIgnoreFallback) {
super(m);
this.prefix = prefix;
this.withIgnoreFallback = withIgnoreFallback;
}
@Override
public V get(Object key) {
if (key instanceof String) {
String stringKey = (String) key;
String keyWithPrefix;
if (prefix.isEmpty()) {
keyWithPrefix = (String) key;
keyWithPrefix = stringKey;
} else {
keyWithPrefix = prefix + key;
keyWithPrefix = prefix + stringKey;
}
ignore(keyWithPrefix);
if (withIgnoreFallback)
ignore(stringKey);
}
return super.get(key);
}

38
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java

@ -426,26 +426,28 @@ public class ConfigDef { @@ -426,26 +426,28 @@ public class ConfigDef {
}
// parse all known keys
Map<String, Object> values = new HashMap<>();
for (ConfigKey key : configKeys.values()) {
Object value;
// props map contains setting - assign ConfigKey value
if (props.containsKey(key.name)) {
value = parseType(key.name, props.get(key.name), key.type);
// props map doesn't contain setting, the key is required because no default value specified - its an error
} else if (key.defaultValue == NO_DEFAULT_VALUE) {
throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
} else {
// otherwise assign setting its default value
value = key.defaultValue;
}
if (key.validator != null) {
key.validator.ensureValid(key.name, value);
}
values.put(key.name, value);
}
for (ConfigKey key : configKeys.values())
values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
return values;
}
Object parseValue(ConfigKey key, Object value, boolean isSet) {
Object parsedValue;
if (isSet) {
parsedValue = parseType(key.name, value, key.type);
// props map doesn't contain setting, the key is required because no default value specified - its an error
} else if (key.defaultValue == NO_DEFAULT_VALUE) {
throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
} else {
// otherwise assign setting its default value
parsedValue = key.defaultValue;
}
if (key.validator != null) {
key.validator.ensureValid(key.name, parsedValue);
}
return parsedValue;
}
/**
* Validate the current configuration values with the configuration definition.
* @param props the current configuration values
@ -1166,4 +1168,4 @@ public class ConfigDef { @@ -1166,4 +1168,4 @@ public class ConfigDef {
};
}
}
}

56
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java

@ -13,8 +13,10 @@ @@ -13,8 +13,10 @@
package org.apache.kafka.common.network;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@ -28,8 +30,9 @@ public class ChannelBuilders { @@ -28,8 +30,9 @@ public class ChannelBuilders {
/**
* @param securityProtocol the securityProtocol
* @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
* @param configs client configs
* @param contextType the contextType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise
* @param config client config
* @param listenerName the listenerName if contextType is SERVER or null otherwise
* @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise
* @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL
* inter-broker connections with inter-broker protocol version < 0.10
@ -37,41 +40,52 @@ public class ChannelBuilders { @@ -37,41 +40,52 @@ public class ChannelBuilders {
* @throws IllegalArgumentException if `mode` invariants described above is not maintained
*/
public static ChannelBuilder clientChannelBuilder(SecurityProtocol securityProtocol,
LoginType loginType,
Map<String, ?> configs,
JaasContext.Type contextType,
AbstractConfig config,
ListenerName listenerName,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable) {
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
if (loginType == null)
throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
if (contextType == null)
throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
if (clientSaslMechanism == null)
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, Mode.CLIENT, loginType, configs, clientSaslMechanism, saslHandshakeRequestEnable, null);
return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
saslHandshakeRequestEnable, null);
}
/**
* @param listenerName the listenerName
* @param securityProtocol the securityProtocol
* @param configs server configs
* @param config server config
* @param credentialCache Credential cache for SASL/SCRAM if SCRAM is enabled
* @return the configured `ChannelBuilder`
*/
public static ChannelBuilder serverChannelBuilder(SecurityProtocol securityProtocol,
Map<String, ?> configs,
CredentialCache credentialCache) {
return create(securityProtocol, Mode.SERVER, LoginType.SERVER, configs, null, true, credentialCache);
public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
SecurityProtocol securityProtocol,
AbstractConfig config,
CredentialCache credentialCache) {
return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
true, credentialCache);
}
private static ChannelBuilder create(SecurityProtocol securityProtocol,
Mode mode,
LoginType loginType,
Map<String, ?> configs,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
CredentialCache credentialCache) {
ChannelBuilder channelBuilder;
Mode mode,
JaasContext.Type contextType,
AbstractConfig config,
ListenerName listenerName,
String clientSaslMechanism,
boolean saslHandshakeRequestEnable,
CredentialCache credentialCache) {
Map<String, ?> configs;
if (listenerName == null)
configs = config.values();
else
configs = config.valuesWithPrefixOverride(listenerName.configPrefix());
ChannelBuilder channelBuilder;
switch (securityProtocol) {
case SSL:
requireNonNullMode(mode, securityProtocol);
@ -80,7 +94,9 @@ public class ChannelBuilders { @@ -80,7 +94,9 @@ public class ChannelBuilders {
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol,
clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
break;
case PLAINTEXT:
case TRACE:

6
clients/src/main/java/org/apache/kafka/common/network/ListenerName.java

@ -23,6 +23,8 @@ import java.util.Objects; @@ -23,6 +23,8 @@ import java.util.Objects;
public final class ListenerName {
private static final String CONFIG_STATIC_PREFIX = "listener.name";
/**
* Create an instance with the security protocol name as the value.
*/
@ -65,4 +67,8 @@ public final class ListenerName { @@ -65,4 +67,8 @@ public final class ListenerName {
public String toString() {
return "ListenerName(" + value + ")";
}
public String configPrefix() {
return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + ".";
}
}

39
clients/src/main/java/org/apache/kafka/common/network/LoginType.java

@ -1,39 +0,0 @@ @@ -1,39 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
import org.apache.kafka.common.security.JaasUtils;
/**
* The type of the login context, it should be SERVER for the broker and CLIENT for the clients (i.e. consumer and
* producer). It provides the login context name which defines the section of the JAAS configuration file to be used
* for login.
*/
public enum LoginType {
CLIENT(JaasUtils.LOGIN_CONTEXT_CLIENT),
SERVER(JaasUtils.LOGIN_CONTEXT_SERVER);
private final String contextName;
LoginType(String contextName) {
this.contextName = contextName;
}
public String contextName() {
return contextName;
}
}

3
clients/src/main/java/org/apache/kafka/common/network/Mode.java

@ -16,4 +16,7 @@ @@ -16,4 +16,7 @@
*/
package org.apache.kafka.common.network;
/**
* Connection mode for SSL and SASL connections.
*/
public enum Mode { CLIENT, SERVER }

48
clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java

@ -13,15 +13,15 @@ @@ -13,15 +13,15 @@
package org.apache.kafka.common.network;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
@ -39,20 +39,20 @@ public class SaslChannelBuilder implements ChannelBuilder { @@ -39,20 +39,20 @@ public class SaslChannelBuilder implements ChannelBuilder {
private final SecurityProtocol securityProtocol;
private final String clientSaslMechanism;
private final Mode mode;
private final LoginType loginType;
private final JaasContext jaasContext;
private final boolean handshakeRequestEnable;
private final CredentialCache credentialCache;
private Configuration jaasConfig;
private LoginManager loginManager;
private SslFactory sslFactory;
private Map<String, ?> configs;
private KerberosShortNamer kerberosShortNamer;
public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol,
String clientSaslMechanism, boolean handshakeRequestEnable, CredentialCache credentialCache) {
public SaslChannelBuilder(Mode mode, JaasContext jaasContext, SecurityProtocol securityProtocol,
String clientSaslMechanism,
boolean handshakeRequestEnable, CredentialCache credentialCache) {
this.mode = mode;
this.loginType = loginType;
this.jaasContext = jaasContext;
this.securityProtocol = securityProtocol;
this.handshakeRequestEnable = handshakeRequestEnable;
this.clientSaslMechanism = clientSaslMechanism;
@ -73,7 +73,7 @@ public class SaslChannelBuilder implements ChannelBuilder { @@ -73,7 +73,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
if (hasKerberos) {
String defaultRealm;
try {
defaultRealm = JaasUtils.defaultKerberosRealm();
defaultRealm = defaultKerberosRealm();
} catch (Exception ke) {
defaultRealm = "";
}
@ -82,8 +82,7 @@ public class SaslChannelBuilder implements ChannelBuilder { @@ -82,8 +82,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
if (principalToLocalRules != null)
kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
}
this.jaasConfig = JaasUtils.jaasConfig(loginType, configs);
this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs, jaasConfig);
this.loginManager = LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs);
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
// Disable SSL client authentication as we are using SASL authentication
@ -101,8 +100,9 @@ public class SaslChannelBuilder implements ChannelBuilder { @@ -101,8 +100,9 @@ public class SaslChannelBuilder implements ChannelBuilder {
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
if (mode == Mode.SERVER)
authenticator = new SaslServerAuthenticator(id, jaasConfig, loginManager.subject(), kerberosShortNamer,
socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize, credentialCache);
authenticator = new SaslServerAuthenticator(id, jaasContext, loginManager.subject(),
kerberosShortNamer, socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize,
credentialCache);
else
authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
@ -128,4 +128,26 @@ public class SaslChannelBuilder implements ChannelBuilder { @@ -128,4 +128,26 @@ public class SaslChannelBuilder implements ChannelBuilder {
return new PlaintextTransportLayer(key);
}
}
private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException, InvocationTargetException {
//TODO Find a way to avoid using these proprietary classes as access to Java 9 will block access by default
//due to the Jigsaw module system
Object kerbConf;
Class<?> classRef;
Method getInstanceMethod;
Method getDefaultRealmMethod;
if (System.getProperty("java.vendor").contains("IBM")) {
classRef = Class.forName("com.ibm.security.krb5.internal.Config");
} else {
classRef = Class.forName("sun.security.krb5.Config");
}
getInstanceMethod = classRef.getMethod("getInstance", new Class[0]);
kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
new Class[0]);
return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
}
}

7
clients/src/main/java/org/apache/kafka/common/security/JaasConfig.java

@ -32,7 +32,6 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; @@ -32,7 +32,6 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.LoginType;
/**
* JAAS configuration parser that constructs a JAAS configuration object with a single
@ -51,7 +50,7 @@ class JaasConfig extends Configuration { @@ -51,7 +50,7 @@ class JaasConfig extends Configuration {
private final String loginContextName;
private final List<AppConfigurationEntry> configEntries;
public JaasConfig(LoginType loginType, String jaasConfigParams) {
public JaasConfig(String loginContextName, String jaasConfigParams) {
StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(jaasConfigParams));
tokenizer.slashSlashComments(true);
tokenizer.slashStarComments(true);
@ -67,7 +66,7 @@ class JaasConfig extends Configuration { @@ -67,7 +66,7 @@ class JaasConfig extends Configuration {
if (configEntries.isEmpty())
throw new IllegalArgumentException("Login module not specified in JAAS config");
this.loginContextName = loginType.contextName();
this.loginContextName = loginContextName;
} catch (IOException e) {
throw new KafkaException("Unexpected exception while parsing JAAS config");
@ -120,4 +119,4 @@ class JaasConfig extends Configuration { @@ -120,4 +119,4 @@ class JaasConfig extends Configuration {
throw new IllegalArgumentException("JAAS config entry not terminated by semi-colon");
return new AppConfigurationEntry(loginModule, controlFlag, options);
}
}
}

190
clients/src/main/java/org/apache/kafka/common/security/JaasContext.java

@ -0,0 +1,190 @@ @@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
public class JaasContext {
private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
private static final String GLOBAL_CONTEXT_NAME_SERVER = "KafkaServer";
private static final String GLOBAL_CONTEXT_NAME_CLIENT = "KafkaClient";
/**
* Returns an instance of this class.
*
* For contextType SERVER, the context will contain the default Configuration and the context name will be one of:
*
* 1. Lowercased listener name followed by a period and the string `KafkaServer`
* 2. The string `KafkaServer`
*
* If both are valid entries in the JAAS configuration, the first option is chosen.
*
* For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified,
* the configuration object is created by parsing the property value. Otherwise, the default Configuration
* is returned. The context name is always `KafkaClient`.
*
* @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if
* listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT.
*/
public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName,
Map<String, ?> configs) {
String listenerContextName;
String globalContextName;
switch (contextType) {
case CLIENT:
if (listenerName != null)
throw new IllegalArgumentException("listenerName should be null for CLIENT");
globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
listenerContextName = null;
break;
case SERVER:
if (listenerName == null)
throw new IllegalArgumentException("listenerName should not be null for SERVER");
globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
break;
default:
throw new IllegalArgumentException("Unexpected context type " + contextType);
}
return load(contextType, listenerContextName, globalContextName, configs);
}
static JaasContext load(JaasContext.Type contextType, String listenerContextName,
String globalContextName, Map<String, ?> configs) {
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
if (jaasConfigArgs != null) {
if (contextType == JaasContext.Type.SERVER)
throw new IllegalArgumentException("JAAS config property not supported for server");
else {
JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName);
int numModules = clientModules == null ? 0 : clientModules.length;
if (numModules != 1)
throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module");
return new JaasContext(globalContextName, contextType, jaasConfig);
}
} else
return defaultContext(contextType, listenerContextName, globalContextName);
}
private static JaasContext defaultContext(JaasContext.Type contextType, String listenerContextName,
String globalContextName) {
String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
if (jaasConfigFile == null) {
if (contextType == Type.CLIENT) {
LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
} else {
LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS " +
"configuration.");
}
}
Configuration jaasConfig = Configuration.getConfiguration();
AppConfigurationEntry[] configEntries = null;
String contextName = globalContextName;
if (listenerContextName != null) {
configEntries = jaasConfig.getAppConfigurationEntry(listenerContextName);
if (configEntries != null)
contextName = listenerContextName;
}
if (configEntries == null)
configEntries = jaasConfig.getAppConfigurationEntry(globalContextName);
if (configEntries == null) {
String listenerNameText = listenerContextName == null ? "" : " or '" + listenerContextName + "'";
String errorMessage = "Could not find a '" + globalContextName + "'" + listenerNameText + " entry in the JAAS " +
"configuration. System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " +
(jaasConfigFile == null ? "not set" : jaasConfigFile);
throw new IllegalArgumentException(errorMessage);
}
return new JaasContext(contextName, contextType, jaasConfig);
}
/**
* The type of the SASL login context, it should be SERVER for the broker and CLIENT for the clients (consumer, producer,
* etc.). This is used to validate behaviour (e.g. some functionality is only available in the broker or clients).
*/
public enum Type { CLIENT, SERVER; }
private final String name;
private final Type type;
private final Configuration configuration;
private final List<AppConfigurationEntry> configurationEntries;
public JaasContext(String name, Type type, Configuration configuration) {
this.name = name;
this.type = type;
this.configuration = configuration;
AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name);
if (entries == null)
throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
}
public String name() {
return name;
}
public Type type() {
return type;
}
public Configuration configuration() {
return configuration;
}
public List<AppConfigurationEntry> configurationEntries() {
return configurationEntries;
}
/**
* Returns the configuration option for <code>key</code> from this context.
* If login module name is specified, return option value only from that module.
*/
public String configEntryOption(String key, String loginModuleName) {
for (AppConfigurationEntry entry : configurationEntries) {
if (loginModuleName != null && !loginModuleName.equals(entry.getLoginModuleName()))
continue;
Object val = entry.getOptions().get(key);
if (val != null)
return (String) val;
}
return null;
}
}

111
clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java

@ -17,16 +17,8 @@ @@ -17,16 +17,8 @@
package org.apache.kafka.common.security;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.io.IOException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.LoginType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,114 +26,11 @@ public class JaasUtils { @@ -34,114 +26,11 @@ public class JaasUtils {
private static final Logger LOG = LoggerFactory.getLogger(JaasUtils.class);
public static final String JAVA_LOGIN_CONFIG_PARAM = "java.security.auth.login.config";
public static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
public static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
public static final String SERVICE_NAME = "serviceName";
public static final String ZK_SASL_CLIENT = "zookeeper.sasl.client";
public static final String ZK_LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
/**
* Returns a JAAS Configuration object. For loginType SERVER, default Configuration
* is returned. For loginType CLIENT, if JAAS configuration property
* {@link SaslConfigs#SASL_JAAS_CONFIG} is specified, the configuration object
* is created by parsing the property value. Otherwise, the default Configuration
* is returned.
* @throws IllegalArgumentException if JAAS configuration property is specified
* for loginType SERVER
*/
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
if (jaasConfigArgs != null) {
if (loginType == LoginType.SERVER)
throw new IllegalArgumentException("JAAS config property not supported for server");
else {
JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
int numModules = clientModules == null ? 0 : clientModules.length;
if (numModules != 1)
throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module");
return jaasConfig;
}
} else
return defaultJaasConfig(loginType);
}
private static Configuration defaultJaasConfig(LoginType loginType) {
String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
if (jaasConfigFile == null) {
LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
}
Configuration jaasConfig = Configuration.getConfiguration();
String loginContextName = loginType.contextName();
AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
if (configEntries == null) {
String errorMessage;
errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
throw new IllegalArgumentException(errorMessage);
}
return jaasConfig;
}
/**
* Returns the configuration option for <code>key</code> from the server login context
* of the default JAAS configuration. If login module name is specified, return option value
* only from that module.
*/
public static String defaultServerJaasConfigOption(String key, String loginModuleName) throws IOException {
return jaasConfigOption(Configuration.getConfiguration(), LoginType.SERVER.contextName(), key, loginModuleName);
}
/**
* Returns the configuration option for <code>key</code> from the login context
* <code>loginContextName</code> of the specified JAAS configuration.
* If login module name is specified, return option value only from that module.
*/
public static String jaasConfigOption(Configuration jaasConfig, String loginContextName, String key, String loginModuleName) throws IOException {
AppConfigurationEntry[] configurationEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
if (configurationEntries == null) {
String errorMessage = "Could not find a '" + loginContextName + "' entry in this JAAS configuration.";
throw new IOException(errorMessage);
}
for (AppConfigurationEntry entry: configurationEntries) {
if (loginModuleName != null && !loginModuleName.equals(entry.getLoginModuleName()))
continue;
Object val = entry.getOptions().get(key);
if (val != null)
return (String) val;
}
return null;
}
public static String defaultKerberosRealm()
throws ClassNotFoundException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException,
InvocationTargetException {
//TODO Find a way to avoid using these proprietary classes as access to Java 9 will block access by default
//due to the Jigsaw module system
Object kerbConf;
Class<?> classRef;
Method getInstanceMethod;
Method getDefaultRealmMethod;
if (System.getProperty("java.vendor").contains("IBM")) {
classRef = Class.forName("com.ibm.security.krb5.internal.Config");
} else {
classRef = Class.forName("sun.security.krb5.Config");
}
getInstanceMethod = classRef.getMethod("getInstance", new Class[0]);
kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
new Class[0]);
return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
}
public static boolean isZkSecurityEnabled() {
boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, "true"));
String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, "Client");

1
clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java

@ -25,7 +25,6 @@ import org.apache.kafka.common.network.Authenticator; @@ -25,7 +25,6 @@ import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.KafkaException;
/** DefaultPrincipalBuilder which return transportLayer's peer Principal **/
public class DefaultPrincipalBuilder implements PrincipalBuilder {
public void configure(Map<String, ?> configs) {}

5
clients/src/main/java/org/apache/kafka/common/security/auth/Login.java

@ -18,10 +18,11 @@ @@ -18,10 +18,11 @@
package org.apache.kafka.common.security.auth;
import org.apache.kafka.common.security.JaasContext;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
@ -33,7 +34,7 @@ public interface Login { @@ -33,7 +34,7 @@ public interface Login {
/**
* Configures this login instance.
*/
void configure(Map<String, ?> configs, Configuration jaasConfig, String loginContextName);
void configure(Map<String, ?> configs, JaasContext jaasContext);
/**
* Performs login for each login module specified for the login context of this instance.

17
clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java

@ -18,7 +18,6 @@ @@ -18,7 +18,6 @@
package org.apache.kafka.common.security.authenticator;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.sasl.RealmCallback;
@ -29,6 +28,7 @@ import javax.security.auth.callback.PasswordCallback; @@ -29,6 +28,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.Subject;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.Login;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,20 +41,17 @@ import java.util.Map; @@ -41,20 +41,17 @@ import java.util.Map;
public abstract class AbstractLogin implements Login {
private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class);
private Configuration jaasConfig;
private String loginContextName;
private JaasContext jaasContext;
private LoginContext loginContext;
@Override
public void configure(Map<String, ?> configs, Configuration jaasConfig, String loginContextName) {
this.jaasConfig = jaasConfig;
this.loginContextName = loginContextName;
public void configure(Map<String, ?> configs, JaasContext jaasContext) {
this.jaasContext = jaasContext;
}
@Override
public LoginContext login() throws LoginException {
loginContext = new LoginContext(loginContextName, null, new LoginCallbackHandler(), jaasConfig);
loginContext = new LoginContext(jaasContext.name(), null, new LoginCallbackHandler(), jaasContext.configuration());
loginContext.login();
log.info("Successfully logged in.");
return loginContext;
@ -65,8 +62,8 @@ public abstract class AbstractLogin implements Login { @@ -65,8 +62,8 @@ public abstract class AbstractLogin implements Login {
return loginContext.getSubject();
}
protected Configuration jaasConfig() {
return jaasConfig;
protected JaasContext jaasContext() {
return jaasContext;
}
/**

54
clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java

@ -19,39 +19,36 @@ @@ -19,39 +19,36 @@
package org.apache.kafka.common.security.authenticator;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.kerberos.KerberosLogin;
public class LoginManager {
// static configs (broker or client)
private static final EnumMap<LoginType, LoginManager> LOGIN_TYPE_INSTANCES = new EnumMap<>(LoginType.class);
private static final Map<String, LoginManager> STATIC_INSTANCES = new HashMap<>();
// dynamic configs (client-only)
private static final Map<Password, LoginManager> JAAS_CONF_INSTANCES = new HashMap<>();
private static final Map<Password, LoginManager> DYNAMIC_INSTANCES = new HashMap<>();
private final Login login;
private final Object cacheKey;
private int refCount;
private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs, Configuration jaasConfig,
private LoginManager(JaasContext jaasContext, boolean hasKerberos, Map<String, ?> configs,
Password jaasConfigValue) throws IOException, LoginException {
this.cacheKey = jaasConfigValue != null ? jaasConfigValue : loginType;
String loginContext = loginType.contextName();
this.cacheKey = jaasConfigValue != null ? jaasConfigValue : jaasContext.name();
login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
login.configure(configs, jaasConfig, loginContext);
login.configure(configs, jaasContext);
login.login();
}
@ -59,35 +56,30 @@ public class LoginManager { @@ -59,35 +56,30 @@ public class LoginManager {
* Returns an instance of `LoginManager` and increases its reference count.
*
* `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
* existing `LoginManager` for the provided `loginType` and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`, if available.
* existing `LoginManager` for the provided context type and `SaslConfigs.SASL_JAAS_CONFIG` in `configs`,
* if available.
*
* This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
* shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
* complicated to do the latter without making the consumer API more complex.
*
* @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients
* (i.e. consumer and producer)
* @param hasKerberos
* @param configs configuration as key/value pairs
* @param jaasConfig JAAS Configuration object
*/
public static LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs,
Configuration jaasConfig) throws IOException, LoginException {
public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos,
Map<String, ?> configs) throws IOException, LoginException {
synchronized (LoginManager.class) {
// SASL_JAAS_CONFIG is only supported by clients
LoginManager loginManager;
Password jaasConfigValue = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
if (loginType == LoginType.CLIENT && jaasConfigValue != null) {
loginManager = JAAS_CONF_INSTANCES.get(jaasConfigValue);
if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
if (loginManager == null) {
loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue);
JAAS_CONF_INSTANCES.put(jaasConfigValue, loginManager);
loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager);
}
} else {
loginManager = LOGIN_TYPE_INSTANCES.get(loginType);
loginManager = STATIC_INSTANCES.get(jaasContext.name());
if (loginManager == null) {
loginManager = new LoginManager(loginType, hasKerberos, configs, jaasConfig, jaasConfigValue);
LOGIN_TYPE_INSTANCES.put(loginType, loginManager);
loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
STATIC_INSTANCES.put(jaasContext.name(), loginManager);
}
}
return loginManager.acquire();
@ -116,9 +108,9 @@ public class LoginManager { @@ -116,9 +108,9 @@ public class LoginManager {
throw new IllegalStateException("release called on LoginManager with refCount == 0");
else if (refCount == 1) {
if (cacheKey instanceof Password) {
JAAS_CONF_INSTANCES.remove(cacheKey);
DYNAMIC_INSTANCES.remove(cacheKey);
} else {
LOGIN_TYPE_INSTANCES.remove(cacheKey);
STATIC_INSTANCES.remove(cacheKey);
}
login.close();
}
@ -129,10 +121,10 @@ public class LoginManager { @@ -129,10 +121,10 @@ public class LoginManager {
/* Should only be used in tests. */
public static void closeAll() {
synchronized (LoginManager.class) {
for (LoginType key : new ArrayList<>(LOGIN_TYPE_INSTANCES.keySet()))
LOGIN_TYPE_INSTANCES.remove(key).login.close();
for (Password key : new ArrayList<>(JAAS_CONF_INSTANCES.keySet()))
JAAS_CONF_INSTANCES.remove(key).login.close();
for (String key : new ArrayList<>(STATIC_INSTANCES.keySet()))
STATIC_INSTANCES.remove(key).login.close();
for (Password key : new ArrayList<>(DYNAMIC_INSTANCES.keySet()))
DYNAMIC_INSTANCES.remove(key).login.close();
}
}
}

2
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java

@ -91,4 +91,4 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { @@ -91,4 +91,4 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
@Override
public void close() {
}
}
}

10
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.IllegalSaslStateException; @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
@ -57,7 +58,6 @@ import org.slf4j.Logger; @@ -57,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
@ -81,7 +81,7 @@ public class SaslServerAuthenticator implements Authenticator { @@ -81,7 +81,7 @@ public class SaslServerAuthenticator implements Authenticator {
}
private final String node;
private final Configuration jaasConfig;
private final JaasContext jaasContext;
private final Subject subject;
private final KerberosShortNamer kerberosNamer;
private final int maxReceiveSize;
@ -105,11 +105,11 @@ public class SaslServerAuthenticator implements Authenticator { @@ -105,11 +105,11 @@ public class SaslServerAuthenticator implements Authenticator {
private NetworkReceive netInBuffer;
private Send netOutBuffer;
public SaslServerAuthenticator(String node, Configuration jaasConfig, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException {
public SaslServerAuthenticator(String node, JaasContext jaasContext, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize, CredentialCache credentialCache) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
this.node = node;
this.jaasConfig = jaasConfig;
this.jaasContext = jaasContext;
this.subject = subject;
this.kerberosNamer = kerberosNameParser;
this.maxReceiveSize = maxReceiveSize;
@ -129,7 +129,7 @@ public class SaslServerAuthenticator implements Authenticator { @@ -129,7 +129,7 @@ public class SaslServerAuthenticator implements Authenticator {
private void createSaslServer(String mechanism) throws IOException {
this.saslMechanism = mechanism;
if (!ScramMechanism.isScram(mechanism))
callbackHandler = new SaslServerCallbackHandler(jaasConfig, kerberosNamer);
callbackHandler = new SaslServerCallbackHandler(jaasContext, kerberosNamer);
else
callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class));
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);

15
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java

@ -21,6 +21,7 @@ package org.apache.kafka.common.security.authenticator; @@ -21,6 +21,7 @@ package org.apache.kafka.common.security.authenticator;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.slf4j.Logger;
@ -29,14 +30,11 @@ import org.slf4j.LoggerFactory; @@ -29,14 +30,11 @@ import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.JaasUtils;
/**
* Callback handler for Sasl servers. The callbacks required for all the SASL
@ -47,11 +45,10 @@ import org.apache.kafka.common.security.JaasUtils; @@ -47,11 +45,10 @@ import org.apache.kafka.common.security.JaasUtils;
public class SaslServerCallbackHandler implements AuthCallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
private final KerberosShortNamer kerberosShortNamer;
private final JaasContext jaasContext;
public SaslServerCallbackHandler(Configuration configuration, KerberosShortNamer kerberosNameParser) throws IOException {
AppConfigurationEntry[] configurationEntries = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_SERVER);
if (configurationEntries == null)
throw new IOException("Could not find a 'KafkaServer' entry in this configuration: Kafka Server cannot start.");
public SaslServerCallbackHandler(JaasContext jaasContext, KerberosShortNamer kerberosNameParser) throws IOException {
this.jaasContext = jaasContext;
this.kerberosShortNamer = kerberosNameParser;
}
@ -59,6 +56,10 @@ public class SaslServerCallbackHandler implements AuthCallbackHandler { @@ -59,6 +56,10 @@ public class SaslServerCallbackHandler implements AuthCallbackHandler {
public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
}
public JaasContext jaasContext() {
return jaasContext;
}
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
for (Callback callback : callbacks) {

44
clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java

@ -20,13 +20,12 @@ package org.apache.kafka.common.security.kerberos; @@ -20,13 +20,12 @@ package org.apache.kafka.common.security.kerberos;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.Subject;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.config.SaslConfigs;
@ -36,8 +35,8 @@ import org.apache.kafka.common.utils.Utils; @@ -36,8 +35,8 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Map;
@ -56,7 +55,6 @@ public class KerberosLogin extends AbstractLogin { @@ -56,7 +55,6 @@ public class KerberosLogin extends AbstractLogin {
private boolean isKrbTicket;
private boolean isUsingTicketCache;
private String loginContextName;
private String principal;
// LoginThread will sleep until 80% of time from last refresh to
@ -82,26 +80,19 @@ public class KerberosLogin extends AbstractLogin { @@ -82,26 +80,19 @@ public class KerberosLogin extends AbstractLogin {
private String serviceName;
private long lastLogin;
/**
* Login constructor. The constructor starts the thread used
* to periodically re-login to the Kerberos Ticket Granting Server.
* @param loginContextName
* name of section in JAAS file that will be use to login.
* Passed as first param to javax.security.auth.login.LoginContext().
* @param configs configure Login with the given key-value pairs.
* @throws javax.security.auth.login.LoginException
* Thrown if authentication fails.
*/
public void configure(Map<String, ?> configs, Configuration jaasConfig, final String loginContextName) {
super.configure(configs, jaasConfig, loginContextName);
this.loginContextName = loginContextName;
public void configure(Map<String, ?> configs, JaasContext jaasContext) {
super.configure(configs, jaasContext);
this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
this.serviceName = getServiceName(jaasConfig, configs, loginContextName);
this.serviceName = getServiceName(configs, jaasContext);
}
/**
* Performs login for each login module specified for the login context of this instance and starts the thread used
* to periodically re-login to the Kerberos Ticket Granting Server.
*/
@Override
public LoginContext login() throws LoginException {
@ -110,13 +101,13 @@ public class KerberosLogin extends AbstractLogin { @@ -110,13 +101,13 @@ public class KerberosLogin extends AbstractLogin {
subject = loginContext.getSubject();
isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
AppConfigurationEntry[] entries = jaasConfig().getAppConfigurationEntry(loginContextName);
if (entries.length == 0) {
List<AppConfigurationEntry> entries = jaasContext().configurationEntries();
if (entries.isEmpty()) {
isUsingTicketCache = false;
principal = null;
} else {
// there will only be a single entry
AppConfigurationEntry entry = entries[0];
AppConfigurationEntry entry = entries.get(0);
if (entry.getOptions().get("useTicketCache") != null) {
String val = (String) entry.getOptions().get("useTicketCache");
isUsingTicketCache = val.equals("true");
@ -292,13 +283,8 @@ public class KerberosLogin extends AbstractLogin { @@ -292,13 +283,8 @@ public class KerberosLogin extends AbstractLogin {
return serviceName;
}
private String getServiceName(Configuration jaasConfig, Map<String, ?> configs, String loginContext) {
String jaasServiceName;
try {
jaasServiceName = JaasUtils.jaasConfigOption(jaasConfig, loginContext, JaasUtils.SERVICE_NAME, null);
} catch (IOException e) {
throw new KafkaException("JAAS configuration entry not found", e);
}
private static String getServiceName(Map<String, ?> configs, JaasContext jaasContext) {
String jaasServiceName = jaasContext.configEntryOption(JaasUtils.SERVICE_NAME, null);
String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) {
String message = String.format("Conflicting serviceName values found in JAAS and Kafka configs " +
@ -377,7 +363,7 @@ public class KerberosLogin extends AbstractLogin { @@ -377,7 +363,7 @@ public class KerberosLogin extends AbstractLogin {
loginContext.logout();
//login and also update the subject field of this instance to
//have the new credentials (pass it to the LoginContext constructor)
loginContext = new LoginContext(loginContextName, subject, null, jaasConfig());
loginContext = new LoginContext(jaasContext().name(), subject, null, jaasContext().configuration());
log.info("Initiating re-login for {}", principal);
loginContext.login();
}

29
clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java

@ -18,7 +18,6 @@ @@ -18,7 +18,6 @@
package org.apache.kafka.common.security.plain;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Map;
@ -29,7 +28,8 @@ import javax.security.sasl.SaslException; @@ -29,7 +28,8 @@ import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
/**
* Simple SaslServer implementation for SASL/PLAIN. In order to make this implementation
@ -49,10 +49,13 @@ public class PlainSaslServer implements SaslServer { @@ -49,10 +49,13 @@ public class PlainSaslServer implements SaslServer {
public static final String PLAIN_MECHANISM = "PLAIN";
private static final String JAAS_USER_PREFIX = "user_";
private final JaasContext jaasContext;
private boolean complete;
private String authorizationID;
public PlainSaslServer(CallbackHandler callbackHandler) {
public PlainSaslServer(JaasContext jaasContext) {
this.jaasContext = jaasContext;
}
@Override
@ -91,13 +94,10 @@ public class PlainSaslServer implements SaslServer { @@ -91,13 +94,10 @@ public class PlainSaslServer implements SaslServer {
if (authorizationID.isEmpty())
authorizationID = username;
try {
String expectedPassword = JaasUtils.defaultServerJaasConfigOption(JAAS_USER_PREFIX + username, PlainLoginModule.class.getName());
if (!password.equals(expectedPassword)) {
throw new SaslException("Authentication failed: Invalid username or password");
}
} catch (IOException e) {
throw new SaslException("Authentication failed: Invalid JAAS configuration", e);
String expectedPassword = jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
PlainLoginModule.class.getName());
if (!password.equals(expectedPassword)) {
throw new SaslException("Authentication failed: Invalid username or password");
}
complete = true;
return new byte[0];
@ -151,10 +151,13 @@ public class PlainSaslServer implements SaslServer { @@ -151,10 +151,13 @@ public class PlainSaslServer implements SaslServer {
public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh)
throws SaslException {
if (!PLAIN_MECHANISM.equals(mechanism)) {
if (!PLAIN_MECHANISM.equals(mechanism))
throw new SaslException(String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism));
}
return new PlainSaslServer(cbh);
if (!(cbh instanceof SaslServerCallbackHandler))
throw new SaslException("CallbackHandler must be of type SaslServerCallbackHandler, but it is: " + cbh.getClass());
return new PlainSaslServer(((SaslServerCallbackHandler) cbh).jaasContext());
}
@Override

2
clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java

@ -57,4 +57,4 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { @@ -57,4 +57,4 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
@Override
public void close() {
}
}
}

58
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java

@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.junit.Test;
import java.util.Arrays;
@ -27,6 +28,7 @@ import java.util.Map; @@ -27,6 +28,7 @@ import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
@ -61,6 +63,62 @@ public class AbstractConfigTest { @@ -61,6 +63,62 @@ public class AbstractConfigTest {
assertEquals(expected, originalsWithPrefix);
}
@Test
public void testValuesWithPrefixOverride() {
String prefix = "prefix.";
Properties props = new Properties();
props.put("sasl.mechanism", "PLAIN");
props.put("prefix.sasl.mechanism", "GSSAPI");
props.put("prefix.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
props.put("prefix.ssl.truststore.location", "my location");
props.put("sasl.kerberos.service.name", "service name");
props.put("ssl.keymanager.algorithm", "algorithm");
TestSecurityConfig config = new TestSecurityConfig(props);
Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
// prefix overrides global
assertTrue(config.unused().contains("prefix.sasl.mechanism"));
assertTrue(config.unused().contains("sasl.mechanism"));
assertEquals("GSSAPI", valuesWithPrefixOverride.get("sasl.mechanism"));
assertFalse(config.unused().contains("sasl.mechanism"));
assertFalse(config.unused().contains("prefix.sasl.mechanism"));
// prefix overrides default
assertTrue(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("prefix.sasl.kerberos.kinit.cmd"));
// prefix override with no default
assertTrue(config.unused().contains("prefix.ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertEquals("my location", valuesWithPrefixOverride.get("ssl.truststore.location"));
assertFalse(config.unused().contains("ssl.truststore.location"));
assertFalse(config.unused().contains("prefix.ssl.truststore.location"));
// global overrides default
assertTrue(config.unused().contains("ssl.keymanager.algorithm"));
assertEquals("algorithm", valuesWithPrefixOverride.get("ssl.keymanager.algorithm"));
assertFalse(config.unused().contains("ssl.keymanager.algorithm"));
// global with no default
assertTrue(config.unused().contains("sasl.kerberos.service.name"));
assertEquals("service name", valuesWithPrefixOverride.get("sasl.kerberos.service.name"));
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
// unset with default
assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin"));
assertEquals(SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
valuesWithPrefixOverride.get("sasl.kerberos.min.time.before.relogin"));
assertFalse(config.unused().contains("sasl.kerberos.min.time.before.relogin"));
// unset with no default
assertFalse(config.unused().contains("ssl.key.password"));
assertNull(valuesWithPrefixOverride.get("ssl.key.password"));
assertFalse(config.unused().contains("ssl.key.password"));
}
@Test
public void testUnused() {
Properties props = new Properties();

9
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java

@ -14,11 +14,11 @@ package org.apache.kafka.common.network; @@ -14,11 +14,11 @@ package org.apache.kafka.common.network;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
@ -30,8 +30,9 @@ import org.apache.kafka.test.TestUtils; @@ -30,8 +30,9 @@ import org.apache.kafka.test.TestUtils;
*/
public class NetworkTestUtils {
public static NioEchoServer createEchoServer(SecurityProtocol securityProtocol, Map<String, Object> serverConfigs) throws Exception {
NioEchoServer server = new NioEchoServer(securityProtocol, serverConfigs, "localhost");
public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol,
AbstractConfig serverConfig) throws Exception {
NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost");
server.start();
return server;
}
@ -81,6 +82,6 @@ public class NetworkTestUtils { @@ -81,6 +82,6 @@ public class NetworkTestUtils {
break;
}
}
assertTrue(closed);
assertTrue("Channel was not closed by timeout", closed);
}
}

10
clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java

@ -23,8 +23,8 @@ import java.util.ArrayList; @@ -23,8 +23,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@ -47,7 +47,9 @@ public class NioEchoServer extends Thread { @@ -47,7 +47,9 @@ public class NioEchoServer extends Thread {
private volatile WritableByteChannel outputChannel;
private final CredentialCache credentialCache;
public NioEchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs, String serverHost) throws Exception {
public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost) throws Exception {
super("echoserver");
setDaemon(true);
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
@ -57,10 +59,8 @@ public class NioEchoServer extends Thread { @@ -57,10 +59,8 @@ public class NioEchoServer extends Thread {
this.credentialCache = new CredentialCache();
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(securityProtocol, configs, credentialCache);
ChannelBuilder channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
setName("echoserver");
setDaemon(true);
acceptorThread = new AcceptorThread();
}

94
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java

@ -14,8 +14,6 @@ package org.apache.kafka.common.network; @@ -14,8 +14,6 @@ package org.apache.kafka.common.network;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Map;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
@ -24,19 +22,22 @@ import java.nio.ByteBuffer; @@ -24,19 +22,22 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.common.config.types.Password;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -83,7 +84,7 @@ public class SslTransportLayerTest { @@ -83,7 +84,7 @@ public class SslTransportLayerTest {
@Test
public void testValidEndpointIdentification() throws Exception {
String node = "0";
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
@ -105,7 +106,7 @@ public class SslTransportLayerTest { @@ -105,7 +106,7 @@ public class SslTransportLayerTest {
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -121,7 +122,9 @@ public class SslTransportLayerTest { @@ -121,7 +122,9 @@ public class SslTransportLayerTest {
public void testEndpointIdentificationDisabled() throws Exception {
String node = "0";
String serverHost = InetAddress.getLocalHost().getHostAddress();
server = new NioEchoServer(SecurityProtocol.SSL, sslServerConfigs, serverHost);
SecurityProtocol securityProtocol = SecurityProtocol.SSL;
server = new NioEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol,
new TestSecurityConfig(sslServerConfigs), serverHost);
server.start();
sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
createSelector(sslClientConfigs);
@ -139,13 +142,53 @@ public class SslTransportLayerTest { @@ -139,13 +142,53 @@ public class SslTransportLayerTest {
public void testClientAuthenticationRequiredValidProvided() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}
/**
* Tests that disabling client authentication as a listener override has the desired effect.
*/
@Test
public void testListenerConfigOverride() throws Exception {
String node = "0";
ListenerName clientListenerName = new ListenerName("client");
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
sslServerConfigs.put(clientListenerName.configPrefix() + SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
// `client` listener is not configured at this point, so client auth should be required
server = createEchoServer(SecurityProtocol.SSL);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
// Connect with client auth should work fine
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
selector.close();
// Remove client auth, so connection should fail
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelClose(selector, node);
selector.close();
server.close();
// Listener-specific config should be used and client auth should be disabled
server = createEchoServer(clientListenerName, SecurityProtocol.SSL);
addr = new InetSocketAddress("localhost", server.port());
// Connect without client auth should work fine now
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}
/**
* Tests that server does not accept connections from clients with an untrusted certificate
@ -156,7 +199,7 @@ public class SslTransportLayerTest { @@ -156,7 +199,7 @@ public class SslTransportLayerTest {
String node = "0";
sslServerConfigs = serverCertStores.getUntrustingConfig();
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -172,7 +215,7 @@ public class SslTransportLayerTest { @@ -172,7 +215,7 @@ public class SslTransportLayerTest {
public void testClientAuthenticationRequiredNotProvided() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@ -193,7 +236,7 @@ public class SslTransportLayerTest { @@ -193,7 +236,7 @@ public class SslTransportLayerTest {
String node = "0";
sslServerConfigs = serverCertStores.getUntrustingConfig();
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -209,7 +252,7 @@ public class SslTransportLayerTest { @@ -209,7 +252,7 @@ public class SslTransportLayerTest {
public void testClientAuthenticationDisabledNotProvided() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@ -229,7 +272,7 @@ public class SslTransportLayerTest { @@ -229,7 +272,7 @@ public class SslTransportLayerTest {
public void testClientAuthenticationRequestedValidProvided() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -245,7 +288,7 @@ public class SslTransportLayerTest { @@ -245,7 +288,7 @@ public class SslTransportLayerTest {
public void testClientAuthenticationRequestedNotProvided() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
@ -310,7 +353,7 @@ public class SslTransportLayerTest { @@ -310,7 +353,7 @@ public class SslTransportLayerTest {
public void testInvalidKeyPassword() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, new Password("invalid"));
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -325,7 +368,7 @@ public class SslTransportLayerTest { @@ -325,7 +368,7 @@ public class SslTransportLayerTest {
public void testUnsupportedTLSVersion() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
createSelector(sslClientConfigs);
@ -343,7 +386,7 @@ public class SslTransportLayerTest { @@ -343,7 +386,7 @@ public class SslTransportLayerTest {
String node = "0";
String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
createSelector(sslClientConfigs);
@ -359,7 +402,7 @@ public class SslTransportLayerTest { @@ -359,7 +402,7 @@ public class SslTransportLayerTest {
@Test
public void testNetReadBufferResize() throws Exception {
String node = "0";
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs, 10, null, null);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -373,7 +416,7 @@ public class SslTransportLayerTest { @@ -373,7 +416,7 @@ public class SslTransportLayerTest {
@Test
public void testNetWriteBufferResize() throws Exception {
String node = "0";
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs, null, 10, null);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -387,7 +430,7 @@ public class SslTransportLayerTest { @@ -387,7 +430,7 @@ public class SslTransportLayerTest {
@Test
public void testApplicationBufferResize() throws Exception {
String node = "0";
server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs, null, null, 10);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
@ -407,7 +450,7 @@ public class SslTransportLayerTest { @@ -407,7 +450,7 @@ public class SslTransportLayerTest {
private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
String node = "0";
server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs);
server = createEchoServer(securityProtocol);
clientChannelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", clientChannelBuilder);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
@ -441,7 +484,8 @@ public class SslTransportLayerTest { @@ -441,7 +484,8 @@ public class SslTransportLayerTest {
createSelector(sslClientConfigs, null, null, null);
}
private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
final Integer netWriteBufSize, final Integer appBufSize) {
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
@ -460,6 +504,14 @@ public class SslTransportLayerTest { @@ -460,6 +504,14 @@ public class SslTransportLayerTest {
this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
}
private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(sslServerConfigs));
}
private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
}
/**
* SSLTransportLayer with overrides for packet and application buffer size to test buffer resize

93
clients/src/test/java/org/apache/kafka/common/security/JaasUtilsTest.java → clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java

@ -21,6 +21,7 @@ import java.io.IOException; @@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -35,7 +36,7 @@ import static org.junit.Assert.fail; @@ -35,7 +36,7 @@ import static org.junit.Assert.fail;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.ListenerName;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -44,7 +45,7 @@ import org.junit.Test; @@ -44,7 +45,7 @@ import org.junit.Test;
* Tests parsing of {@link SaslConfigs#SASL_JAAS_CONFIG} property and verifies that the format
* and parsing are consistent with JAAS configuration files loaded by the JRE.
*/
public class JaasUtilsTest {
public class JaasContextTest {
private File jaasConfigFile;
@ -129,8 +130,9 @@ public class JaasUtilsTest { @@ -129,8 +130,9 @@ public class JaasUtilsTest {
}
String jaasConfigProp = builder.toString();
Configuration configuration = new JaasConfig(LoginType.CLIENT, jaasConfigProp);
AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(LoginType.CLIENT.contextName());
String clientContextName = "CLIENT";
Configuration configuration = new JaasConfig(clientContextName, jaasConfigProp);
AppConfigurationEntry[] dynamicEntries = configuration.getAppConfigurationEntry(clientContextName);
assertEquals(moduleCount, dynamicEntries.length);
for (int i = 0; i < moduleCount; i++) {
@ -138,8 +140,9 @@ public class JaasUtilsTest { @@ -138,8 +140,9 @@ public class JaasUtilsTest {
checkEntry(entry, "test.Module" + i, LoginModuleControlFlag.REQUIRED, moduleOptions.get(i));
}
writeConfiguration(LoginType.SERVER, jaasConfigProp);
AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(LoginType.SERVER.contextName());
String serverContextName = "SERVER";
writeConfiguration(serverContextName, jaasConfigProp);
AppConfigurationEntry[] staticEntries = Configuration.getConfiguration().getAppConfigurationEntry(serverContextName);
for (int i = 0; i < moduleCount; i++) {
AppConfigurationEntry staticEntry = staticEntries[i];
checkEntry(staticEntry, dynamicEntries[i].getLoginModuleName(), LoginModuleControlFlag.REQUIRED, dynamicEntries[i].getOptions());
@ -179,14 +182,60 @@ public class JaasUtilsTest { @@ -179,14 +182,60 @@ public class JaasUtilsTest {
checkConfiguration(config, "test.testNumericOptionWithQuotes", LoginModuleControlFlag.REQUIRED, options);
}
private AppConfigurationEntry configurationEntry(LoginType loginType, String jaasConfigProp) {
@Test
public void testLoadForServerWithListenerNameOverride() throws IOException {
writeConfiguration(Arrays.asList(
"KafkaServer { test.LoginModuleDefault required; };",
"plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
));
JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
assertEquals("plaintext.KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
checkEntry(context.configurationEntries().get(0), "test.LoginModuleOverride",
LoginModuleControlFlag.REQUISITE, Collections.<String, Object>emptyMap());
}
@Test
public void testLoadForServerWithListenerNameAndFallback() throws IOException {
writeConfiguration(Arrays.asList(
"KafkaServer { test.LoginModule required; };",
"other.KafkaServer { test.LoginModuleOther requisite; };"
));
JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
assertEquals("KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
checkEntry(context.configurationEntries().get(0), "test.LoginModule", LoginModuleControlFlag.REQUIRED,
Collections.<String, Object>emptyMap());
}
@Test(expected = IllegalArgumentException.class)
public void testLoadForServerWithWrongListenerName() throws IOException {
writeConfiguration("Server", "test.LoginModule required;");
JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
}
/**
* ListenerName can only be used with Type.SERVER.
*/
@Test(expected = IllegalArgumentException.class)
public void testLoadForClientWithListenerName() {
JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"),
Collections.<String, Object>emptyMap());
}
private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
Map<String, Object> configs = new HashMap<>();
if (jaasConfigProp != null)
configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
Configuration configuration = JaasUtils.jaasConfig(loginType, configs);
AppConfigurationEntry[] entry = configuration.getAppConfigurationEntry(loginType.contextName());
assertEquals(1, entry.length);
return entry[0];
JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs);
List<AppConfigurationEntry> entries = context.configurationEntries();
assertEquals(1, entries.size());
return entries.get(0);
}
private String controlFlag(LoginModuleControlFlag loginModuleControlFlag) {
@ -210,8 +259,12 @@ public class JaasUtilsTest { @@ -210,8 +259,12 @@ public class JaasUtilsTest {
return builder.toString();
}
private void writeConfiguration(LoginType loginType, String jaasConfigProp) throws IOException {
List<String> lines = Arrays.asList(loginType.contextName() + " { ", jaasConfigProp, "};");
private void writeConfiguration(String contextName, String jaasConfigProp) throws IOException {
List<String> lines = Arrays.asList(contextName + " { ", jaasConfigProp, "};");
writeConfiguration(lines);
}
private void writeConfiguration(List<String> lines) throws IOException {
Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
Configuration.setConfiguration(null);
}
@ -228,25 +281,25 @@ public class JaasUtilsTest { @@ -228,25 +281,25 @@ public class JaasUtilsTest {
}
private void checkConfiguration(String jaasConfigProp, String loginModule, LoginModuleControlFlag controlFlag, Map<String, Object> options) throws Exception {
AppConfigurationEntry dynamicEntry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
AppConfigurationEntry dynamicEntry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp);
checkEntry(dynamicEntry, loginModule, controlFlag, options);
assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(LoginType.CLIENT.contextName()));
assertNull("Static configuration updated", Configuration.getConfiguration().getAppConfigurationEntry(JaasContext.Type.CLIENT.name()));
writeConfiguration(LoginType.SERVER, jaasConfigProp);
AppConfigurationEntry staticEntry = configurationEntry(LoginType.SERVER, null);
writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp);
AppConfigurationEntry staticEntry = configurationEntry(JaasContext.Type.SERVER, null);
checkEntry(staticEntry, loginModule, controlFlag, options);
}
private void checkInvalidConfiguration(String jaasConfigProp) throws IOException {
try {
writeConfiguration(LoginType.SERVER, jaasConfigProp);
AppConfigurationEntry entry = configurationEntry(LoginType.SERVER, null);
writeConfiguration(JaasContext.Type.SERVER.name(), jaasConfigProp);
AppConfigurationEntry entry = configurationEntry(JaasContext.Type.SERVER, null);
fail("Invalid JAAS configuration file didn't throw exception, entry=" + entry);
} catch (SecurityException e) {
// Expected exception
}
try {
AppConfigurationEntry entry = configurationEntry(LoginType.CLIENT, jaasConfigProp);
AppConfigurationEntry entry = configurationEntry(JaasContext.Type.CLIENT, jaasConfigProp);
fail("Invalid JAAS configuration property didn't throw exception, entry=" + entry);
} catch (IllegalArgumentException e) {
// Expected exception

38
clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.security;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Map;
public class TestSecurityConfig extends AbstractConfig {
private static final ConfigDef CONFIG = new ConfigDef()
.define(SslConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, null, Importance.MEDIUM,
SslConfigs.SSL_CLIENT_AUTH_DOC)
.define(SaslConfigs.SASL_ENABLED_MECHANISMS, Type.LIST, SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS,
Importance.MEDIUM, SaslConfigs.SASL_ENABLED_MECHANISMS_DOC)
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS,
Importance.MEDIUM, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.withClientSslSupport()
.withClientSaslSupport();
public TestSecurityConfig(Map<?, ?> originals) {
super(CONFIG, originals, false);
}
}

133
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java

@ -19,7 +19,7 @@ import org.apache.kafka.common.config.types.Password; @@ -19,7 +19,7 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
@ -38,7 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader; @@ -38,7 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramFormatter;
@ -63,7 +64,6 @@ import java.util.Random; @@ -63,7 +64,6 @@ import java.util.Random;
import javax.security.auth.login.Configuration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@ -106,7 +106,7 @@ public class SaslAuthenticatorTest { @@ -106,7 +106,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
}
@ -119,7 +119,7 @@ public class SaslAuthenticatorTest { @@ -119,7 +119,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
}
@ -133,7 +133,7 @@ public class SaslAuthenticatorTest { @@ -133,7 +133,7 @@ public class SaslAuthenticatorTest {
TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword");
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
}
@ -148,7 +148,7 @@ public class SaslAuthenticatorTest { @@ -148,7 +148,7 @@ public class SaslAuthenticatorTest {
TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD);
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
}
@ -163,7 +163,7 @@ public class SaslAuthenticatorTest { @@ -163,7 +163,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions(null, "mypassword");
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createSelector(securityProtocol, saslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
try {
@ -184,7 +184,7 @@ public class SaslAuthenticatorTest { @@ -184,7 +184,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions("myuser", null);
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createSelector(securityProtocol, saslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
try {
@ -205,7 +205,7 @@ public class SaslAuthenticatorTest { @@ -205,7 +205,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createAndCheckClientConnection(securityProtocol, node);
}
@ -217,7 +217,7 @@ public class SaslAuthenticatorTest { @@ -217,7 +217,7 @@ public class SaslAuthenticatorTest {
public void testMultipleServerMechanisms() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
String node1 = "1";
@ -230,15 +230,12 @@ public class SaslAuthenticatorTest { @@ -230,15 +230,12 @@ public class SaslAuthenticatorTest {
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());
selector.connect(node2, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node2, 100, 10);
selector.close();
String node3 = "3";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createSelector(securityProtocol, saslClientConfigs);
selector.connect(node3, new InetSocketAddress("127.0.0.1", server.port()), BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node3, 100, 10);
selector.close();
selector = null;
}
/**
@ -249,7 +246,7 @@ public class SaslAuthenticatorTest { @@ -249,7 +246,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createAndCheckClientConnection(securityProtocol, "0");
}
@ -262,7 +259,7 @@ public class SaslAuthenticatorTest { @@ -262,7 +259,7 @@ public class SaslAuthenticatorTest {
public void testValidSaslScramMechanisms() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
for (String mechanism : ScramMechanism.mechanismNames()) {
@ -281,10 +278,10 @@ public class SaslAuthenticatorTest { @@ -281,10 +278,10 @@ public class SaslAuthenticatorTest {
Map<String, Object> options = new HashMap<>();
options.put("username", TestJaasConfig.USERNAME);
options.put("password", "invalidpassword");
jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
String node = "0";
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
@ -300,10 +297,10 @@ public class SaslAuthenticatorTest { @@ -300,10 +297,10 @@ public class SaslAuthenticatorTest {
Map<String, Object> options = new HashMap<>();
options.put("username", "unknownUser");
options.put("password", TestJaasConfig.PASSWORD);
jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
String node = "0";
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
@ -317,7 +314,7 @@ public class SaslAuthenticatorTest { @@ -317,7 +314,7 @@ public class SaslAuthenticatorTest {
public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("SCRAM-SHA-256", new ArrayList<>(ScramMechanism.mechanismNames()));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
@ -325,7 +322,6 @@ public class SaslAuthenticatorTest { @@ -325,7 +322,6 @@ public class SaslAuthenticatorTest {
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
selector.close();
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
createAndCheckClientConnection(securityProtocol, "2");
@ -344,9 +340,9 @@ public class SaslAuthenticatorTest { @@ -344,9 +340,9 @@ public class SaslAuthenticatorTest {
Map<String, Object> options = new HashMap<>();
options.put("username", username);
options.put("password", password);
jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, ScramLoginModule.class.getName(), options);
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
updateScramCredentialCache(username, password);
createAndCheckClientConnection(securityProtocol, "0");
}
@ -386,7 +382,7 @@ public class SaslAuthenticatorTest { @@ -386,7 +382,7 @@ public class SaslAuthenticatorTest {
public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send ApiVersionsRequest with unsupported version and validate error response.
String node = "1";
@ -418,7 +414,7 @@ public class SaslAuthenticatorTest { @@ -418,7 +414,7 @@ public class SaslAuthenticatorTest {
public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send ApiVersionsRequest and validate error response.
String node1 = "invalid1";
@ -442,7 +438,7 @@ public class SaslAuthenticatorTest { @@ -442,7 +438,7 @@ public class SaslAuthenticatorTest {
public void testInvalidSaslPacket() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send invalid SASL packet after valid handshake request
String node1 = "invalid1";
@ -481,7 +477,7 @@ public class SaslAuthenticatorTest { @@ -481,7 +477,7 @@ public class SaslAuthenticatorTest {
public void testInvalidApiVersionsRequestSequence() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send handshake request followed by ApiVersionsRequest
String node1 = "invalid1";
@ -508,7 +504,7 @@ public class SaslAuthenticatorTest { @@ -508,7 +504,7 @@ public class SaslAuthenticatorTest {
public void testPacketSizeTooBig() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send SASL packet with large size after valid handshake request
String node1 = "invalid1";
@ -548,7 +544,7 @@ public class SaslAuthenticatorTest { @@ -548,7 +544,7 @@ public class SaslAuthenticatorTest {
public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Send metadata request before Kafka SASL handshake request
String node1 = "invalid1";
@ -586,10 +582,10 @@ public class SaslAuthenticatorTest { @@ -586,10 +582,10 @@ public class SaslAuthenticatorTest {
@Test
public void testInvalidLoginModule() throws Exception {
TestJaasConfig jaasConfig = configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
jaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
jaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_CLIENT, "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
try {
createSelector(securityProtocol, saslClientConfigs);
fail("SASL/PLAIN channel created without valid login module");
@ -608,7 +604,7 @@ public class SaslAuthenticatorTest { @@ -608,7 +604,7 @@ public class SaslAuthenticatorTest {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
}
@ -623,7 +619,7 @@ public class SaslAuthenticatorTest { @@ -623,7 +619,7 @@ public class SaslAuthenticatorTest {
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
createClientConnection(securityProtocol, node);
NetworkTestUtils.waitForChannelClose(selector, node);
}
@ -642,10 +638,11 @@ public class SaslAuthenticatorTest { @@ -642,10 +638,11 @@ public class SaslAuthenticatorTest {
serverOptions.put("user_user1", "user1-secret");
serverOptions.put("user_user2", "user2-secret");
TestJaasConfig staticJaasConfig = new TestJaasConfig();
staticJaasConfig.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(), serverOptions);
staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
serverOptions);
staticJaasConfig.setPlainClientOptions("user1", "invalidpassword");
Configuration.setConfiguration(staticJaasConfig);
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Check that client using static Jaas config does not connect since password is invalid
createAndCheckClientConnectionFailure(securityProtocol, "1");
@ -669,11 +666,52 @@ public class SaslAuthenticatorTest { @@ -669,11 +666,52 @@ public class SaslAuthenticatorTest {
try {
createClientConnection(securityProtocol, "1");
fail("Connection created with multiple login modules in sasl.jaas.config");
} catch (KafkaException e) {
assertTrue("Unexpected exception " + e, e.getCause() instanceof IllegalArgumentException);
} catch (IllegalArgumentException e) {
// Expected
}
}
@Test
public void testJaasConfigurationForListener() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN"));
TestJaasConfig staticJaasConfig = new TestJaasConfig();
Map<String, Object> globalServerOptions = new HashMap<>();
globalServerOptions.put("user_global1", "gsecret1");
globalServerOptions.put("user_global2", "gsecret2");
staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
globalServerOptions);
Map<String, Object> clientListenerServerOptions = new HashMap<>();
clientListenerServerOptions.put("user_client1", "csecret1");
clientListenerServerOptions.put("user_client2", "csecret2");
String clientJaasEntryName = "client." + TestJaasConfig.LOGIN_CONTEXT_SERVER;
staticJaasConfig.createOrUpdateEntry(clientJaasEntryName, PlainLoginModule.class.getName(), clientListenerServerOptions);
Configuration.setConfiguration(staticJaasConfig);
// Listener-specific credentials
server = createEchoServer(new ListenerName("client"), securityProtocol);
saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
createAndCheckClientConnection(securityProtocol, "1");
saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
createAndCheckClientConnectionFailure(securityProtocol, "2");
server.close();
// Global credentials as there is no listener-specific JAAS entry
server = createEchoServer(new ListenerName("other"), securityProtocol);
saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
createAndCheckClientConnection(securityProtocol, "3");
saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
createAndCheckClientConnectionFailure(securityProtocol, "4");
}
/**
* Tests that Kafka ApiVersionsRequests are handled by the SASL server authenticator
* prior to SASL handshake flow and that subsequent authentication succeeds
@ -700,7 +738,7 @@ public class SaslAuthenticatorTest { @@ -700,7 +738,7 @@ public class SaslAuthenticatorTest {
*/
private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol) throws Exception {
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
server = NetworkTestUtils.createEchoServer(securityProtocol, saslServerConfigs);
server = createEchoServer(securityProtocol);
// Create non-SASL connection to manually authenticate after ApiVersionsRequest
String node = "1";
@ -748,11 +786,26 @@ public class SaslAuthenticatorTest { @@ -748,11 +786,26 @@ public class SaslAuthenticatorTest {
}
private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
if (selector != null) {
selector.close();
selector = null;
}
String saslMechanism = (String) saslClientConfigs.get(SaslConfigs.SASL_MECHANISM);
this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, LoginType.CLIENT, clientConfigs, saslMechanism, true);
this.channelBuilder = ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
new TestSecurityConfig(clientConfigs), null, saslMechanism, true);
this.selector = NetworkTestUtils.createSelector(channelBuilder);
}
private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
}
private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
return NetworkTestUtils.createEchoServer(listenerName, securityProtocol,
new TestSecurityConfig(saslServerConfigs));
}
private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
createSelector(securityProtocol, saslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", server.port());

10
clients/src/test/java/org/apache/kafka/common/security/authenticator/TestJaasConfig.java

@ -22,13 +22,15 @@ import javax.security.auth.login.Configuration; @@ -22,13 +22,15 @@ import javax.security.auth.login.Configuration;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
public class TestJaasConfig extends Configuration {
static final String LOGIN_CONTEXT_CLIENT = "KafkaClient";
static final String LOGIN_CONTEXT_SERVER = "KafkaServer";
static final String USERNAME = "myuser";
static final String PASSWORD = "mypassword";
@ -36,9 +38,9 @@ public class TestJaasConfig extends Configuration { @@ -36,9 +38,9 @@ public class TestJaasConfig extends Configuration {
public static TestJaasConfig createConfiguration(String clientMechanism, List<String> serverMechanisms) {
TestJaasConfig config = new TestJaasConfig();
config.createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
config.createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, loginModule(clientMechanism), defaultClientOptions());
for (String mechanism : serverMechanisms) {
config.addEntry(JaasUtils.LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
config.addEntry(LOGIN_CONTEXT_SERVER, loginModule(mechanism), defaultServerOptions(mechanism));
}
Configuration.setConfiguration(config);
return config;
@ -54,7 +56,7 @@ public class TestJaasConfig extends Configuration { @@ -54,7 +56,7 @@ public class TestJaasConfig extends Configuration {
options.put("username", clientUsername);
if (clientPassword != null)
options.put("password", clientPassword);
createOrUpdateEntry(JaasUtils.LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options);
createOrUpdateEntry(LOGIN_CONTEXT_CLIENT, PlainLoginModule.class.getName(), options);
}
public void createOrUpdateEntry(String name, String loginModule, Map<String, Object> options) {

2
clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

@ -167,7 +167,7 @@ public class TestSslUtils { @@ -167,7 +167,7 @@ public class TestSslUtils {
}
private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
File trustStoreFile, Password trustStorePassword) {
File trustStoreFile, Password trustStorePassword) {
Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java

@ -90,7 +90,7 @@ public class WorkerGroupMember { @@ -90,7 +90,7 @@ public class WorkerGroupMember {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
this.metadata,

2
core/src/main/scala/kafka/admin/AdminClient.scala

@ -226,7 +226,7 @@ object AdminClient { @@ -226,7 +226,7 @@ object AdminClient {
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata
val channelBuilder = ClientUtils.createChannelBuilder(config.values())
val channelBuilder = ClientUtils.createChannelBuilder(config)
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)

8
core/src/main/scala/kafka/controller/ControllerChannelManager.scala

@ -26,11 +26,12 @@ import kafka.server.KafkaConfig @@ -26,11 +26,12 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, LoginType, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
@ -94,8 +95,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf @@ -94,8 +95,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
LoginType.SERVER,
config.values,
JaasContext.Type.SERVER,
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable
)

8
core/src/main/scala/kafka/network/SocketServer.scala

@ -34,7 +34,7 @@ import kafka.server.KafkaConfig @@ -34,7 +34,7 @@ import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, LoginType, Mode, Selectable, Selector => KSelector}
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode, Selectable, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.protocol.types.SchemaException
@ -150,7 +150,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time @@ -150,7 +150,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
config.connectionsMaxIdleMs,
listenerName,
securityProtocol,
config.values,
config,
metrics,
credentialProvider
)
@ -379,7 +379,7 @@ private[kafka] class Processor(val id: Int, @@ -379,7 +379,7 @@ private[kafka] class Processor(val id: Int,
connectionsMaxIdleMs: Long,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
channelConfigs: java.util.Map[String, _],
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
@ -419,7 +419,7 @@ private[kafka] class Processor(val id: Int, @@ -419,7 +419,7 @@ private[kafka] class Processor(val id: Int,
"socket-server",
metricTags,
false,
ChannelBuilders.serverChannelBuilder(securityProtocol, channelConfigs, credentialProvider.credentialCache))
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
override def run() {
startupComplete()

13
core/src/main/scala/kafka/server/KafkaServer.scala

@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import com.yammer.metrics.core.Gauge
import kafka.admin.AdminUtils
import kafka.api.KAFKA_0_9_0
import kafka.cluster.{Broker, EndPoint}
import kafka.cluster.Broker
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
import kafka.controller.{ControllerStats, KafkaController}
import kafka.coordinator.GroupCoordinator
@ -37,13 +37,13 @@ import kafka.security.CredentialProvider @@ -37,13 +37,13 @@ import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{ClusterResource, Node}
@ -360,8 +360,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP @@ -360,8 +360,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
LoginType.SERVER,
config.values,
JaasContext.Type.SERVER,
config,
config.interBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable)
val selector = new Selector(

8
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

@ -27,13 +27,14 @@ import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_ @@ -27,13 +27,14 @@ import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_
import kafka.common.KafkaStorageException
import ReplicaFetcherThread._
import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.network.{ChannelBuilders, Mode, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.Time
import scala.collection.Map
@ -78,8 +79,9 @@ class ReplicaFetcherThread(name: String, @@ -78,8 +79,9 @@ class ReplicaFetcherThread(name: String,
private val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
brokerConfig.interBrokerSecurityProtocol,
LoginType.SERVER,
brokerConfig.values,
JaasContext.Type.SERVER,
brokerConfig,
brokerConfig.interBrokerListenerName,
brokerConfig.saslMechanismInterBrokerProtocol,
brokerConfig.saslInterBrokerHandshakeRequestEnable
)

6
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala

@ -48,6 +48,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { @@ -48,6 +48,12 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach { config =>
config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
}
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}

7
core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala

@ -37,14 +37,15 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @@ -37,14 +37,15 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
@Before
override def setUp {
startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism))
startSasl(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both)
super.setUp
}
// Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests
override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanism: Option[String]) {
override protected def setJaasConfiguration(mode: SaslSetupMode, serverEntryName: String,
serverMechanisms: List[String], clientMechanism: Option[String]) {
// create static config with client login context with credentials for JaasTestUtils 'client2'
super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanism)
super.setJaasConfiguration(mode, serverEntryName, kafkaServerSaslMechanisms, clientMechanism)
// set dynamic properties with credentials for JaasTestUtils 'client1'
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)

7
core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala

@ -13,13 +13,20 @@ @@ -13,13 +13,20 @@
package kafka.api
import java.io.File
import java.util.Locale
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.network.ListenerName
class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness {
override protected val zkSaslEnabled = true
override protected def listenerName = new ListenerName("CLIENT")
override protected val kafkaClientSaslMechanism = "PLAIN"
override protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
override protected val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}"
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))

18
core/src/test/scala/integration/kafka/api/SaslSetup.scala

@ -46,7 +46,8 @@ trait SaslSetup { @@ -46,7 +46,8 @@ trait SaslSetup {
private var serverKeytabFile: Option[File] = null
private var clientKeytabFile: Option[File] = null
def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String],
mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName) {
// Important if tests leak consumers, producers or brokers
LoginManager.closeAll()
val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
@ -63,16 +64,19 @@ trait SaslSetup { @@ -63,16 +64,19 @@ trait SaslSetup {
this.clientKeytabFile = None
this.serverKeytabFile = None
}
setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
if (mode == Both || mode == ZkSasl)
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}
protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String,
kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
val jaasFile = mode match {
case ZkSasl => JaasTestUtils.writeZkFile()
case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, kafkaServerSaslMechanisms,
kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, kafkaServerSaslMechanisms,
kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
}
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null)
@ -104,5 +108,7 @@ trait SaslSetup { @@ -104,5 +108,7 @@ trait SaslSetup {
props
}
def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
def jaasClientLoginModule(clientSaslMechanism: String): String =
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
}

6
core/src/test/scala/integration/kafka/api/SaslTestHarness.scala

@ -12,20 +12,22 @@ @@ -12,20 +12,22 @@
*/
package kafka.api
import kafka.utils.JaasTestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.{After, Before}
trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
protected val zkSaslEnabled: Boolean
protected val kafkaServerJaasEntryName = JaasTestUtils.KafkaServerContextName
protected val kafkaClientSaslMechanism = "GSSAPI"
protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
@Before
override def setUp() {
if (zkSaslEnabled)
startSasl(Both, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))
startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName)
else
startSasl(KafkaSasl, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))
startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)
super.setUp
}

2
core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala

@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
@Before
override def setUp {
startSasl(ZkSasl, List.empty, None)
startSasl(List.empty, None, ZkSasl)
super.setUp
}
}

10
core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala

@ -28,6 +28,7 @@ import kafka.utils.{CoreUtils, TestUtils} @@ -28,6 +28,7 @@ import kafka.utils.{CoreUtils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Assert.assertEquals
@ -61,6 +62,13 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness @@ -61,6 +62,13 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness
props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
// set listener-specific configs and set an invalid path for the global config to verify that the overrides work
Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName =>
props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
}
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
servers += TestUtils.createServer(KafkaConfig.fromProps(props))
}
@ -109,7 +117,7 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness @@ -109,7 +117,7 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness
producers.foreach { case (listenerName, producer) =>
val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
s"value$i".getBytes))
producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
val consumer = consumers(listenerName)
consumer.subscribe(Collections.singleton(listenerName.value))

2
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala

@ -89,7 +89,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { @@ -89,7 +89,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
configureSecurityBeforeServersStart()
servers = configs.map(TestUtils.createServer(_)).toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
brokerList = TestUtils.bootstrapServers(servers, listenerName)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
}

2
core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite { @@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
config.connectionsMaxIdleMs, listenerName, protocol, config.values, metrics, credentialProvider) {
config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
override protected[network] def sendResponse(response: RequestChannel.Response) {
conn.close()
super.sendResponse(response)

20
core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala

@ -99,7 +99,7 @@ object JaasTestUtils { @@ -99,7 +99,7 @@ object JaasTestUtils {
private val ZkUserPassword = "fpjsecret"
private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
private val KafkaServerContextName = "KafkaServer"
val KafkaServerContextName = "KafkaServer"
val KafkaServerPrincipalUnqualifiedName = "kafka"
private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM"
private val KafkaClientContextName = "KafkaClient"
@ -128,16 +128,22 @@ object JaasTestUtils { @@ -128,16 +128,22 @@ object JaasTestUtils {
jaasFile.getCanonicalPath
}
def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = {
def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: List[String],
kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File],
clientKeyTabLocation: Option[File]): String = {
val jaasFile = TestUtils.tempFile()
val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation),
kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
writeToFile(jaasFile, kafkaSections)
jaasFile.getCanonicalPath
}
def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = {
def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: List[String],
kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File],
clientKeyTabLocation: Option[File]): String = {
val jaasFile = TestUtils.tempFile()
val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation),
kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
writeToFile(jaasFile, kafkaSections ++ zkSections)
jaasFile.getCanonicalPath
}
@ -151,7 +157,7 @@ object JaasTestUtils { @@ -151,7 +157,7 @@ object JaasTestUtils {
new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))))
)
private def kafkaServerSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
private def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = {
val modules = mechanisms.map {
case "GSSAPI" =>
Krb5LoginModule(
@ -174,7 +180,7 @@ object JaasTestUtils { @@ -174,7 +180,7 @@ object JaasTestUtils {
debug = false).toJaasModule
case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism)
}
new JaasSection(KafkaServerContextName, modules)
new JaasSection(contextName, modules)
}
// consider refactoring if more mechanisms are added

2
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -166,7 +166,7 @@ object TestUtils extends Logging { @@ -166,7 +166,7 @@ object TestUtils extends Logging {
def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): String = {
servers.map { s =>
val listener = s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse(
sys.error(s"Could not find listener with name $listenerName"))
sys.error(s"Could not find listener with name ${listenerName.value}"))
formatAddress(listener.host, s.boundPort(listenerName))
}.mkString(",")
}

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java

@ -100,7 +100,7 @@ public class StreamsKafkaClient { @@ -100,7 +100,7 @@ public class StreamsKafkaClient {
reporters.add(new JmxReporter("kafka.admin"));
final Metrics metrics = new Metrics(metricConfig, reporters, time);
final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig.values());
final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
final Selector selector = new Selector(streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "kafka-client", channelBuilder);

Loading…
Cancel
Save