Browse Source

KAFKA-5462: Add configuration to build custom SSL principal name (KIP-371)

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>

Closes #5684 from omkreddy/KAFKA-5462-SSL-Name
pull/5781/head
Manikumar Reddy 6 years ago
parent
commit
32e1da570a
  1. 13
      clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
  2. 6
      clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
  3. 2
      clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
  4. 14
      clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
  5. 31
      clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
  6. 2
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
  7. 197
      clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
  8. 4
      clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
  9. 67
      clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
  10. 85
      clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
  11. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala
  12. 1
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
  13. 31
      docs/security.html

13
clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java

@ -34,17 +34,28 @@ public class BrokerSecurityConfigs {
public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth"; public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class"; public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
"KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
"authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " + "authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " +
"used for client authentication over SSL. If no principal builder is defined, the default behavior depends " + "used for client authentication over SSL. If no principal builder is defined, the default behavior depends " +
"on the security protocol in use. For SSL authentication, the principal name will be the distinguished " + "on the security protocol in use. For SSL authentication, the principal will be derived using the" +
" rules defined by <code>" + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + "</code> applied on the distinguished " +
"name from the client certificate if one is provided; otherwise, if client authentication is not required, " + "name from the client certificate if one is provided; otherwise, if client authentication is not required, " +
"the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " + "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " +
"rules defined by <code>" + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " + "rules defined by <code>" + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " +
"and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS."; "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.";
public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of rules for mapping from distinguished name" +
" from the client certificate to short name. The rules are evaluated in order and the first rule that matches" +
" a principal name is used to map it to a short name. Any later rules in the list are ignored. By default," +
" distinguished name of the X.500 certificate will be the principal. For more details on the format please" +
" see <a href=\"#security_authz\"> security authorization and acls</a>. Note that this configuration is ignored" +
" if an extension of KafkaPrincipalBuilder is provided by the <code>" + PRINCIPAL_BUILDER_CLASS_CONFIG + "</code>" +
" configuration.";
public static final List<String> DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT");
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " +
"names to short names (typically operating system usernames). The rules are evaluated in order and the " + "names to short names (typically operating system usernames). The rules are evaluated in order and the " +
"first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " + "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " +

6
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java

@ -26,6 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -163,12 +164,13 @@ public class ChannelBuilders {
public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs, public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
TransportLayer transportLayer, TransportLayer transportLayer,
Authenticator authenticator, Authenticator authenticator,
KerberosShortNamer kerberosShortNamer) { KerberosShortNamer kerberosShortNamer,
SslPrincipalMapper sslPrincipalMapper) {
Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
final KafkaPrincipalBuilder builder; final KafkaPrincipalBuilder builder;
if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
} else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
} else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {

2
clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java

@ -70,7 +70,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) { private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) {
this.transportLayer = transportLayer; this.transportLayer = transportLayer;
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, null);
this.listenerName = listenerName; this.listenerName = listenerName;
} }

14
clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java

