Browse Source

KAFKA-15502: Update SslEngineValidator to handle large stores (#14445)

We have observed an issue where inter broker SSL listener is not coming up when running with TLSv3/JDK 17 .
SSL debug logs shows that TLSv3 post handshake messages >16K are not getting read and causing SslEngineValidator process to stuck while validating the provided trust/key store.

- Right now, WRAP returns if there is already data in the buffer. But if we need more data to be wrapped for UNWRAP to succeed, we end up looping forever. To fix this, now we always attempt WRAP and only return early on BUFFER_OVERFLOW.
- Update SslEngineValidator to unwrap post-handshake messages from peer when local handshake status is FINISHED.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
pull/14265/head
Manikumar Reddy 1 year ago committed by GitHub
parent
commit
170550af40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
  2. 29
      clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
  3. 83
      clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

58
clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java

@ -419,12 +419,12 @@ public class SslFactory implements Reconfigurable, Closeable { @@ -419,12 +419,12 @@ public class SslFactory implements Reconfigurable, Closeable {
while (true) {
switch (handshakeStatus) {
case NEED_WRAP:
if (netBuffer.position() != 0) // Wait for peer to consume previously wrapped data
return;
handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
switch (handshakeResult.getStatus()) {
case OK: break;
case BUFFER_OVERFLOW:
if (netBuffer.position() != 0) // Wait for peer to consume previously wrapped data
return;
netBuffer.compact();
netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
netBuffer.flip();
@ -436,24 +436,8 @@ public class SslFactory implements Reconfigurable, Closeable { @@ -436,24 +436,8 @@ public class SslFactory implements Reconfigurable, Closeable {
}
return;
case NEED_UNWRAP:
if (peerValidator.netBuffer.position() == 0) // no data to unwrap, return to process peer
return;
peerValidator.netBuffer.flip(); // unwrap the data from peer
handshakeResult = sslEngine.unwrap(peerValidator.netBuffer, appBuffer);
peerValidator.netBuffer.compact();
handshakeStatus = handshakeResult.getHandshakeStatus();
switch (handshakeResult.getStatus()) {
case OK: break;
case BUFFER_OVERFLOW:
appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
break;
case BUFFER_UNDERFLOW:
netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
break;
case CLOSED:
default:
throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
}
handshakeStatus = unwrap(peerValidator, true);
if (handshakeStatus == null) return;
break;
case NEED_TASK:
sslEngine.getDelegatedTask().run();
@ -463,14 +447,44 @@ public class SslFactory implements Reconfigurable, Closeable { @@ -463,14 +447,44 @@ public class SslFactory implements Reconfigurable, Closeable {
return;
case NOT_HANDSHAKING:
if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED)
throw new SSLException("Did not finish handshake");
throw new SSLException("Did not finish handshake, handshake status: " + handshakeResult.getHandshakeStatus());
else if (peerValidator.netBuffer.position() != 0) {
unwrap(peerValidator, false);
}
return;
default:
throw new IllegalStateException("Unexpected handshake status " + handshakeStatus);
throw new IllegalStateException("Unexpected handshake status: " + handshakeStatus);
}
}
}
private SSLEngineResult.HandshakeStatus unwrap(SslEngineValidator peerValidator, boolean updateHandshakeResult) throws SSLException {
// Unwrap regardless of whether there is data in the buffer to ensure that
// handshake status is updated if required.
peerValidator.netBuffer.flip(); // unwrap the data from peer
SSLEngineResult sslEngineResult = sslEngine.unwrap(peerValidator.netBuffer, appBuffer);
if (updateHandshakeResult) {
handshakeResult = sslEngineResult;
}
peerValidator.netBuffer.compact();
SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus();
switch (sslEngineResult.getStatus()) {
case OK: break;
case BUFFER_OVERFLOW:
appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
break;
case BUFFER_UNDERFLOW:
netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
// BUFFER_UNDERFLOW typically indicates that we need more data from peer,
// so return to process peer.
return null;
case CLOSED:
default:
throw new SSLException("Unexpected handshake status: " + sslEngineResult.getStatus());
}
return handshakeStatus;
}
boolean complete() {
return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED ||
sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;

29
clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java

@ -74,13 +74,28 @@ public abstract class SslFactoryTest { @@ -74,13 +74,28 @@ public abstract class SslFactoryTest {
Map<String, Object> serverSslConfig = sslConfigsBuilder(Mode.SERVER)
.createNewTrustStore(trustStoreFile)
.build();
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
//host and port are hints
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
assertNotNull(engine);
assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols()));
assertFalse(engine.getUseClientMode());
try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) {
sslFactory.configure(serverSslConfig);
//host and port are hints
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
assertNotNull(engine);
assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols()));
assertFalse(engine.getUseClientMode());
}
}
@Test
public void testSslFactoryConfigWithManyKeyStoreEntries() throws Exception {
//generate server configs for keystore with multiple certificate chain
Map<String, Object> serverSslConfig = TestSslUtils.generateConfigsWithCertificateChains(tlsProtocol);
try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) {
sslFactory.configure(serverSslConfig);
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
assertNotNull(engine);
assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols()));
assertFalse(engine.getUseClientMode());
}
}
@Test

