diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java index a29d8069b99..e3a8a774a51 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java @@ -34,17 +34,28 @@ public class BrokerSecurityConfigs { public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth"; public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms"; public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class"; + public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules"; public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " + "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " + "authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " + "used for client authentication over SSL. If no principal builder is defined, the default behavior depends " + - "on the security protocol in use. For SSL authentication, the principal name will be the distinguished " + + "on the security protocol in use. For SSL authentication, the principal will be derived using the" + + " rules defined by " + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + " applied on the distinguished " + "name from the client certificate if one is provided; otherwise, if client authentication is not required, " + "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " + "rules defined by " + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + " if GSSAPI is in use, " + "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS."; + public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of rules for mapping from distinguished name" + + " from the client certificate to short name. The rules are evaluated in order and the first rule that matches" + + " a principal name is used to map it to a short name. Any later rules in the list are ignored. By default," + + " distinguished name of the X.500 certificate will be the principal. For more details on the format please" + + " see security authorization and acls. Note that this configuration is ignored" + + " if an extension of KafkaPrincipalBuilder is provided by the " + PRINCIPAL_BUILDER_CLASS_CONFIG + "" + + " configuration."; + public static final List DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT"); + public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " + "names to short names (typically operating system usernames). The rules are evaluated in order and the " + "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " + diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 10779b7881f..b3040f3ef73 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; +import org.apache.kafka.common.security.ssl.SslPrincipalMapper; import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache; import org.apache.kafka.common.utils.Utils; @@ -163,12 +164,13 @@ public class ChannelBuilders { public static KafkaPrincipalBuilder createPrincipalBuilder(Map configs, TransportLayer transportLayer, Authenticator authenticator, - KerberosShortNamer kerberosShortNamer) { + KerberosShortNamer kerberosShortNamer, + SslPrincipalMapper sslPrincipalMapper) { Class principalBuilderClass = (Class) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); final KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { - builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); + builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index e397f05060f..e5dc778705f 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -70,7 +70,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { private PlaintextAuthenticator(Map configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) { this.transportLayer = transportLayer; - this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, null); this.listenerName = listenerName; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 86d41d08518..ffa8deb3127 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -18,10 +18,12 @@ package org.apache.kafka.common.network; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.SslAuthenticationContext; +import org.apache.kafka.common.security.ssl.SslPrincipalMapper; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -33,6 +35,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.List; import java.util.Map; import java.util.Set; @@ -44,6 +47,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable private SslFactory sslFactory; private Mode mode; private Map configs; + private SslPrincipalMapper sslPrincipalMapper; /** * Constructs a SSL channel builder. ListenerName is provided only @@ -58,6 +62,10 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable public void configure(Map configs) throws KafkaException { try { this.configs = configs; + @SuppressWarnings("unchecked") + List sslPrincipalMappingRules = (List) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG); + if (sslPrincipalMappingRules != null) + sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules); this.sslFactory = new SslFactory(mode, null, isInterBrokerListener); this.sslFactory.configure(this.configs); } catch (Exception e) { @@ -89,7 +97,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { try { SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key)); - Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName); + Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { @@ -154,9 +162,9 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable private final KafkaPrincipalBuilder principalBuilder; private final ListenerName listenerName; - private SslAuthenticator(Map configs, SslTransportLayer transportLayer, ListenerName listenerName) { + private SslAuthenticator(Map configs, SslTransportLayer transportLayer, ListenerName listenerName, SslPrincipalMapper sslPrincipalMapper) { this.transportLayer = transportLayer; - this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, sslPrincipalMapper); this.listenerName = listenerName; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java index 30b0a3e0980..38c303f5266 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.security.authenticator; +import javax.security.auth.x500.X500Principal; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.Authenticator; @@ -32,6 +33,8 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; import javax.security.sasl.SaslServer; +import org.apache.kafka.common.security.ssl.SslPrincipalMapper; + import java.io.Closeable; import java.io.IOException; import java.security.Principal; @@ -55,6 +58,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos private final Authenticator authenticator; private final TransportLayer transportLayer; private final KerberosShortNamer kerberosShortNamer; + private final SslPrincipalMapper sslPrincipalMapper; /** * Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}. @@ -73,27 +77,31 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos requireNonNull(authenticator), requireNonNull(transportLayer), requireNonNull(oldPrincipalBuilder), - kerberosShortNamer); + kerberosShortNamer, + null); } @SuppressWarnings("deprecation") private DefaultKafkaPrincipalBuilder(Authenticator authenticator, TransportLayer transportLayer, org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder, - KerberosShortNamer kerberosShortNamer) { + KerberosShortNamer kerberosShortNamer, + SslPrincipalMapper sslPrincipalMapper) { this.authenticator = authenticator; this.transportLayer = transportLayer; this.oldPrincipalBuilder = oldPrincipalBuilder; this.kerberosShortNamer = kerberosShortNamer; + this.sslPrincipalMapper = sslPrincipalMapper; } /** * Construct a new instance. * * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured + * @param sslPrincipalMapper SSL Principal mapper or null if none have been configured */ - public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) { - this(null, null, null, kerberosShortNamer); + public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { + this(null, null, null, kerberosShortNamer, sslPrincipalMapper); } @Override @@ -110,7 +118,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)); try { - return convertToKafkaPrincipal(sslSession.getPeerPrincipal()); + return applySslPrincipalMapper(sslSession.getPeerPrincipal()); } catch (SSLPeerUnverifiedException se) { return KafkaPrincipal.ANONYMOUS; } @@ -136,6 +144,19 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos } } + private KafkaPrincipal applySslPrincipalMapper(Principal principal) { + try { + if (!(principal instanceof X500Principal) || principal == KafkaPrincipal.ANONYMOUS) { + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName()); + } else { + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, sslPrincipalMapper.getName(principal.getName())); + } + } catch (IOException e) { + throw new KafkaException("Failed to map name for '" + principal.getName() + + "' based on SSL principal mapping rules.", e); + } + } + private KafkaPrincipal convertToKafkaPrincipal(Principal principal) { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName()); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 48a49fe4e94..4db1971b377 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -153,7 +153,7 @@ public class SaslServerAuthenticator implements Authenticator { // Note that the old principal builder does not support SASL, so we do not need to pass the // authenticator or the transport layer - this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser, null); } private void createSaslServer(String mechanism) throws IOException { diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java new file mode 100644 index 00000000000..7ec4a79b2eb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SslPrincipalMapper { + + private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:(([^/]*)/([^/]*))/([LU])?))"); + + private final List rules; + + public SslPrincipalMapper(List sslPrincipalMappingRules) { + this.rules = sslPrincipalMappingRules; + } + + public static SslPrincipalMapper fromRules(List sslPrincipalMappingRules) { + List rules = sslPrincipalMappingRules == null ? Collections.singletonList("DEFAULT") : sslPrincipalMappingRules; + return new SslPrincipalMapper(parseRules(rules)); + } + + private static List parseRules(List rules) { + List result = new ArrayList<>(); + for (String rule : rules) { + Matcher matcher = RULE_PARSER.matcher(rule); + if (!matcher.lookingAt()) { + throw new IllegalArgumentException("Invalid rule: " + rule); + } + if (rule.length() != matcher.end()) { + throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`"); + } + if (matcher.group(2) != null) { + result.add(new Rule()); + } else { + result.add(new Rule(matcher.group(5), + matcher.group(6), + "L".equals(matcher.group(7)), + "U".equals(matcher.group(7)))); + } + } + return result; + } + + public String getName(String distinguishedName) throws IOException { + for (Rule r : rules) { + String principalName = r.apply(distinguishedName); + if (principalName != null) { + return principalName; + } + } + throw new NoMatchingRule("No rules apply to " + distinguishedName + ", rules " + rules); + } + + @Override + public String toString() { + return "SslPrincipalMapper(rules = " + rules + ")"; + } + + public static class NoMatchingRule extends IOException { + NoMatchingRule(String msg) { + super(msg); + } + } + + private static class Rule { + private static final Pattern BACK_REFERENCE_PATTERN = Pattern.compile("\\$(\\d+)"); + + private final boolean isDefault; + private final Pattern pattern; + private final String replacement; + private final boolean toLowerCase; + private final boolean toUpperCase; + + Rule() { + isDefault = true; + pattern = null; + replacement = null; + toLowerCase = false; + toUpperCase = false; + } + + Rule(String pattern, String replacement, boolean toLowerCase, boolean toUpperCase) { + isDefault = false; + this.pattern = pattern == null ? null : Pattern.compile(pattern); + this.replacement = replacement; + this.toLowerCase = toLowerCase; + this.toUpperCase = toUpperCase; + } + + String apply(String distinguishedName) { + if (isDefault) { + return distinguishedName; + } + + String result = null; + final Matcher m = pattern.matcher(distinguishedName); + + if (m.matches()) { + result = distinguishedName.replaceAll(pattern.pattern(), escapeLiteralBackReferences(replacement, m.groupCount())); + } + + if (toLowerCase && result != null) { + result = result.toLowerCase(Locale.ENGLISH); + } else if (toUpperCase & result != null) { + result = result.toUpperCase(Locale.ENGLISH); + } + + return result; + } + + //If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing + //groups and the Replacement Value has the value is "$1@$4", then we want to treat the $4 as a literal "$4", rather + //than attempting to use it as a back reference. + //This method was taken from Apache Nifi project : org.apache.nifi.authorization.util.IdentityMappingUtil + private String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) { + if (numCapturingGroups == 0) { + return unescaped; + } + + String value = unescaped; + final Matcher backRefMatcher = BACK_REFERENCE_PATTERN.matcher(value); + while (backRefMatcher.find()) { + final String backRefNum = backRefMatcher.group(1); + if (backRefNum.startsWith("0")) { + continue; + } + final int originalBackRefIndex = Integer.parseInt(backRefNum); + int backRefIndex = originalBackRefIndex; + + + // if we have a replacement value like $123, and we have less than 123 capturing groups, then + // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups, + // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then + // we want to truncate the 1 and get 0. + while (backRefIndex > numCapturingGroups && backRefIndex >= 10) { + backRefIndex /= 10; + } + + if (backRefIndex > numCapturingGroups) { + final StringBuilder sb = new StringBuilder(value.length() + 1); + final int groupStart = backRefMatcher.start(1); + + sb.append(value.substring(0, groupStart - 1)); + sb.append("\\"); + sb.append(value.substring(groupStart - 1)); + value = sb.toString(); + } + } + + return value; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + if (isDefault) { + buf.append("DEFAULT"); + } else { + buf.append("RULE:"); + if (pattern != null) { + buf.append(pattern); + } + if (replacement != null) { + buf.append("/"); + buf.append(replacement); + } + if (toLowerCase) { + buf.append("/L"); + } else if (toUpperCase) { + buf.append("/U"); + } + } + return buf.toString(); + } + + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java index 27daf0face8..630cba1f3e1 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -45,7 +45,7 @@ public class ChannelBuildersTest { Map configs = new HashMap<>(); configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class); - KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null); + KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null, null); // test old principal builder is properly configured and delegated to assertTrue(OldPrincipalBuilder.configured); @@ -60,7 +60,7 @@ public class ChannelBuildersTest { public void testCreateConfigurableKafkaPrincipalBuilder() { Map configs = new HashMap<>(); configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class); - KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null); + KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null, null); assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder); assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured); } diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java index a05a8502bf1..dd5087a84b3 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java @@ -16,23 +16,28 @@ */ package org.apache.kafka.common.security.auth; +import javax.security.auth.x500.X500Principal; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import org.apache.kafka.common.security.ssl.SslPrincipalMapper; import org.junit.Test; import javax.net.ssl.SSLSession; import javax.security.sasl.SaslServer; import java.net.InetAddress; import java.security.Principal; +import java.util.Arrays; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -63,7 +68,7 @@ public class DefaultKafkaPrincipalBuilderTest { @Test public void testReturnAnonymousPrincipalForPlaintext() throws Exception { - try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) { + try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null)) { assertEquals(KafkaPrincipal.ANONYMOUS, builder.build( new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()))); } @@ -100,7 +105,7 @@ public class DefaultKafkaPrincipalBuilderTest { when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo")); - DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null); KafkaPrincipal principal = builder.build( new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); @@ -112,6 +117,60 @@ public class DefaultKafkaPrincipalBuilderTest { verify(session, atLeastOnce()).getPeerPrincipal(); } + @Test + public void testPrincipalIfSSLPeerIsNotAuthenticated() throws Exception { + SSLSession session = mock(SSLSession.class); + + when(session.getPeerPrincipal()).thenReturn(KafkaPrincipal.ANONYMOUS); + + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null); + + KafkaPrincipal principal = builder.build( + new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); + assertEquals(KafkaPrincipal.ANONYMOUS, principal); + + builder.close(); + verify(session, atLeastOnce()).getPeerPrincipal(); + } + + + @Test + public void testPrincipalWithSslPrincipalMapper() throws Exception { + SSLSession session = mock(SSLSession.class); + + when(session.getPeerPrincipal()).thenReturn(new X500Principal("CN=Duke, OU=ServiceUsers, O=Org, C=US")) + .thenReturn(new X500Principal("CN=Duke, OU=SME, O=mycp, L=Fulton, ST=MD, C=US")) + .thenReturn(new X500Principal("CN=duke, OU=JavaSoft, O=Sun Microsystems")) + .thenReturn(new X500Principal("OU=JavaSoft, O=Sun Microsystems, C=US")); + + List rules = Arrays.asList( + "RULE:^CN=(.*),OU=ServiceUsers.*$/$1/L", + "RULE:^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$/$1@$2/L", + "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U", + "DEFAULT" + ); + + SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules); + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, mapper); + + SslAuthenticationContext sslContext = new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()); + + KafkaPrincipal principal = builder.build(sslContext); + assertEquals("duke", principal.getName()); + + principal = builder.build(sslContext); + assertEquals("duke@sme", principal.getName()); + + principal = builder.build(sslContext); + assertEquals("DUKE", principal.getName()); + + principal = builder.build(sslContext); + assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", principal.getName()); + + builder.close(); + verify(session, times(4)).getPeerPrincipal(); + } + @Test public void testPrincipalBuilderScram() throws Exception { SaslServer server = mock(SaslServer.class); @@ -119,7 +178,7 @@ public class DefaultKafkaPrincipalBuilderTest { when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); when(server.getAuthorizationID()).thenReturn("foo"); - DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null); KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); @@ -141,7 +200,7 @@ public class DefaultKafkaPrincipalBuilderTest { when(server.getAuthorizationID()).thenReturn("foo/host@REALM.COM"); when(kerberosShortNamer.shortName(any())).thenReturn("foo"); - DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, null); KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java new file mode 100644 index 00000000000..c647fd00a3d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class SslPrincipalMapperTest { + + @Test + public void testValidRules() { + testValidRule(Arrays.asList("DEFAULT")); + testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/")); + testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", "DEFAULT")); + testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/")); + testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L")); + testValidRule(Arrays.asList("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U")); + } + + private void testValidRule(List rules) { + SslPrincipalMapper.fromRules(rules); + } + + @Test + public void testInvalidRules() { + testInvalidRule(Arrays.asList("default")); + testInvalidRule(Arrays.asList("DEFAUL")); + testInvalidRule(Arrays.asList("DEFAULT/L")); + testInvalidRule(Arrays.asList("DEFAULT/U")); + + testInvalidRule(Arrays.asList("RULE:CN=(.*?),OU=ServiceUsers.*/$1")); + testInvalidRule(Arrays.asList("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/")); + testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U")); + testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/L")); + testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/U")); + testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU")); + } + + private void testInvalidRule(List rules) { + try { + System.out.println(SslPrincipalMapper.fromRules(rules)); + fail("should have thrown IllegalArgumentException"); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testSslPrincipalMapper() throws Exception { + List rules = Arrays.asList( + "RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", + "RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L", + "RULE:^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$/$1@$2/U", + "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U", + "DEFAULT" + ); + + SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules); + + assertEquals("duke", mapper.getName("CN=Duke,OU=ServiceUsers,O=Org,C=US")); + assertEquals("duke@sme", mapper.getName("CN=Duke,OU=SME,O=mycp,L=Fulton,ST=MD,C=US")); + assertEquals("DUKE@SME", mapper.getName("cn=duke,ou=sme,dc=mycp,dc=com")); + assertEquals("DUKE", mapper.getName("cN=duke,OU=JavaSoft,O=Sun Microsystems")); + assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", mapper.getName("OU=JavaSoft,O=Sun Microsystems,C=US")); + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 9edda4ea7f1..9bf41a1e054 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -220,6 +220,7 @@ object Defaults { val SslClientAuthRequested = "requested" val SslClientAuthNone = "none" val SslClientAuth = SslClientAuthNone + val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES /** ********* Sasl configuration ***********/ val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM @@ -439,6 +440,7 @@ object KafkaConfig { val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG + val SslPrincipalMappingRulesProp = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG /** ********* SASL Configuration ****************/ val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol" @@ -760,6 +762,7 @@ object KafkaConfig { val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC + val SslPrincipalMappingRulesDoc = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC /** ********* Sasl Configuration ****************/ val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI." @@ -998,6 +1001,7 @@ object KafkaConfig { .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc) .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc) .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc) + .define(SslPrincipalMappingRulesProp, LIST, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc) /** ********* Sasl Configuration ****************/ .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b75c3e7eb24..ba8e54b2eac 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -691,6 +691,7 @@ class KafkaConfigTest { case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string case KafkaConfig.SslSecureRandomImplementationProp => // ignore string case KafkaConfig.SslCipherSuitesProp => // ignore string + case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string //Sasl Configs case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore diff --git a/docs/security.html b/docs/security.html index 6ff9eba2cbf..5f6d0aceb8e 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1020,8 +1020,37 @@
allow.everyone.if.no.acl.found=true
One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
super.users=User:Bob;User:Alice
- By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following. + +
Customizing SSL User Name
+ + By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. + This config allows a list of rules for mapping X.500 distinguished name to short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored. + +
The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression as the following formats. Default rule will return + string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command will be run over the name. + This also supports lowercase/uppercase options, to force the translated result to be all lower/uppercase case. This is done by adding a "/L" or "/U' to the end of the rule. + +
+        RULE:pattern/replacement/
+        RULE:pattern/replacement/[LU]
+    
+ + Example ssl.principal.mapping.rules values are: +
+        RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+        RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
+        RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
+        DEFAULT
+    
+ + Above rules translate distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser" + and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin". + +
For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
principal.builder.class=CustomizedPrincipalBuilderClass
+ +
Customizing SASL User Name
+ By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as the auth_to_local in Kerberos configuration file (krb5.conf). This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a "/L" to the end of the rule. check below formats for syntax. Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.