Browse Source

KAFKA-8191: Add pluggability of KeyManager to generate the broker Private Keys and Certificates

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>, Ismael Juma <ismael@juma.me.uk>
pull/6522/merge
saisandeep 6 years ago committed by Harsha
parent
commit
b074173ea2
  1. 6
      clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
  2. 2
      clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
  3. 49
      clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
  4. 23
      clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
  5. 113
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java
  6. 36
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java
  7. 88
      clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java
  8. 18
      clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

6
clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java

@ -234,12 +234,16 @@ public class SslFactory implements Reconfigurable { @@ -234,12 +234,16 @@ public class SslFactory implements Reconfigurable {
sslContext = SSLContext.getInstance(protocol);
KeyManager[] keyManagers = null;
if (keystore != null) {
if (keystore != null || kmfAlgorithm != null) {
String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
if (keystore != null) {
KeyStore ks = keystore.load();
Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
kmf.init(ks, keyPassword.value().toCharArray());
} else {
kmf.init(null, null);
}
keyManagers = kmf.getKeyManagers();
}

2
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java

@ -813,7 +813,7 @@ public class SelectorTest { @@ -813,7 +813,7 @@ public class SelectorTest {
verifySelectorEmpty(this.selector);
}
private void verifySelectorEmpty(Selector selector) throws Exception {
public void verifySelectorEmpty(Selector selector) throws Exception {
for (KafkaChannel channel : selector.channels()) {
selector.close(channel.id());
assertNull(channel.selectionKey().attachment());

49
clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

@ -23,8 +23,12 @@ import org.apache.kafka.common.memory.SimpleMemoryPool; @@ -23,8 +23,12 @@ import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
import org.apache.kafka.common.security.ssl.mock.TestProvider;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
@ -37,6 +41,8 @@ import java.io.IOException; @@ -37,6 +41,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -85,6 +91,49 @@ public class SslSelectorTest extends SelectorTest { @@ -85,6 +91,49 @@ public class SslSelectorTest extends SelectorTest {
return SecurityProtocol.PLAINTEXT;
}
@Test
public void testConnectionWithCustomKeyManager() throws Exception {
Provider provider = new TestProvider();
Security.addProvider(provider);
int requestSize = 100 * 1024;
final String node = "0";
String request = TestUtils.randomString(requestSize);
Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
EchoServer server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
server.start();
Time time = new MockTime();
File trustStoreFile = new File(TestKeyManagerFactory.TestKeyManager.mockTrustStoreFile);
Map<String, Object> sslClientConfigs = TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, "client");
ChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
channelBuilder.configure(sslClientConfigs);
Metrics metrics = new Metrics();
Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
while (!selector.connected().contains(node))
selector.poll(10000L);
while (!selector.isChannelReady(node))
selector.poll(10000L);
selector.send(createSend(node, request));
waitForBytesBuffered(selector, node);
selector.close(node);
super.verifySelectorEmpty(selector);
Security.removeProvider(provider.getName());
selector.close();
server.close();
metrics.close();
}
@Test
public void testDisconnectWithIntermediateBufferedBytes() throws Exception {
int requestSize = 100 * 1024;

23
clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java

@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl; @@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl;
import java.io.File;
import java.nio.file.Files;
import java.security.KeyStore;
import java.security.Provider;
import java.util.Map;
import javax.net.ssl.SSLContext;
@ -28,6 +29,9 @@ import javax.net.ssl.SSLHandshakeException; @@ -28,6 +29,9 @@ import javax.net.ssl.SSLHandshakeException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
import org.apache.kafka.common.security.ssl.mock.TestProvider;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.common.network.Mode;
import org.junit.Test;
@ -41,10 +45,8 @@ import static org.junit.Assert.assertNotSame; @@ -41,10 +45,8 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.security.Security;
/**
* A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
*/
public class SslFactoryTest {
@Test
@ -61,6 +63,21 @@ public class SslFactoryTest { @@ -61,6 +63,21 @@ public class SslFactoryTest {
assertEquals(false, engine.getUseClientMode());
}
@Test
public void testSslFactoryWithCustomKeyManagerConfiguration() throws Exception {
Provider provider = new TestProvider();
Security.addProvider(provider);
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(
TestKeyManagerFactory.ALGORITHM,
TestTrustManagerFactory.ALGORITHM
);
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
SSLContext sslContext = sslFactory.createSSLContext(null, null);
assertNotNull("SSL context not created", sslContext);
Security.removeProvider(provider.getName());
}
@Test
public void testSslFactoryWithoutPasswordConfiguration() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");

113
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestKeyManagerFactory.java

@ -0,0 +1,113 @@ @@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactorySpi;
import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.X509ExtendedKeyManager;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestSslUtils.CertificateBuilder;
public class TestKeyManagerFactory extends KeyManagerFactorySpi {
public static final String ALGORITHM = "TestAlgorithm";
@Override
protected void engineInit(KeyStore keyStore, char[] chars) {
}
@Override
protected void engineInit(ManagerFactoryParameters managerFactoryParameters) {
}
@Override
protected KeyManager[] engineGetKeyManagers() {
return new KeyManager[] {new TestKeyManager()};
}
public static class TestKeyManager extends X509ExtendedKeyManager {
public static String mockTrustStoreFile;
public static final String ALIAS = "TestAlias";
private static final String CN = "localhost";
private static final String SIGNATURE_ALGORITHM = "RSA";
private KeyPair keyPair;
private X509Certificate certificate;
protected TestKeyManager() {
try {
this.keyPair = TestSslUtils.generateKeyPair(SIGNATURE_ALGORITHM);
CertificateBuilder certBuilder = new CertificateBuilder();
this.certificate = certBuilder.generate("CN=" + CN + ", O=A server", this.keyPair);
Map<String, X509Certificate> certificates = new HashMap<>();
certificates.put(ALIAS, certificate);
File trustStoreFile = File.createTempFile("testTrustStore", ".jks");
mockTrustStoreFile = trustStoreFile.getPath();
TestSslUtils.createTrustStore(mockTrustStoreFile, new Password(TestSslUtils.TRUST_STORE_PASSWORD), certificates);
} catch (IOException | GeneralSecurityException e) {
throw new RuntimeException(e);
}
}
@Override
public String[] getClientAliases(String s, Principal[] principals) {
return new String[] {ALIAS};
}
@Override
public String chooseClientAlias(String[] strings, Principal[] principals, Socket socket) {
return ALIAS;
}
@Override
public String[] getServerAliases(String s, Principal[] principals) {
return new String[] {ALIAS};
}
@Override
public String chooseServerAlias(String s, Principal[] principals, Socket socket) {
return ALIAS;
}
@Override
public X509Certificate[] getCertificateChain(String s) {
return new X509Certificate[] {this.certificate};
}
@Override
public PrivateKey getPrivateKey(String s) {
return this.keyPair.getPrivate();
}
}
}

36
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestProvider.java

@ -0,0 +1,36 @@ @@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import java.security.Provider;
public class TestProvider extends Provider {
private static final String KEY_MANAGER_FACTORY = String.format("KeyManagerFactory.%s", TestKeyManagerFactory.ALGORITHM);
private static final String TRUST_MANAGER_FACTORY = String.format("TrustManagerFactory.%s", TestTrustManagerFactory.ALGORITHM);
public TestProvider() {
this("TestProvider", 0.1, "provider for test cases");
}
protected TestProvider(String name, double version, String info) {
super(name, version, info);
super.put(KEY_MANAGER_FACTORY, TestKeyManagerFactory.class.getName());
super.put(TRUST_MANAGER_FACTORY, TestTrustManagerFactory.class.getName());
}
}

88
clients/src/test/java/org/apache/kafka/common/security/ssl/mock/TestTrustManagerFactory.java

@ -0,0 +1,88 @@ @@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl.mock;
import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactorySpi;
import javax.net.ssl.X509ExtendedTrustManager;
import java.net.Socket;
import java.security.KeyStore;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
public class TestTrustManagerFactory extends TrustManagerFactorySpi {
public static final String ALGORITHM = "TestAlgorithm";
@Override
protected void engineInit(KeyStore keyStore) {
}
@Override
protected void engineInit(ManagerFactoryParameters managerFactoryParameters) {
}
@Override
protected TrustManager[] engineGetTrustManagers() {
return new TrustManager[] {new TestTrustManager()};
}
public static class TestTrustManager extends X509ExtendedTrustManager {
public static final String ALIAS = "TestAlias";
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {
}
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {
}
}
}

18
clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

@ -70,6 +70,8 @@ import java.util.ArrayList; @@ -70,6 +70,8 @@ import java.util.ArrayList;
public class TestSslUtils {
public static final String TRUST_STORE_PASSWORD = "TrustStorePassword";
/**
* Create a self-signed X.509 Certificate.
* From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
@ -175,6 +177,20 @@ public class TestSslUtils { @@ -175,6 +177,20 @@ public class TestSslUtils {
return sslConfigs;
}
public static Map<String, Object> createSslConfig(String keyManagerAlgorithm, String trustManagerAlgorithm) {
Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, keyManagerAlgorithm);
sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, trustManagerAlgorithm);
List<String> enabledProtocols = new ArrayList<>();
enabledProtocols.add("TLSv1.2");
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
return sslConfigs;
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
throws IOException, GeneralSecurityException {
return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, "localhost");
@ -193,7 +209,7 @@ public class TestSslUtils { @@ -193,7 +209,7 @@ public class TestSslUtils {
File keyStoreFile = null;
Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
Password trustStorePassword = new Password("TrustStorePassword");
Password trustStorePassword = new Password(TRUST_STORE_PASSWORD);
if (mode == Mode.CLIENT && useClientCert) {
keyStoreFile = File.createTempFile("clientKS", ".jks");

Loading…
Cancel
Save