@ -18,10 +18,12 @@ package org.apache.kafka.common.network;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext; import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -33,6 +35,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -44,6 +47,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
private SslFactory sslFactory; private SslFactory sslFactory;
private Mode mode; private Mode mode;
private Map<String, ?> configs; private Map<String, ?> configs;
private SslPrincipalMapper sslPrincipalMapper;
/** /**
* Constructs a SSL channel builder. ListenerName is provided only * Constructs a SSL channel builder. ListenerName is provided only
@ -58,6 +62,10 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
public void configure(Map<String, ?> configs) throws KafkaException { public void configure(Map<String, ?> configs) throws KafkaException {
try { try {
this.configs = configs; this.configs = configs;
@SuppressWarnings("unchecked")
List<String> sslPrincipalMappingRules = (List<String>) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
if (sslPrincipalMappingRules != null)
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener); this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
this.sslFactory.configure(this.configs); this.sslFactory.configure(this.configs);
} catch (Exception e) { } catch (Exception e) {
@ -89,7 +97,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try { try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key)); SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName); Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
memoryPool != null ? memoryPool : MemoryPool.NONE); memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) { } catch (Exception e) {
@ -154,9 +162,9 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
private final KafkaPrincipalBuilder principalBuilder; private final KafkaPrincipalBuilder principalBuilder;
private final ListenerName listenerName; private final ListenerName listenerName;
private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName) { private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName, SslPrincipalMapper sslPrincipalMapper) {
this.transportLayer = transportLayer; this.transportLayer = transportLayer;
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, sslPrincipalMapper);
this.listenerName = listenerName; this.listenerName = listenerName;
} }
/** /**

31
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.common.security.authenticator; package org.apache.kafka.common.security.authenticator;
import javax.security.auth.x500.X500Principal;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Authenticator;
@ -32,6 +33,8 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServer;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.security.Principal; import java.security.Principal;
@ -55,6 +58,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
private final Authenticator authenticator; private final Authenticator authenticator;
private final TransportLayer transportLayer; private final TransportLayer transportLayer;
private final KerberosShortNamer kerberosShortNamer; private final KerberosShortNamer kerberosShortNamer;
private final SslPrincipalMapper sslPrincipalMapper;
/** /**
* Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}. * Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}.
@ -73,27 +77,31 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
requireNonNull(authenticator), requireNonNull(authenticator),
requireNonNull(transportLayer), requireNonNull(transportLayer),
requireNonNull(oldPrincipalBuilder), requireNonNull(oldPrincipalBuilder),
kerberosShortNamer); kerberosShortNamer,
null);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private DefaultKafkaPrincipalBuilder(Authenticator authenticator, private DefaultKafkaPrincipalBuilder(Authenticator authenticator,
TransportLayer transportLayer, TransportLayer transportLayer,
org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder, org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder,
KerberosShortNamer kerberosShortNamer) { KerberosShortNamer kerberosShortNamer,
SslPrincipalMapper sslPrincipalMapper) {
this.authenticator = authenticator; this.authenticator = authenticator;
this.transportLayer = transportLayer; this.transportLayer = transportLayer;
this.oldPrincipalBuilder = oldPrincipalBuilder; this.oldPrincipalBuilder = oldPrincipalBuilder;
this.kerberosShortNamer = kerberosShortNamer; this.kerberosShortNamer = kerberosShortNamer;
this.sslPrincipalMapper = sslPrincipalMapper;
} }
/** /**
* Construct a new instance. * Construct a new instance.
* *
* @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured
* @param sslPrincipalMapper SSL Principal mapper or null if none have been configured
*/ */
public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) { public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) {
this(null, null, null, kerberosShortNamer); this(null, null, null, kerberosShortNamer, sslPrincipalMapper);
} }
@Override @Override
@ -110,7 +118,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)); return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator));
try { try {
return convertToKafkaPrincipal(sslSession.getPeerPrincipal()); return applySslPrincipalMapper(sslSession.getPeerPrincipal());
} catch (SSLPeerUnverifiedException se) { } catch (SSLPeerUnverifiedException se) {
return KafkaPrincipal.ANONYMOUS; return KafkaPrincipal.ANONYMOUS;
} }
@ -136,6 +144,19 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
} }
} }
private KafkaPrincipal applySslPrincipalMapper(Principal principal) {
try {
if (!(principal instanceof X500Principal) || principal == KafkaPrincipal.ANONYMOUS) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
} else {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, sslPrincipalMapper.getName(principal.getName()));
}
} catch (IOException e) {
throw new KafkaException("Failed to map name for '" + principal.getName() +
"' based on SSL principal mapping rules.", e);
}
}
private KafkaPrincipal convertToKafkaPrincipal(Principal principal) { private KafkaPrincipal convertToKafkaPrincipal(Principal principal) {
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName()); return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
} }

2
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java

