diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index ba97b5bcb68..35a781e1deb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import java.util.Map;
@@ -107,6 +108,12 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
+ /**
+ * security.providers
+ */
+ public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
+ private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -174,6 +181,11 @@ public class AdminClientConfig extends AbstractConfig {
Importance.MEDIUM,
CLIENT_DNS_LOOKUP_DOC)
// security support
+ .define(SECURITY_PROVIDERS_CONFIG,
+ Type.STRING,
+ null,
+ Importance.LOW,
+ SECURITY_PROVIDERS_DOC)
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 2e4507af1d4..a4f3003cc50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;
@@ -267,7 +268,13 @@ public class ConsumerConfig extends AbstractConfig {
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
" be set to `false` when using brokers older than 0.11.0";
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
-
+
+ /**
+ * security.providers
+ */
+ public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
+ private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -486,6 +493,11 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
ALLOW_AUTO_CREATE_TOPICS_DOC)
// security support
+ .define(SECURITY_PROVIDERS_CONFIG,
+ Type.STRING,
+ null,
+ Importance.LOW,
+ SECURITY_PROVIDERS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0d87284c1cf..00767eb29d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@@ -240,6 +241,12 @@ public class ProducerConfig extends AbstractConfig {
"The default is null
, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor
.";
+ /**
+ * security.providers
+ */
+ public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
+ private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -341,6 +348,11 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .define(SECURITY_PROVIDERS_CONFIG,
+ Type.STRING,
+ null,
+ Importance.LOW,
+ SECURITY_PROVIDERS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
.define(ENABLE_IDEMPOTENCE_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java b/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java
new file mode 100644
index 00000000000..0889aa5f44d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+/**
+ * Contains the common security config for SSL and SASL
+ */
+public class SecurityConfig {
+
+ public static final String SECURITY_PROVIDERS_CONFIG = "security.providers";
+ public static final String SECURITY_PROVIDERS_DOC = "A list of configurable creators each returning a provider " +
+ "implementing security algorithms";
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/SecurityProviderCreator.java b/clients/src/main/java/org/apache/kafka/common/security/SecurityProviderCreator.java
new file mode 100644
index 00000000000..e5f3a7c30ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/SecurityProviderCreator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security;
+
+import org.apache.kafka.common.Configurable;
+
+import java.security.Provider;
+import java.util.Map;
+
+/**
+ * An interface for generating security providers.
+ */
+public interface SecurityProviderCreator extends Configurable {
+
+ /**
+ * Configure method is used to configure the generator to create the Security Provider
+ * @param config configuration parameters for initialising security provider
+ */
+ default void configure(Map config) {
+
+ }
+
+ /**
+ * Generate the security provider configured
+ */
+ Provider getProvider();
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 3d9481c497d..6613fd147f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
+import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +113,7 @@ public class LoginManager {
STATIC_INSTANCES.put(loginMetadata, loginManager);
}
}
+ SecurityUtils.addConfiguredSecurityProviders(configs);
return loginManager.acquire();
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
index fe9e135a86a..dc49f5e9fff 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +36,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.security.SecureRandom;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
-import java.security.SecureRandom;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -66,6 +67,7 @@ public class SslEngineBuilder {
this.configs = Collections.unmodifiableMap(configs);
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
+ SecurityUtils.addConfiguredSecurityProviders(this.configs);
List cipherSuitesList = (List) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index c0094128420..a338138eb51 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
@@ -87,6 +88,7 @@ public class SslFactory implements Reconfigurable {
Map nextConfigs = new HashMap<>();
copyMapEntries(nextConfigs, configs, SslConfigs.NON_RECONFIGURABLE_CONFIGS);
copyMapEntries(nextConfigs, configs, SslConfigs.RECONFIGURABLE_CONFIGS);
+ copyMapEntry(nextConfigs, configs, SecurityConfig.SECURITY_PROVIDERS_CONFIG);
if (clientAuthConfigOverride != null) {
nextConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, clientAuthConfigOverride);
}
@@ -196,9 +198,24 @@ public class SslFactory implements Reconfigurable {
Map srcMap,
Set keySet) {
for (K k : keySet) {
- if (srcMap.containsKey(k)) {
- destMap.put(k, srcMap.get(k));
- }
+ copyMapEntry(destMap, srcMap, k);
+ }
+ }
+
+ /**
+ * Copy entry from one map into another.
+ *
+ * @param destMap The map to copy entries into.
+ * @param srcMap The map to copy entries from.
+ * @param key The entry with this key will be copied
+ * @param The map key type.
+ * @param The map value type.
+ */
+ private static void copyMapEntry(Map destMap,
+ Map srcMap,
+ K key) {
+ if (srcMap.containsKey(key)) {
+ destMap.put(key, srcMap.get(key));
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
index 9c9bd44bfcf..bd3a22c7a58 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
@@ -16,10 +16,19 @@
*/
package org.apache.kafka.common.utils;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.security.SecurityProviderCreator;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.Security;
+import java.util.Map;
public class SecurityUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SecurityConfig.class);
+
public static KafkaPrincipal parseKafkaPrincipal(String str) {
if (str == null || str.isEmpty()) {
throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
@@ -34,4 +43,26 @@ public class SecurityUtils {
return new KafkaPrincipal(split[0], split[1]);
}
+ public static void addConfiguredSecurityProviders(Map configs) {
+ String securityProviderClassesStr = (String) configs.get(SecurityConfig.SECURITY_PROVIDERS_CONFIG);
+ if (securityProviderClassesStr == null || securityProviderClassesStr.equals("")) {
+ return;
+ }
+ try {
+ String[] securityProviderClasses = securityProviderClassesStr.replaceAll("\\s+", "").split(",");
+ for (int index = 0; index < securityProviderClasses.length; index++) {
+ SecurityProviderCreator securityProviderCreator = (SecurityProviderCreator) Class.forName(securityProviderClasses[index]).newInstance();
+ securityProviderCreator.configure(configs);
+ Security.insertProviderAt(securityProviderCreator.getProvider(), index + 1);
+ }
+ } catch (ClassCastException e) {
+ LOGGER.error("Creators provided through " + SecurityConfig.SECURITY_PROVIDERS_CONFIG +
+ " are expected to be sub-classes of SecurityProviderCreator");
+ } catch (ClassNotFoundException cnfe) {
+ LOGGER.error("Unrecognized security provider creator class", cnfe);
+ } catch (IllegalAccessException | InstantiationException e) {
+ LOGGER.error("Unexpected implementation of security provider creator class", e);
+ }
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 02cbaf84578..19e7a8bf78a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -18,13 +18,15 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import javax.net.ssl.SSLEngine;
+
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
-import org.apache.kafka.common.security.ssl.mock.TestProvider;
+import org.apache.kafka.common.security.ssl.mock.TestProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -41,7 +43,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
@@ -93,8 +94,8 @@ public class SslSelectorTest extends SelectorTest {
@Test
public void testConnectionWithCustomKeyManager() throws Exception {
- Provider provider = new TestProvider();
- Security.addProvider(provider);
+
+ TestProviderCreator testProviderCreator = new TestProviderCreator();
int requestSize = 100 * 1024;
final String node = "0";
@@ -104,6 +105,7 @@ public class SslSelectorTest extends SelectorTest {
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
+ sslServerConfigs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
server.start();
Time time = new MockTime();
@@ -128,7 +130,7 @@ public class SslSelectorTest extends SelectorTest {
selector.close(node);
super.verifySelectorEmpty(selector);
- Security.removeProvider(provider.getName());
+ Security.removeProvider(testProviderCreator.getProvider().getName());
selector.close();
server.close();
metrics.close();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 20cd4992e98..b4166a968ff 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -19,18 +19,19 @@ package org.apache.kafka.common.security.ssl;
import java.io.File;
import java.nio.file.Files;
import java.security.KeyStore;
-import java.security.Provider;
import java.util.Arrays;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
-import org.apache.kafka.common.security.ssl.mock.TestProvider;
+import org.apache.kafka.common.security.ssl.mock.TestProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestSslUtils;
@@ -63,17 +64,41 @@ public class SslFactoryTest {
}
@Test
- public void testSslFactoryWithCustomKeyManagerConfiguration() throws Exception {
- Provider provider = new TestProvider();
- Security.addProvider(provider);
+ public void testSslFactoryWithCustomKeyManagerConfiguration() {
+ TestProviderCreator testProviderCreator = new TestProviderCreator();
Map serverSslConfig = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
+ serverSslConfig.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
assertNotNull("SslEngineBuilder not created", sslFactory.sslEngineBuilder());
- Security.removeProvider(provider.getName());
+ Security.removeProvider(testProviderCreator.getProvider().getName());
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testSslFactoryWithoutProviderClassConfiguration() {
+ // An exception is thrown as the algorithm is not registered through a provider
+ Map serverSslConfig = TestSslUtils.createSslConfig(
+ TestKeyManagerFactory.ALGORITHM,
+ TestTrustManagerFactory.ALGORITHM
+ );
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(serverSslConfig);
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testSslFactoryWithIncorrectProviderClassConfiguration() {
+ // An exception is thrown as the algorithm is not registered through a provider
+ Map serverSslConfig = TestSslUtils.createSslConfig(
+ TestKeyManagerFactory.ALGORITHM,
+ TestTrustManagerFactory.ALGORITHM
+ );
+ serverSslConfig.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG,
+ "com.fake.ProviderClass1,com.fake.ProviderClass2");
+ SslFactory sslFactory = new SslFactory(Mode.SERVER);
+ sslFactory.configure(serverSslConfig);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProvider.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProvider.java
new file mode 100644
index 00000000000..5e6e82ea07e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import java.security.Provider;
+
+public class TestPlainSaslServerProvider extends Provider {
+
+ public TestPlainSaslServerProvider() {
+ this("TestPlainSaslServerProvider", 0.1, "test plain sasl server provider");
+ }
+
+ protected TestPlainSaslServerProvider(String name, double version, String info) {
+ super(name, version, info);
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProviderCreator.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProviderCreator.java
new file mode 100644
index 00000000000..fa476a5f12d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProviderCreator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import org.apache.kafka.common.security.SecurityProviderCreator;
+
+import java.security.Provider;
+
+public class TestPlainSaslServerProviderCreator implements SecurityProviderCreator {
+
+ private TestPlainSaslServerProvider provider;
+
+ @Override
+ public Provider getProvider() {
+ if (provider == null) {
+ provider = new TestPlainSaslServerProvider();
+ }
+ return provider;
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProviderCreator.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProviderCreator.java
new file mode 100644
index 00000000000..5f4929f0580
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProviderCreator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import org.apache.kafka.common.security.SecurityProviderCreator;
+
+import java.security.Provider;
+
+public class TestProviderCreator implements SecurityProviderCreator {
+
+ private TestProvider provider;
+
+ @Override
+ public Provider getProvider() {
+ if (provider == null) {
+ provider = new TestProvider();
+ }
+ return provider;
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProvider.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProvider.java
new file mode 100644
index 00000000000..c5e831028f7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import java.security.Provider;
+
+public class TestScramSaslServerProvider extends Provider {
+
+ public TestScramSaslServerProvider() {
+ this("TestScramSaslServerProvider", 0.1, "test scram sasl server provider");
+ }
+
+ protected TestScramSaslServerProvider(String name, double version, String info) {
+ super(name, version, info);
+ }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProviderCreator.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProviderCreator.java
new file mode 100644
index 00000000000..f03f8c96a27
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProviderCreator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl.mock;
+
+import org.apache.kafka.common.security.SecurityProviderCreator;
+
+import java.security.Provider;
+
+public class TestScramSaslServerProviderCreator implements SecurityProviderCreator {
+
+ private TestScramSaslServerProvider provider;
+
+ @Override
+ public Provider getProvider() {
+ if (provider == null) {
+ provider = new TestScramSaslServerProvider();
+ }
+ return provider;
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
index 273c13abbfe..903ac5c9dd7 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
@@ -16,13 +16,49 @@
*/
package org.apache.kafka.common.utils;
+import org.apache.kafka.common.config.SecurityConfig;
+import org.apache.kafka.common.security.SecurityProviderCreator;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.ssl.mock.TestPlainSaslServerProviderCreator;
+import org.apache.kafka.common.security.ssl.mock.TestScramSaslServerProviderCreator;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.security.Provider;
+import java.security.Security;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.junit.Assert.assertEquals;
public class SecurityUtilsTest {
+ private SecurityProviderCreator testScramSaslServerProviderCreator = new TestScramSaslServerProviderCreator();
+ private SecurityProviderCreator testPlainSaslServerProviderCreator = new TestPlainSaslServerProviderCreator();
+
+ private Provider testScramSaslServerProvider = testScramSaslServerProviderCreator.getProvider();
+ private Provider testPlainSaslServerProvider = testPlainSaslServerProviderCreator.getProvider();
+
+ private void clearTestProviders() {
+ Security.removeProvider(testScramSaslServerProvider.getName());
+ Security.removeProvider(testPlainSaslServerProvider.getName());
+ }
+
+ @Before
+ // Remove the providers if already added
+ public void setUp() {
+ clearTestProviders();
+ }
+
+ // Remove the providers after running test cases
+ @After
+ public void tearDown() {
+ clearTestProviders();
+ }
+
+
@Test
public void testPrincipalNameCanContainSeparator() {
String name = "name:with:separator:in:it";
@@ -40,4 +76,31 @@ public class SecurityUtilsTest {
assertEquals(name, principal.getName());
}
+ private int getProviderIndexFromName(String providerName, Provider[] providers) {
+ for (int index = 0; index < providers.length; index++) {
+ if (providers[index].getName().equals(providerName)) {
+ return index;
+ }
+ }
+ return -1;
+ }
+
+ // Tests if the custom providers configured are being added to the JVM correctly. These providers are
+ // expected to be added at the start of the list of available providers and with the relative ordering maintained
+ @Test
+ public void testAddCustomSecurityProvider() {
+ String customProviderClasses = testScramSaslServerProviderCreator.getClass().getName() + "," +
+ testPlainSaslServerProviderCreator.getClass().getName();
+ Map configs = new HashMap<>();
+ configs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, customProviderClasses);
+ SecurityUtils.addConfiguredSecurityProviders(configs);
+
+ Provider[] providers = Security.getProviders();
+ int testScramSaslServerProviderIndex = getProviderIndexFromName(testScramSaslServerProvider.getName(), providers);
+ int testPlainSaslServerProviderIndex = getProviderIndexFromName(testPlainSaslServerProvider.getName(), providers);
+
+ // validations
+ MatcherAssert.assertThat(testScramSaslServerProvider.getName() + " testProvider not found at expected index", testScramSaslServerProviderIndex == 0);
+ MatcherAssert.assertThat(testPlainSaslServerProvider.getName() + " testProvider not found at expected index", testPlainSaslServerProviderIndex == 1);
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 74c397b9190..2069e70cec6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -29,6 +29,7 @@ import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.SecurityConfig
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslClientAuth, SslConfigs, TopicConfig}
@@ -432,6 +433,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+ val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
/** ********* SSL Configuration ****************/
val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
@@ -786,6 +788,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+ val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
/** ********* SSL Configuration ****************/
val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
@@ -1032,6 +1035,7 @@ object KafkaConfig {
/** ********* General Security Configuration ****************/
.define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+ .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
/** ********* SSL Configuration ****************/
.define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a6e86c66722..2709ee60470 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -740,6 +740,9 @@ class KafkaConfigTest {
case KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp =>
case KafkaConfig.SaslLoginRefreshBufferSecondsProp =>
+ // Security config
+ case KafkaConfig.securityProviderClassProp =>
+
// Password encoder configs
case KafkaConfig.PasswordEncoderSecretProp =>
case KafkaConfig.PasswordEncoderOldSecretProp =>