Browse Source

KAFKA-8669: Add security providers in kafka security config (#7090)

* Adds custom provider class to security config 
* Implementation of KIP-492
Reviewers: Sriharsha Chintalapani <sriharsha@apache.org> , Jeff Huang
pull/7253/head
saisandeep 5 years ago committed by Harsha
parent
commit
d08bcae7f9
  1. 12
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
  2. 14
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  3. 12
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  4. 28
      clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java
  5. 41
      clients/src/main/java/org/apache/kafka/common/security/SecurityProviderCreator.java
  6. 2
      clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
  7. 4
      clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
  8. 23
      clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
  9. 31
      clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
  10. 12
      clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
  11. 37
      clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
  12. 31
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProvider.java
  13. 34
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProviderCreator.java
  14. 34
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProviderCreator.java
  15. 31
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProvider.java
  16. 34
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProviderCreator.java
  17. 63
      clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
  18. 4
      core/src/main/scala/kafka/server/KafkaConfig.scala
  19. 3
      core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

12
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig; @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import java.util.Map;
@ -107,6 +108,12 @@ public class AdminClientConfig extends AbstractConfig { @@ -107,6 +108,12 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
/**
* <code>security.providers</code>
*/
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@ -174,6 +181,11 @@ public class AdminClientConfig extends AbstractConfig { @@ -174,6 +181,11 @@ public class AdminClientConfig extends AbstractConfig {
Importance.MEDIUM,
CLIENT_DNS_LOOKUP_DOC)
// security support
.define(SECURITY_PROVIDERS_CONFIG,
Type.STRING,
null,
Importance.LOW,
SECURITY_PROVIDERS_DOC)
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,

14
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig; @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;
@ -267,7 +268,13 @@ public class ConsumerConfig extends AbstractConfig { @@ -267,7 +268,13 @@ public class ConsumerConfig extends AbstractConfig {
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
" be set to `false` when using brokers older than 0.11.0";
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
/**
* <code>security.providers</code>
*/
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@ -486,6 +493,11 @@ public class ConsumerConfig extends AbstractConfig { @@ -486,6 +493,11 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
ALLOW_AUTO_CREATE_TOPICS_DOC)
// security support
.define(SECURITY_PROVIDERS_CONFIG,
Type.STRING,
null,
Importance.LOW,
SECURITY_PROVIDERS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,

12
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig; @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@ -240,6 +241,12 @@ public class ProducerConfig extends AbstractConfig { @@ -240,6 +241,12 @@ public class ProducerConfig extends AbstractConfig {
"The default is <code>null</code>, which means transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code>.";
/**
* <code>security.providers</code>
*/
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@ -341,6 +348,11 @@ public class ProducerConfig extends AbstractConfig { @@ -341,6 +348,11 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
Type.STRING,
null,
Importance.LOW,
SECURITY_PROVIDERS_DOC)
.withClientSslSupport()
.withClientSaslSupport()
.define(ENABLE_IDEMPOTENCE_CONFIG,

28
clients/src/main/java/org/apache/kafka/common/config/SecurityConfig.java

@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.config;
/**
* Contains the common security config for SSL and SASL
*/
public class SecurityConfig {
public static final String SECURITY_PROVIDERS_CONFIG = "security.providers";
public static final String SECURITY_PROVIDERS_DOC = "A list of configurable creators each returning a provider " +
"implementing security algorithms";
}

41
clients/src/main/java/org/apache/kafka/common/security/SecurityProviderCreator.java

@ -0,0 +1,41 @@ @@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security;
import org.apache.kafka.common.Configurable;
import java.security.Provider;
import java.util.Map;
/**
* An interface for generating security providers.
*/
public interface SecurityProviderCreator extends Configurable {
/**
* Configure method is used to configure the generator to create the Security Provider
* @param config configuration parameters for initialising security provider
*/
default void configure(Map<String, ?> config) {
}
/**
* Generate the security provider configured
*/
Provider getProvider();
}

2
clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java

@ -25,6 +25,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; @@ -25,6 +25,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.Login;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -112,6 +113,7 @@ public class LoginManager { @@ -112,6 +113,7 @@ public class LoginManager {
STATIC_INSTANCES.put(loginMetadata, loginManager);
}
}
SecurityUtils.addConfiguredSecurityProviders(configs);
return loginManager.acquire();
}
}

4
clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.SslConfigs; @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,9 +36,9 @@ import java.io.IOException; @@ -35,9 +36,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -66,6 +67,7 @@ public class SslEngineBuilder { @@ -66,6 +67,7 @@ public class SslEngineBuilder {
this.configs = Collections.unmodifiableMap(configs);
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
SecurityUtils.addConfiguredSecurityProviders(this.configs);
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) {

23
clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java

@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl; @@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
@ -87,6 +88,7 @@ public class SslFactory implements Reconfigurable { @@ -87,6 +88,7 @@ public class SslFactory implements Reconfigurable {
Map<String, Object> nextConfigs = new HashMap<>();
copyMapEntries(nextConfigs, configs, SslConfigs.NON_RECONFIGURABLE_CONFIGS);
copyMapEntries(nextConfigs, configs, SslConfigs.RECONFIGURABLE_CONFIGS);
copyMapEntry(nextConfigs, configs, SecurityConfig.SECURITY_PROVIDERS_CONFIG);
if (clientAuthConfigOverride != null) {
nextConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, clientAuthConfigOverride);
}
@ -196,9 +198,24 @@ public class SslFactory implements Reconfigurable { @@ -196,9 +198,24 @@ public class SslFactory implements Reconfigurable {
Map<K, ? extends V> srcMap,
Set<K> keySet) {
for (K k : keySet) {
if (srcMap.containsKey(k)) {
destMap.put(k, srcMap.get(k));
}
copyMapEntry(destMap, srcMap, k);
}
}
/**
* Copy entry from one map into another.
*
* @param destMap The map to copy entries into.
* @param srcMap The map to copy entries from.
* @param key The entry with this key will be copied
* @param <K> The map key type.
* @param <V> The map value type.
*/
private static <K, V> void copyMapEntry(Map<K, V> destMap,
Map<K, ? extends V> srcMap,
K key) {
if (srcMap.containsKey(key)) {
destMap.put(key, srcMap.get(key));
}
}

31
clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java

@ -16,10 +16,19 @@ @@ -16,10 +16,19 @@
*/
package org.apache.kafka.common.utils;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.security.SecurityProviderCreator;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.Map;
public class SecurityUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(SecurityConfig.class);
public static KafkaPrincipal parseKafkaPrincipal(String str) {
if (str == null || str.isEmpty()) {
throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
@ -34,4 +43,26 @@ public class SecurityUtils { @@ -34,4 +43,26 @@ public class SecurityUtils {
return new KafkaPrincipal(split[0], split[1]);
}
public static void addConfiguredSecurityProviders(Map<String, ?> configs) {
String securityProviderClassesStr = (String) configs.get(SecurityConfig.SECURITY_PROVIDERS_CONFIG);
if (securityProviderClassesStr == null || securityProviderClassesStr.equals("")) {
return;
}
try {
String[] securityProviderClasses = securityProviderClassesStr.replaceAll("\\s+", "").split(",");
for (int index = 0; index < securityProviderClasses.length; index++) {
SecurityProviderCreator securityProviderCreator = (SecurityProviderCreator) Class.forName(securityProviderClasses[index]).newInstance();
securityProviderCreator.configure(configs);
Security.insertProviderAt(securityProviderCreator.getProvider(), index + 1);
}
} catch (ClassCastException e) {
LOGGER.error("Creators provided through " + SecurityConfig.SECURITY_PROVIDERS_CONFIG +
" are expected to be sub-classes of SecurityProviderCreator");
} catch (ClassNotFoundException cnfe) {
LOGGER.error("Unrecognized security provider creator class", cnfe);
} catch (IllegalAccessException | InstantiationException e) {
LOGGER.error("Unexpected implementation of security provider creator class", e);
}
}
}

12
clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

@ -18,13 +18,15 @@ package org.apache.kafka.common.network; @@ -18,13 +18,15 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
import org.apache.kafka.common.security.ssl.mock.TestProvider;
import org.apache.kafka.common.security.ssl.mock.TestProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -41,7 +43,6 @@ import java.io.IOException; @@ -41,7 +43,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
@ -93,8 +94,8 @@ public class SslSelectorTest extends SelectorTest { @@ -93,8 +94,8 @@ public class SslSelectorTest extends SelectorTest {
@Test
public void testConnectionWithCustomKeyManager() throws Exception {
Provider provider = new TestProvider();
Security.addProvider(provider);
TestProviderCreator testProviderCreator = new TestProviderCreator();
int requestSize = 100 * 1024;
final String node = "0";
@ -104,6 +105,7 @@ public class SslSelectorTest extends SelectorTest { @@ -104,6 +105,7 @@ public class SslSelectorTest extends SelectorTest {
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
sslServerConfigs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
server.start();
Time time = new MockTime();
@ -128,7 +130,7 @@ public class SslSelectorTest extends SelectorTest { @@ -128,7 +130,7 @@ public class SslSelectorTest extends SelectorTest {
selector.close(node);
super.verifySelectorEmpty(selector);
Security.removeProvider(provider.getName());
Security.removeProvider(testProviderCreator.getProvider().getName());
selector.close();
server.close();
metrics.close();

37
clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java

@ -19,18 +19,19 @@ package org.apache.kafka.common.security.ssl; @@ -19,18 +19,19 @@ package org.apache.kafka.common.security.ssl;
import java.io.File;
import java.nio.file.Files;
import java.security.KeyStore;
import java.security.Provider;
import java.util.Arrays;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
import org.apache.kafka.common.security.ssl.mock.TestProvider;
import org.apache.kafka.common.security.ssl.mock.TestProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestSslUtils;
@ -63,17 +64,41 @@ public class SslFactoryTest { @@ -63,17 +64,41 @@ public class SslFactoryTest {
}
@Test
public void testSslFactoryWithCustomKeyManagerConfiguration() throws Exception {
Provider provider = new TestProvider();
Security.addProvider(provider);
public void testSslFactoryWithCustomKeyManagerConfiguration() {
TestProviderCreator testProviderCreator = new TestProviderCreator();
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
serverSslConfig.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
assertNotNull("SslEngineBuilder not created", sslFactory.sslEngineBuilder());
Security.removeProvider(provider.getName());
Security.removeProvider(testProviderCreator.getProvider().getName());
}
@Test(expected = KafkaException.class)
public void testSslFactoryWithoutProviderClassConfiguration() {
// An exception is thrown as the algorithm is not registered through a provider
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
}
@Test(expected = KafkaException.class)
public void testSslFactoryWithIncorrectProviderClassConfiguration() {
// An exception is thrown as the algorithm is not registered through a provider
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
serverSslConfig.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG,
"com.fake.ProviderClass1,com.fake.ProviderClass2");
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
}
@Test

31
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProvider.java

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import java.security.Provider;
public class TestPlainSaslServerProvider extends Provider {
public TestPlainSaslServerProvider() {
this("TestPlainSaslServerProvider", 0.1, "test plain sasl server provider");
}
protected TestPlainSaslServerProvider(String name, double version, String info) {
super(name, version, info);
}
}

34
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestPlainSaslServerProviderCreator.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import org.apache.kafka.common.security.SecurityProviderCreator;
import java.security.Provider;
public class TestPlainSaslServerProviderCreator implements SecurityProviderCreator {
private TestPlainSaslServerProvider provider;
@Override
public Provider getProvider() {
if (provider == null) {
provider = new TestPlainSaslServerProvider();
}
return provider;
}
}

34
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProviderCreator.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import org.apache.kafka.common.security.SecurityProviderCreator;
import java.security.Provider;
public class TestProviderCreator implements SecurityProviderCreator {
private TestProvider provider;
@Override
public Provider getProvider() {
if (provider == null) {
provider = new TestProvider();
}
return provider;
}
}

31
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProvider.java

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import java.security.Provider;
public class TestScramSaslServerProvider extends Provider {
public TestScramSaslServerProvider() {
this("TestScramSaslServerProvider", 0.1, "test scram sasl server provider");
}
protected TestScramSaslServerProvider(String name, double version, String info) {
super(name, version, info);
}
}

34
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestScramSaslServerProviderCreator.java

@ -0,0 +1,34 @@ @@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import org.apache.kafka.common.security.SecurityProviderCreator;
import java.security.Provider;
public class TestScramSaslServerProviderCreator implements SecurityProviderCreator {
private TestScramSaslServerProvider provider;
@Override
public Provider getProvider() {
if (provider == null) {
provider = new TestScramSaslServerProvider();
}
return provider;
}
}

63
clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java

@ -16,13 +16,49 @@ @@ -16,13 +16,49 @@
*/
package org.apache.kafka.common.utils;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.security.SecurityProviderCreator;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.ssl.mock.TestPlainSaslServerProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestScramSaslServerProviderCreator;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.security.Provider;
import java.security.Security;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class SecurityUtilsTest {
private SecurityProviderCreator testScramSaslServerProviderCreator = new TestScramSaslServerProviderCreator();
private SecurityProviderCreator testPlainSaslServerProviderCreator = new TestPlainSaslServerProviderCreator();
private Provider testScramSaslServerProvider = testScramSaslServerProviderCreator.getProvider();
private Provider testPlainSaslServerProvider = testPlainSaslServerProviderCreator.getProvider();
private void clearTestProviders() {
Security.removeProvider(testScramSaslServerProvider.getName());
Security.removeProvider(testPlainSaslServerProvider.getName());
}
@Before
// Remove the providers if already added
public void setUp() {
clearTestProviders();
}
// Remove the providers after running test cases
@After
public void tearDown() {
clearTestProviders();
}
@Test
public void testPrincipalNameCanContainSeparator() {
String name = "name:with:separator:in:it";
@ -40,4 +76,31 @@ public class SecurityUtilsTest { @@ -40,4 +76,31 @@ public class SecurityUtilsTest {
assertEquals(name, principal.getName());
}
private int getProviderIndexFromName(String providerName, Provider[] providers) {
for (int index = 0; index < providers.length; index++) {
if (providers[index].getName().equals(providerName)) {
return index;
}
}
return -1;
}
// Tests if the custom providers configured are being added to the JVM correctly. These providers are
// expected to be added at the start of the list of available providers and with the relative ordering maintained
@Test
public void testAddCustomSecurityProvider() {
String customProviderClasses = testScramSaslServerProviderCreator.getClass().getName() + "," +
testPlainSaslServerProviderCreator.getClass().getName();
Map<String, String> configs = new HashMap<>();
configs.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, customProviderClasses);
SecurityUtils.addConfiguredSecurityProviders(configs);
Provider[] providers = Security.getProviders();
int testScramSaslServerProviderIndex = getProviderIndexFromName(testScramSaslServerProvider.getName(), providers);
int testPlainSaslServerProviderIndex = getProviderIndexFromName(testPlainSaslServerProvider.getName(), providers);
// validations
MatcherAssert.assertThat(testScramSaslServerProvider.getName() + " testProvider not found at expected index", testScramSaslServerProviderIndex == 0);
MatcherAssert.assertThat(testPlainSaslServerProvider.getName() + " testProvider not found at expected index", testPlainSaslServerProviderIndex == 1);
}
}

4
core/src/main/scala/kafka/server/KafkaConfig.scala

@ -29,6 +29,7 @@ import kafka.utils.CoreUtils @@ -29,6 +29,7 @@ import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.SecurityConfig
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslClientAuth, SslConfigs, TopicConfig}
@ -432,6 +433,7 @@ object KafkaConfig { @@ -432,6 +433,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
/** ********* SSL Configuration ****************/
val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
@ -786,6 +788,7 @@ object KafkaConfig { @@ -786,6 +788,7 @@ object KafkaConfig {
/** ******** Common Security Configuration *************/
val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
/** ********* SSL Configuration ****************/
val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
@ -1032,6 +1035,7 @@ object KafkaConfig { @@ -1032,6 +1035,7 @@ object KafkaConfig {
/** ********* General Security Configuration ****************/
.define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
.define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
/** ********* SSL Configuration ****************/
.define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)

3
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -740,6 +740,9 @@ class KafkaConfigTest { @@ -740,6 +740,9 @@ class KafkaConfigTest {
case KafkaConfig.SaslLoginRefreshMinPeriodSecondsProp =>
case KafkaConfig.SaslLoginRefreshBufferSecondsProp =>
// Security config
case KafkaConfig.securityProviderClassProp =>
// Password encoder configs
case KafkaConfig.PasswordEncoderSecretProp =>
case KafkaConfig.PasswordEncoderOldSecretProp =>

Loading…
Cancel
Save