@ -153,7 +153,7 @@ public class SaslServerAuthenticator implements Authenticator {
// Note that the old principal builder does not support SASL, so we do not need to pass the // Note that the old principal builder does not support SASL, so we do not need to pass the
// authenticator or the transport layer // authenticator or the transport layer
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser); this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser, null);
} }
private void createSaslServer(String mechanism) throws IOException { private void createSaslServer(String mechanism) throws IOException {

197
clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SslPrincipalMapper {
private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:(([^/]*)/([^/]*))/([LU])?))");
private final List<Rule> rules;
public SslPrincipalMapper(List<Rule> sslPrincipalMappingRules) {
this.rules = sslPrincipalMappingRules;
}
public static SslPrincipalMapper fromRules(List<String> sslPrincipalMappingRules) {
List<String> rules = sslPrincipalMappingRules == null ? Collections.singletonList("DEFAULT") : sslPrincipalMappingRules;
return new SslPrincipalMapper(parseRules(rules));
}
private static List<Rule> parseRules(List<String> rules) {
List<Rule> result = new ArrayList<>();
for (String rule : rules) {
Matcher matcher = RULE_PARSER.matcher(rule);
if (!matcher.lookingAt()) {
throw new IllegalArgumentException("Invalid rule: " + rule);
}
if (rule.length() != matcher.end()) {
throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
}
if (matcher.group(2) != null) {
result.add(new Rule());
} else {
result.add(new Rule(matcher.group(5),
matcher.group(6),
"L".equals(matcher.group(7)),
"U".equals(matcher.group(7))));
}
}
return result;
}
public String getName(String distinguishedName) throws IOException {
for (Rule r : rules) {
String principalName = r.apply(distinguishedName);
if (principalName != null) {
return principalName;
}
}
throw new NoMatchingRule("No rules apply to " + distinguishedName + ", rules " + rules);
}
@Override
public String toString() {
return "SslPrincipalMapper(rules = " + rules + ")";
}
public static class NoMatchingRule extends IOException {
NoMatchingRule(String msg) {
super(msg);
}
}
private static class Rule {
private static final Pattern BACK_REFERENCE_PATTERN = Pattern.compile("\\$(\\d+)");
private final boolean isDefault;
private final Pattern pattern;
private final String replacement;
private final boolean toLowerCase;
private final boolean toUpperCase;
Rule() {
isDefault = true;
pattern = null;
replacement = null;
toLowerCase = false;
toUpperCase = false;
}
Rule(String pattern, String replacement, boolean toLowerCase, boolean toUpperCase) {
isDefault = false;
this.pattern = pattern == null ? null : Pattern.compile(pattern);
this.replacement = replacement;
this.toLowerCase = toLowerCase;
this.toUpperCase = toUpperCase;
}
String apply(String distinguishedName) {
if (isDefault) {
return distinguishedName;
}
String result = null;
final Matcher m = pattern.matcher(distinguishedName);
if (m.matches()) {
result = distinguishedName.replaceAll(pattern.pattern(), escapeLiteralBackReferences(replacement, m.groupCount()));
}
if (toLowerCase && result != null) {
result = result.toLowerCase(Locale.ENGLISH);
} else if (toUpperCase & result != null) {
result = result.toUpperCase(Locale.ENGLISH);
}
return result;
}
//If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing
//groups and the Replacement Value has the value is "$1@$4", then we want to treat the $4 as a literal "$4", rather
//than attempting to use it as a back reference.
//This method was taken from Apache Nifi project : org.apache.nifi.authorization.util.IdentityMappingUtil
private String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
if (numCapturingGroups == 0) {
return unescaped;
}
String value = unescaped;
final Matcher backRefMatcher = BACK_REFERENCE_PATTERN.matcher(value);
while (backRefMatcher.find()) {
final String backRefNum = backRefMatcher.group(1);
if (backRefNum.startsWith("0")) {
continue;
}
final int originalBackRefIndex = Integer.parseInt(backRefNum);
int backRefIndex = originalBackRefIndex;
// if we have a replacement value like $123, and we have less than 123 capturing groups, then
// we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
// then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
// we want to truncate the 1 and get 0.
while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
backRefIndex /= 10;
}
if (backRefIndex > numCapturingGroups) {
final StringBuilder sb = new StringBuilder(value.length() + 1);
final int groupStart = backRefMatcher.start(1);
sb.append(value.substring(0, groupStart - 1));
sb.append("\\");
sb.append(value.substring(groupStart - 1));
value = sb.toString();
}
}
return value;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
if (isDefault) {
buf.append("DEFAULT");
} else {
buf.append("RULE:");
if (pattern != null) {
buf.append(pattern);
}
if (replacement != null) {
buf.append("/");
buf.append(replacement);
}
if (toLowerCase) {
buf.append("/L");
} else if (toUpperCase) {
buf.append("/U");
}
}
return buf.toString();
}
}
}

4
clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java

