From b074173ea249ef028272c2c07358222550917d8c Mon Sep 17 00:00:00 2001 From: saisandeep Date: Thu, 2 May 2019 15:11:42 -0700 Subject: [PATCH] KAFKA-8191: Add pluggability of KeyManager to generate the broker Private Keys and Certificates Reviewers: Sriharsha Chintalapani , Ismael Juma --- .../kafka/common/security/ssl/SslFactory.java | 12 +- .../kafka/common/network/SelectorTest.java | 2 +- .../kafka/common/network/SslSelectorTest.java | 49 ++++++++ .../common/security/ssl/SslFactoryTest.java | 23 +++- .../ssl/mock/TestKeyManagerFactory.java | 113 ++++++++++++++++++ .../security/ssl/mock/TestProvider.java | 36 ++++++ .../ssl/mock/TestTrustManagerFactory.java | 88 ++++++++++++++ .../org/apache/kafka/test/TestSslUtils.java | 18 ++- 8 files changed, 332 insertions(+), 9 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java create mode 100644 clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java create mode 100644 clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index a03d1bcae8b..73d92103f6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -234,12 +234,16 @@ public class SslFactory implements Reconfigurable { sslContext = SSLContext.getInstance(protocol); KeyManager[] keyManagers = null; - if (keystore != null) { + if (keystore != null || kmfAlgorithm != null) { String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm(); KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm); - KeyStore ks = keystore.load(); - Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password; - kmf.init(ks, keyPassword.value().toCharArray()); + if (keystore != null) { + KeyStore ks = keystore.load(); + Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password; + kmf.init(ks, keyPassword.value().toCharArray()); + } else { + kmf.init(null, null); + } keyManagers = kmf.getKeyManagers(); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 0f2d295f87b..ae06836986f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -813,7 +813,7 @@ public class SelectorTest { verifySelectorEmpty(this.selector); } - private void verifySelectorEmpty(Selector selector) throws Exception { + public void verifySelectorEmpty(Selector selector) throws Exception { for (KafkaChannel channel : selector.channels()) { selector.close(channel.id()); assertNull(channel.selectionKey().attachment()); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 1f9739bf762..02cbaf84578 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -23,8 +23,12 @@ import org.apache.kafka.common.memory.SimpleMemoryPool; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.ssl.SslFactory; +import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory; +import org.apache.kafka.common.security.ssl.mock.TestProvider; +import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; @@ -37,6 +41,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.security.Provider; +import java.security.Security; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -85,6 +91,49 @@ public class SslSelectorTest extends SelectorTest { return SecurityProtocol.PLAINTEXT; } + @Test + public void testConnectionWithCustomKeyManager() throws Exception { + Provider provider = new TestProvider(); + Security.addProvider(provider); + + int requestSize = 100 * 1024; + final String node = "0"; + String request = TestUtils.randomString(requestSize); + + Map sslServerConfigs = TestSslUtils.createSslConfig( + TestKeyManagerFactory.ALGORITHM, + TestTrustManagerFactory.ALGORITHM + ); + EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs); + server.start(); + Time time = new MockTime(); + File trustStoreFile = new File(TestKeyManagerFactory.TestKeyManager.mockTrustStoreFile); + Map sslClientConfigs = TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, "client"); + + ChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT); + channelBuilder.configure(sslClientConfigs); + Metrics metrics = new Metrics(); + Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()); + + selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); + while (!selector.connected().contains(node)) + selector.poll(10000L); + while (!selector.isChannelReady(node)) + selector.poll(10000L); + + selector.send(createSend(node, request)); + + waitForBytesBuffered(selector, node); + + selector.close(node); + super.verifySelectorEmpty(selector); + + Security.removeProvider(provider.getName()); + selector.close(); + server.close(); + metrics.close(); + } + @Test public void testDisconnectWithIntermediateBufferedBytes() throws Exception { int requestSize = 100 * 1024; diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 11035d05514..a6ce0b46cc6 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl; import java.io.File; import java.nio.file.Files; import java.security.KeyStore; +import java.security.Provider; import java.util.Map; import javax.net.ssl.SSLContext; @@ -28,6 +29,9 @@ import javax.net.ssl.SSLHandshakeException; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory; +import org.apache.kafka.common.security.ssl.mock.TestProvider; +import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.common.network.Mode; import org.junit.Test; @@ -41,10 +45,8 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.security.Security; -/** - * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses. - */ public class SslFactoryTest { @Test @@ -61,6 +63,21 @@ public class SslFactoryTest { assertEquals(false, engine.getUseClientMode()); } + @Test + public void testSslFactoryWithCustomKeyManagerConfiguration() throws Exception { + Provider provider = new TestProvider(); + Security.addProvider(provider); + Map serverSslConfig = TestSslUtils.createSslConfig( + TestKeyManagerFactory.ALGORITHM, + TestTrustManagerFactory.ALGORITHM + ); + SslFactory sslFactory = new SslFactory(Mode.SERVER); + sslFactory.configure(serverSslConfig); + SSLContext sslContext = sslFactory.createSSLContext(null, null); + assertNotNull("SSL context not created", sslContext); + Security.removeProvider(provider.getName()); + } + @Test public void testSslFactoryWithoutPasswordConfiguration() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java new file mode 100644 index 00000000000..dc686c246b5 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl.mock; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactorySpi; +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.X509ExtendedKeyManager; +import java.io.File; +import java.io.IOException; +import java.net.Socket; +import java.security.GeneralSecurityException; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestSslUtils.CertificateBuilder; + +public class TestKeyManagerFactory extends KeyManagerFactorySpi { + public static final String ALGORITHM = "TestAlgorithm"; + + @Override + protected void engineInit(KeyStore keyStore, char[] chars) { + + } + + @Override + protected void engineInit(ManagerFactoryParameters managerFactoryParameters) { + + } + + @Override + protected KeyManager[] engineGetKeyManagers() { + return new KeyManager[] {new TestKeyManager()}; + } + + public static class TestKeyManager extends X509ExtendedKeyManager { + + public static String mockTrustStoreFile; + public static final String ALIAS = "TestAlias"; + private static final String CN = "localhost"; + private static final String SIGNATURE_ALGORITHM = "RSA"; + private KeyPair keyPair; + private X509Certificate certificate; + + protected TestKeyManager() { + try { + this.keyPair = TestSslUtils.generateKeyPair(SIGNATURE_ALGORITHM); + CertificateBuilder certBuilder = new CertificateBuilder(); + this.certificate = certBuilder.generate("CN=" + CN + ", O=A server", this.keyPair); + Map certificates = new HashMap<>(); + certificates.put(ALIAS, certificate); + File trustStoreFile = File.createTempFile("testTrustStore", ".jks"); + mockTrustStoreFile = trustStoreFile.getPath(); + TestSslUtils.createTrustStore(mockTrustStoreFile, new Password(TestSslUtils.TRUST_STORE_PASSWORD), certificates); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + @Override + public String[] getClientAliases(String s, Principal[] principals) { + return new String[] {ALIAS}; + } + + @Override + public String chooseClientAlias(String[] strings, Principal[] principals, Socket socket) { + return ALIAS; + } + + @Override + public String[] getServerAliases(String s, Principal[] principals) { + return new String[] {ALIAS}; + } + + @Override + public String chooseServerAlias(String s, Principal[] principals, Socket socket) { + return ALIAS; + } + + @Override + public X509Certificate[] getCertificateChain(String s) { + return new X509Certificate[] {this.certificate}; + } + + @Override + public PrivateKey getPrivateKey(String s) { + return this.keyPair.getPrivate(); + } + } + +} + diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java new file mode 100644 index 00000000000..fb44d3c994f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl.mock; + +import java.security.Provider; + +public class TestProvider extends Provider { + + private static final String KEY_MANAGER_FACTORY = String.format("KeyManagerFactory.%s", TestKeyManagerFactory.ALGORITHM); + private static final String TRUST_MANAGER_FACTORY = String.format("TrustManagerFactory.%s", TestTrustManagerFactory.ALGORITHM); + + public TestProvider() { + this("TestProvider", 0.1, "provider for test cases"); + } + + protected TestProvider(String name, double version, String info) { + super(name, version, info); + super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName()); + super.put(TRUST_MANAGER_FACTORY, TestTrustManagerFactory.class.getName()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java new file mode 100644 index 00000000000..4115a5f8cbd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl.mock; + +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactorySpi; +import javax.net.ssl.X509ExtendedTrustManager; +import java.net.Socket; +import java.security.KeyStore; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +public class TestTrustManagerFactory extends TrustManagerFactorySpi { + public static final String ALGORITHM = "TestAlgorithm"; + + @Override + protected void engineInit(KeyStore keyStore) { + + } + + @Override + protected void engineInit(ManagerFactoryParameters managerFactoryParameters) { + + } + + @Override + protected TrustManager[] engineGetTrustManagers() { + return new TrustManager[] {new TestTrustManager()}; + } + + public static class TestTrustManager extends X509ExtendedTrustManager { + + public static final String ALIAS = "TestAlias"; + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException { + + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException { + + } + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException { + + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException { + + } + } + +} + diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index b2de0e6a2b5..6dfceb28f09 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -70,6 +70,8 @@ import java.util.ArrayList; public class TestSslUtils { + public static final String TRUST_STORE_PASSWORD = "TrustStorePassword"; + /** * Create a self-signed X.509 Certificate. * From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html. @@ -175,6 +177,20 @@ public class TestSslUtils { return sslConfigs; } + public static Map createSslConfig(String keyManagerAlgorithm, String trustManagerAlgorithm) { + Map sslConfigs = new HashMap<>(); + sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext + + sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagerAlgorithm); + sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagerAlgorithm); + + List enabledProtocols = new ArrayList<>(); + enabledProtocols.add("TLSv1.2"); + sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols); + + return sslConfigs; + } + public static Map createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias) throws IOException, GeneralSecurityException { return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, "localhost"); @@ -193,7 +209,7 @@ public class TestSslUtils { File keyStoreFile = null; Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword"); - Password trustStorePassword = new Password("TrustStorePassword"); + Password trustStorePassword = new Password(TRUST_STORE_PASSWORD); if (mode == Mode.CLIENT && useClientCert) { keyStoreFile = File.createTempFile("clientKS", ".jks");