diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 98f6467a484..de54f164be5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -55,7 +55,7 @@ public class BrokerSecurityConfigs {
" see security authorization and acls. Note that this configuration is ignored" +
" if an extension of KafkaPrincipalBuilder is provided by the " + PRINCIPAL_BUILDER_CLASS_CONFIG + "
" +
" configuration.";
- public static final List DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT");
+ public static final String DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = "DEFAULT";
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " +
"names to short names (typically operating system usernames). The rules are evaluated in order and the " +
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 50081999a27..aade37c2969 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -35,7 +35,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@@ -63,8 +62,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
public void configure(Map configs) throws KafkaException {
try {
this.configs = configs;
- @SuppressWarnings("unchecked")
- List sslPrincipalMappingRules = (List) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
+ String sslPrincipalMappingRules = (String) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
if (sslPrincipalMappingRules != null)
sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
index 3b95e1a5f4d..977fdc332cb 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
@@ -18,53 +18,44 @@ package org.apache.kafka.common.security.ssl;
import java.io.IOException;
import java.util.List;
-import java.util.Collections;
import java.util.ArrayList;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES;
+
public class SslPrincipalMapper {
- private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:(([^/]*)/([^/]*))/([LU])?))");
+ private static final String RULE_PATTERN = "(DEFAULT)|RULE:((\\\\.|[^\\\\/])*)/((\\\\.|[^\\\\/])*)/([LU]?).*?|(.*?)";
+ private static final Pattern RULE_SPLITTER = Pattern.compile("\\s*(" + RULE_PATTERN + ")\\s*(,\\s*|$)");
+ private static final Pattern RULE_PARSER = Pattern.compile(RULE_PATTERN);
private final List rules;
- public SslPrincipalMapper(List sslPrincipalMappingRules) {
- this.rules = sslPrincipalMappingRules;
+ public SslPrincipalMapper(String sslPrincipalMappingRules) {
+ this.rules = parseRules(splitRules(sslPrincipalMappingRules));
}
- public static SslPrincipalMapper fromRules(List sslPrincipalMappingRules) {
- List rules = sslPrincipalMappingRules == null ? Collections.singletonList("DEFAULT") : sslPrincipalMappingRules;
- return new SslPrincipalMapper(parseRules(rules));
+ public static SslPrincipalMapper fromRules(String sslPrincipalMappingRules) {
+ return new SslPrincipalMapper(sslPrincipalMappingRules);
}
- private static List joinSplitRules(List rules) {
- String rule = "RULE:";
- String defaultRule = "DEFAULT";
- List retVal = new ArrayList<>();
- StringBuilder currentRule = new StringBuilder();
- for (String r : rules) {
- if (currentRule.length() > 0) {
- if (r.startsWith(rule) || r.equals(defaultRule)) {
- retVal.add(currentRule.toString());
- currentRule.setLength(0);
- currentRule.append(r);
- } else {
- currentRule.append(String.format(",%s", r));
- }
- } else {
- currentRule.append(r);
- }
+ private static List splitRules(String sslPrincipalMappingRules) {
+ if (sslPrincipalMappingRules == null) {
+ sslPrincipalMappingRules = DEFAULT_SSL_PRINCIPAL_MAPPING_RULES;
}
- if (currentRule.length() > 0) {
- retVal.add(currentRule.toString());
+
+ List result = new ArrayList<>();
+ Matcher matcher = RULE_SPLITTER.matcher(sslPrincipalMappingRules.trim());
+ while (matcher.find()) {
+ result.add(matcher.group(1));
}
- return retVal;
+
+ return result;
}
private static List parseRules(List rules) {
- rules = joinSplitRules(rules);
List result = new ArrayList<>();
for (String rule : rules) {
Matcher matcher = RULE_PARSER.matcher(rule);
@@ -74,15 +65,18 @@ public class SslPrincipalMapper {
if (rule.length() != matcher.end()) {
throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
}
- if (matcher.group(2) != null) {
+
+ // empty rules are ignored
+ if (matcher.group(1) != null) {
result.add(new Rule());
- } else {
- result.add(new Rule(matcher.group(5),
- matcher.group(6),
- "L".equals(matcher.group(7)),
- "U".equals(matcher.group(7))));
+ } else if (matcher.group(2) != null) {
+ result.add(new Rule(matcher.group(2),
+ matcher.group(4),
+ "L".equals(matcher.group(6)),
+ "U".equals(matcher.group(6))));
}
}
+
return result;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index dd5087a84b3..dc1d6225e66 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -30,8 +30,6 @@ import javax.net.ssl.SSLSession;
import javax.security.sasl.SaslServer;
import java.net.InetAddress;
import java.security.Principal;
-import java.util.Arrays;
-import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
@@ -143,7 +141,7 @@ public class DefaultKafkaPrincipalBuilderTest {
.thenReturn(new X500Principal("CN=duke, OU=JavaSoft, O=Sun Microsystems"))
.thenReturn(new X500Principal("OU=JavaSoft, O=Sun Microsystems, C=US"));
- List rules = Arrays.asList(
+ String rules = String.join(", ",
"RULE:^CN=(.*),OU=ServiceUsers.*$/$1/L",
"RULE:^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$/$1@$2/L",
"RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
index 56ef977a112..7b7b67b1dbc 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
@@ -18,9 +18,6 @@ package org.apache.kafka.common.security.ssl;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -28,59 +25,37 @@ public class SslPrincipalMapperTest {
@Test
public void testValidRules() {
- testValidRule(Arrays.asList("DEFAULT"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", "DEFAULT"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/"));
- testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L"));
- testValidRule(Arrays.asList("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U"));
+ testValidRule("DEFAULT");
+ testValidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/");
+ testValidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L, DEFAULT");
+ testValidRule("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/");
+ testValidRule("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L");
+ testValidRule("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U");
+
+ testValidRule("RULE:^CN=([^,ADEFLTU,]+)(,.*|$)/$1/");
+ testValidRule("RULE:^CN=([^,DEFAULT,]+)(,.*|$)/$1/");
}
- @Test
- public void testValidSplitRules() {
- testValidRule(Arrays.asList("DEFAULT"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/$1/"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/$1/L", "DEFAULT"));
- testValidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=(.*?),O=(.*?),L=(.*?)", "ST=(.*?)", "C=(.*?)$/$1@$2/"));
- testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L"));
- testValidRule(Arrays.asList("RULE:^cn=(.?)", "ou=(.?)", "dc=(.?)", "dc=(.?)$/$1@$2/U"));
- }
-
- private void testValidRule(List rules) {
+ private void testValidRule(String rules) {
SslPrincipalMapper.fromRules(rules);
}
@Test
public void testInvalidRules() {
- testInvalidRule(Arrays.asList("default"));
- testInvalidRule(Arrays.asList("DEFAUL"));
- testInvalidRule(Arrays.asList("DEFAULT/L"));
- testInvalidRule(Arrays.asList("DEFAULT/U"));
-
- testInvalidRule(Arrays.asList("RULE:CN=(.*?),OU=ServiceUsers.*/$1"));
- testInvalidRule(Arrays.asList("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/L"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/U"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU"));
+ testInvalidRule("default");
+ testInvalidRule("DEFAUL");
+ testInvalidRule("DEFAULT/L");
+ testInvalidRule("DEFAULT/U");
+
+ testInvalidRule("RULE:CN=(.*?),OU=ServiceUsers.*/$1");
+ testInvalidRule("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/");
+ testInvalidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U");
+ testInvalidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/L");
+ testInvalidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/U");
+ testInvalidRule("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU");
}
- @Test
- public void testInvalidSplitRules() {
- testInvalidRule(Arrays.asList("default"));
- testInvalidRule(Arrays.asList("DEFAUL"));
- testInvalidRule(Arrays.asList("DEFAULT/L"));
- testInvalidRule(Arrays.asList("DEFAULT/U"));
-
- testInvalidRule(Arrays.asList("RULE:CN=(.*?)", "OU=ServiceUsers.*/$1"));
- testInvalidRule(Arrays.asList("rule:^CN=(.*?)", "OU=ServiceUsers.*$/$1/"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/$1/L/U"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/L"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/U"));
- testInvalidRule(Arrays.asList("RULE:^CN=(.*?)", "OU=ServiceUsers.*$/LU"));
- }
-
- private void testInvalidRule(List rules) {
+ private void testInvalidRule(String rules) {
try {
System.out.println(SslPrincipalMapper.fromRules(rules));
fail("should have thrown IllegalArgumentException");
@@ -90,7 +65,7 @@ public class SslPrincipalMapperTest {
@Test
public void testSslPrincipalMapper() throws Exception {
- List rules = Arrays.asList(
+ String rules = String.join(", ",
"RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L",
"RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L",
"RULE:^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$/$1@$2/U",
@@ -107,4 +82,47 @@ public class SslPrincipalMapperTest {
assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", mapper.getName("OU=JavaSoft,O=Sun Microsystems,C=US"));
}
+ private void testRulesSplitting(String expected, String rules) {
+ SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+ assertEquals(String.format("SslPrincipalMapper(rules = %s)", expected), mapper.toString());
+ }
+
+ @Test
+ public void testRulesSplitting() {
+ // seeing is believing
+ testRulesSplitting("[]", "");
+ testRulesSplitting("[DEFAULT]", "DEFAULT");
+ testRulesSplitting("[RULE:/]", "RULE://");
+ testRulesSplitting("[RULE:/.*]", "RULE:/.*/");
+ testRulesSplitting("[RULE:/.*/L]", "RULE:/.*/L");
+ testRulesSplitting("[RULE:/, DEFAULT]", "RULE://,DEFAULT");
+ testRulesSplitting("[RULE:/, DEFAULT]", " RULE:// , DEFAULT ");
+ testRulesSplitting("[RULE: / , DEFAULT]", " RULE: / / , DEFAULT ");
+ testRulesSplitting("[RULE: / /U, DEFAULT]", " RULE: / /U ,DEFAULT ");
+ testRulesSplitting("[RULE:([A-Z]*)/$1/U, RULE:([a-z]+)/$1, DEFAULT]", " RULE:([A-Z]*)/$1/U ,RULE:([a-z]+)/$1/, DEFAULT ");
+
+ // empty rules are ignored
+ testRulesSplitting("[]", ", , , , , , , ");
+ testRulesSplitting("[RULE:/, DEFAULT]", ",,RULE://,,,DEFAULT,,");
+ testRulesSplitting("[RULE: / , DEFAULT]", ", , RULE: / / ,,, DEFAULT, , ");
+ testRulesSplitting("[RULE: / /U, DEFAULT]", " , , RULE: / /U ,, ,DEFAULT, ,");
+
+ // escape sequences
+ testRulesSplitting("[RULE:\\/\\\\\\(\\)\\n\\t/\\/\\/]", "RULE:\\/\\\\\\(\\)\\n\\t/\\/\\//");
+ testRulesSplitting("[RULE:\\**\\/+/*/L, RULE:\\/*\\**/**]", "RULE:\\**\\/+/*/L,RULE:\\/*\\**/**/");
+
+ // rules rule
+ testRulesSplitting(
+ "[RULE:,RULE:,/,RULE:,\\//U, RULE:,/RULE:,, RULE:,RULE:,/L,RULE:,/L, RULE:, DEFAULT, /DEFAULT, DEFAULT]",
+ "RULE:,RULE:,/,RULE:,\\//U,RULE:,/RULE:,/,RULE:,RULE:,/L,RULE:,/L,RULE:, DEFAULT, /DEFAULT/,DEFAULT"
+ );
+ }
+
+ @Test
+ public void testCommaWithWhitespace() throws Exception {
+ String rules = "RULE:^CN=((\\\\, *|\\w)+)(,.*|$)/$1/,DEFAULT";
+
+ SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+ assertEquals("Tkac\\, Adam", mapper.getName("CN=Tkac\\, Adam,OU=ITZ,DC=geodis,DC=cz"));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 97f894cb5a6..4d6a65d79a1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1059,7 +1059,7 @@ object KafkaConfig {
.define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
.define(SslClientAuthProp, STRING, Defaults.SslClientAuthentication, in(Defaults.SslClientAuthenticationValidValues:_*), MEDIUM, SslClientAuthDoc)
.define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
- .define(SslPrincipalMappingRulesProp, LIST, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
+ .define(SslPrincipalMappingRulesProp, STRING, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
/** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)