@ -45,7 +45,7 @@ public class ChannelBuildersTest {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();
configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class); configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class);
KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null); KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null, null);
// test old principal builder is properly configured and delegated to // test old principal builder is properly configured and delegated to
assertTrue(OldPrincipalBuilder.configured); assertTrue(OldPrincipalBuilder.configured);
@ -60,7 +60,7 @@ public class ChannelBuildersTest {
public void testCreateConfigurableKafkaPrincipalBuilder() { public void testCreateConfigurableKafkaPrincipalBuilder() {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();
configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class); configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class);
KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null); KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null, null);
assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder); assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder);
assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured); assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured);
} }

67
clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java

@ -16,23 +16,28 @@
*/ */
package org.apache.kafka.common.security.auth; package org.apache.kafka.common.security.auth;
import javax.security.auth.x500.X500Principal;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
import org.junit.Test; import org.junit.Test;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServer;
import java.net.InetAddress; import java.net.InetAddress;
import java.security.Principal; import java.security.Principal;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -63,7 +68,7 @@ public class DefaultKafkaPrincipalBuilderTest {
@Test @Test
public void testReturnAnonymousPrincipalForPlaintext() throws Exception { public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) { try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null)) {
assertEquals(KafkaPrincipal.ANONYMOUS, builder.build( assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()))); new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
} }
@ -100,7 +105,7 @@ public class DefaultKafkaPrincipalBuilderTest {
when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo")); when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo"));
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
KafkaPrincipal principal = builder.build( KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
@ -112,6 +117,60 @@ public class DefaultKafkaPrincipalBuilderTest {
verify(session, atLeastOnce()).getPeerPrincipal(); verify(session, atLeastOnce()).getPeerPrincipal();
} }
@Test
public void testPrincipalIfSSLPeerIsNotAuthenticated() throws Exception {
SSLSession session = mock(SSLSession.class);
when(session.getPeerPrincipal()).thenReturn(KafkaPrincipal.ANONYMOUS);
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
KafkaPrincipal principal = builder.build(
new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
assertEquals(KafkaPrincipal.ANONYMOUS, principal);
builder.close();
verify(session, atLeastOnce()).getPeerPrincipal();
}
@Test
public void testPrincipalWithSslPrincipalMapper() throws Exception {
SSLSession session = mock(SSLSession.class);
when(session.getPeerPrincipal()).thenReturn(new X500Principal("CN=Duke, OU=ServiceUsers, O=Org, C=US"))
.thenReturn(new X500Principal("CN=Duke, OU=SME, O=mycp, L=Fulton, ST=MD, C=US"))
.thenReturn(new X500Principal("CN=duke, OU=JavaSoft, O=Sun Microsystems"))
.thenReturn(new X500Principal("OU=JavaSoft, O=Sun Microsystems, C=US"));
List<String> rules = Arrays.asList(
"RULE:^CN=(.*),OU=ServiceUsers.*$/$1/L",
"RULE:^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$/$1@$2/L",
"RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
"DEFAULT"
);
SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, mapper);
SslAuthenticationContext sslContext = new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name());
KafkaPrincipal principal = builder.build(sslContext);
assertEquals("duke", principal.getName());
principal = builder.build(sslContext);
assertEquals("duke@sme", principal.getName());
principal = builder.build(sslContext);
assertEquals("DUKE", principal.getName());
principal = builder.build(sslContext);
assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", principal.getName());
builder.close();
verify(session, times(4)).getPeerPrincipal();
}
@Test @Test
public void testPrincipalBuilderScram() throws Exception { public void testPrincipalBuilderScram() throws Exception {
SaslServer server = mock(SaslServer.class); SaslServer server = mock(SaslServer.class);
@ -119,7 +178,7 @@ public class DefaultKafkaPrincipalBuilderTest {
when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
when(server.getAuthorizationID()).thenReturn("foo"); when(server.getAuthorizationID()).thenReturn("foo");
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
@ -141,7 +200,7 @@ public class DefaultKafkaPrincipalBuilderTest {
when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM"); when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM");
when(kerberosShortNamer.shortName(any())).thenReturn("foo"); when(kerberosShortNamer.shortName(any())).thenReturn("foo");
DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, null);
KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));