83
clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

@ -142,6 +142,14 @@ public class TestSslUtils { @@ -142,6 +142,14 @@ public class TestSslUtils {
daysBeforeNow, daysAfterNow, issuer, parentKeyPair, isCA, isServerCert, isClientCert);
}
public static X509Certificate generateSignedCertificate(String dn, KeyPair keyPair,
int daysBeforeNow, int daysAfterNow, String issuer, KeyPair parentKeyPair,
String algorithm, boolean isCA, boolean isServerCert, boolean isClientCert,
String[] hostNames) throws CertificateException, IOException {
return new CertificateBuilder(0, algorithm).sanDnsNames(hostNames).generateSignedCertificate(dn, keyPair,
daysBeforeNow, daysAfterNow, issuer, parentKeyPair, isCA, isServerCert, isClientCert);
}
public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException {
KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
keyGen.initialize(algorithm.equals("EC") ? 256 : 2048);
@ -758,4 +766,79 @@ public class TestSslUtils { @@ -758,4 +766,79 @@ public class TestSslUtils {
defaultSslEngineFactory.configure(configs);
}
}
/**
* method to generate ssl configs for keystore with large number of entries. This is used to verify large key stores and
* post-handshake messages in SslEngineValidator with TLSv3/JDK17+
* @param tlsProtocol
* @return ssl configs
* @throws Exception
*/
public static Map<String, Object> generateConfigsWithCertificateChains(String tlsProtocol) throws Exception {
int nrOfCerts = 10;
KeyPair[] keyPairs = new KeyPair[nrOfCerts];
for (int i = 0; i < nrOfCerts; i++) {
keyPairs[i] = TestSslUtils.generateKeyPair("RSA");
}
//add a bunch of hostNames to keystore to increase the keystore size
String[] hostNames = new String[150];
for (int i = 0; i < hostNames.length; i++) {
hostNames[i] = "hostName" + i;
}
X509Certificate[] certs = new X509Certificate[nrOfCerts];
// Generate root CA
int caIndex = nrOfCerts - 1;
certs[caIndex] = TestSslUtils.generateSignedCertificate("CN=CA", keyPairs[caIndex], 365,
365, null, null, "SHA512withRSA", true, false, false, hostNames);
//Generate Intermediate certificates
for (int intermediateCertIndex = caIndex - 1; intermediateCertIndex > 0; intermediateCertIndex--) {
certs[intermediateCertIndex] = TestSslUtils.generateSignedCertificate("CN=Intermediate CA" + intermediateCertIndex,
keyPairs[intermediateCertIndex], 365, 365, certs[intermediateCertIndex + 1].getSubjectX500Principal().getName(),
keyPairs[intermediateCertIndex + 1], "SHA512withRSA", true, false, false, hostNames);
}
// Generate a valid end certificate
certs[0] = TestSslUtils.generateSignedCertificate("CN=kafka", keyPairs[0], 1, 1,
certs[1].getSubjectX500Principal().getName(), keyPairs[1], "SHA512withRSA", false, true, true, hostNames);
File keystoreStoreFile = TestUtils.tempFile("keystore", ".jks");
Password keyStorePassword = new Password("password");
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(null, null);
keyStore.setKeyEntry("issued-cert", keyPairs[0].getPrivate(), keyStorePassword.value().toCharArray(), certs);
saveKeyStore(keyStore, keystoreStoreFile.getPath(), keyStorePassword);
File trustStoreFile = TestUtils.tempFile("truststore", ".jks");
Password trustStorePassword = new Password("password");
KeyStore trustStore = KeyStore.getInstance("PKCS12");
trustStore.load(null, null);
for (X509Certificate cert : certs) {
trustStore.setCertificateEntry(cert.getSubjectX500Principal().getName(), cert);
}
saveKeyStore(trustStore, trustStoreFile.getPath(), trustStorePassword);
Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); // protocol to create SSLContext
sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePassword);
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
List<String> enabledProtocols = new ArrayList<>();
enabledProtocols.add(tlsProtocol);
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
return sslConfigs;
}
}

Loading…
Cancel
Save