85
clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class SslPrincipalMapperTest {
@Test
public void testValidRules() {
testValidRule(Arrays.asList("DEFAULT"));
testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", "DEFAULT"));
testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/"));
testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L"));
testValidRule(Arrays.asList("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U"));
}
private void testValidRule(List<String> rules) {
SslPrincipalMapper.fromRules(rules);
}
@Test
public void testInvalidRules() {
testInvalidRule(Arrays.asList("default"));
testInvalidRule(Arrays.asList("DEFAUL"));
testInvalidRule(Arrays.asList("DEFAULT/L"));
testInvalidRule(Arrays.asList("DEFAULT/U"));
testInvalidRule(Arrays.asList("RULE:CN=(.*?),OU=ServiceUsers.*/$1"));
testInvalidRule(Arrays.asList("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U"));
testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/L"));
testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/U"));
testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU"));
}
private void testInvalidRule(List<String> rules) {
try {
System.out.println(SslPrincipalMapper.fromRules(rules));
fail("should have thrown IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
}
@Test
public void testSslPrincipalMapper() throws Exception {
List<String> rules = Arrays.asList(
"RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L",
"RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L",
"RULE:^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$/$1@$2/U",
"RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
"DEFAULT"
);
SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
assertEquals("duke", mapper.getName("CN=Duke,OU=ServiceUsers,O=Org,C=US"));
assertEquals("duke@sme", mapper.getName("CN=Duke,OU=SME,O=mycp,L=Fulton,ST=MD,C=US"));
assertEquals("DUKE@SME", mapper.getName("cn=duke,ou=sme,dc=mycp,dc=com"));
assertEquals("DUKE", mapper.getName("cN=duke,OU=JavaSoft,O=Sun Microsystems"));
assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", mapper.getName("OU=JavaSoft,O=Sun Microsystems,C=US"));
}
}

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -220,6 +220,7 @@ object Defaults {
val SslClientAuthRequested = "requested" val SslClientAuthRequested = "requested"
val SslClientAuthNone = "none" val SslClientAuthNone = "none"
val SslClientAuth = SslClientAuthNone val SslClientAuth = SslClientAuthNone
val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
/** ********* Sasl configuration ***********/ /** ********* Sasl configuration ***********/
val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@ -439,6 +440,7 @@ object KafkaConfig {
val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG
val SslPrincipalMappingRulesProp = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG
/** ********* SASL Configuration ****************/ /** ********* SASL Configuration ****************/
val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol" val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
@ -760,6 +762,7 @@ object KafkaConfig {
val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC
val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC
val SslPrincipalMappingRulesDoc = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC
/** ********* Sasl Configuration ****************/ /** ********* Sasl Configuration ****************/
val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI." val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
@ -998,6 +1001,7 @@ object KafkaConfig {
.define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc) .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
.define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc) .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
.define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc) .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
.define(SslPrincipalMappingRulesProp, LIST, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
/** ********* Sasl Configuration ****************/ /** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc) .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)

1
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -691,6 +691,7 @@ class KafkaConfigTest {
case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string
case KafkaConfig.SslSecureRandomImplementationProp => // ignore string case KafkaConfig.SslSecureRandomImplementationProp => // ignore string
case KafkaConfig.SslCipherSuitesProp => // ignore string case KafkaConfig.SslCipherSuitesProp => // ignore string
case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string
//Sasl Configs //Sasl Configs
case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore

31
docs/security.html

@ -1020,8 +1020,37 @@
<pre>allow.everyone.if.no.acl.found=true</pre> <pre>allow.everyone.if.no.acl.found=true</pre>
One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive. One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
<pre>super.users=User:Bob;User:Alice</pre> <pre>super.users=User:Bob;User:Alice</pre>
By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following.
<h5><a id="security_authz_ssl" href="#security_authz_ssl">Customizing SSL User Name</a></h5>
By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting <code>ssl.principal.mapping.rules</code> to a customized rule in server.properties.
This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
<br>The format of <code>ssl.principal.mapping.rules</code> is a list where each rule starts with "RULE:" and contains an expression as the following formats. Default rule will return
string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name.
This also supports lowercase/uppercase options, to force the translated result to be all lower/uppercase case. This is done by adding a "/L" or "/U' to the end of the rule.
<pre>
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]
</pre>
Example <code>ssl.principal.mapping.rules</code> values are:
<pre>
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT
</pre>
Above rules translate distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser"
and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
<br>For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
<pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre> <pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre>
<h5><a id="security_authz_sasl" href="#security_authz_sasl">Customizing SASL User Name</a></h5>
By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties. By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a "/L" to the end of the rule. check below formats for syntax. The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a "/L" to the end of the rule. check below formats for syntax.
Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details. Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.

Loading…
Cancel
Save