Browse Source

KAFKA-3799: Enable SSL endpoint validation in system tests

Generate certificates with hostname in SubjectAlternativeName and enable hostname validation.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1483 from rajinisivaram/KAFKA-3799
pull/1483/merge
Rajini Sivaram 8 years ago committed by Ewen Cheslack-Postava
parent
commit
2e731a9ee0
  1. 1
      tests/kafkatest/services/kafka/templates/kafka.properties
  2. 2
      tests/kafkatest/services/kafka_log4j_appender.py
  3. 97
      tests/kafkatest/services/security/security_config.py
  4. 106
      tests/kafkatest/tests/core/security_test.py

1
tests/kafkatest/services/kafka/templates/kafka.properties

@ -58,6 +58,7 @@ ssl.keystore.type=JKS @@ -58,6 +58,7 @@ ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka

2
tests/kafkatest/services/kafka_log4j_appender.py

@ -53,7 +53,7 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): @@ -53,7 +53,7 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
cmd += " --security-protocol %s" % str(self.security_protocol)
if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password'])
cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd)
if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
self.security_protocol == SecurityConfig.SASL_SSL or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \

97
tests/kafkatest/services/security/security_config.py

@ -15,43 +15,67 @@ @@ -15,43 +15,67 @@
import os
import subprocess
from tempfile import mkdtemp
from shutil import rmtree
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
import itertools
class Keytool(object):
class SslStores(object):
def __init__(self):
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
self.ca_passwd = "test-ca-passwd"
@staticmethod
def generate_keystore_truststore(ssl_dir='.'):
self.truststore_path = "/tmp/test.truststore.jks"
self.truststore_passwd = "test-ts-passwd"
self.keystore_passwd = "test-ks-passwd"
self.key_passwd = "test-key-passwd"
for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]:
if os.path.exists(file):
os.remove(file)
def generate_ca(self):
"""
Generate JKS keystore and truststore and return
Kafka SSL properties with these stores.
Generate CA private key and certificate.
"""
ks_path = os.path.join(ssl_dir, 'test.keystore.jks')
ks_password = 'test-ks-passwd'
key_password = 'test-key-passwd'
ts_path = os.path.join(ssl_dir, 'test.truststore.jks')
ts_password = 'test-ts-passwd'
if os.path.exists(ks_path):
os.remove(ks_path)
if os.path.exists(ts_path):
os.remove(ts_path)
Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password))
Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password))
Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password))
os.remove('test.crt')
return {
'ssl.keystore.location' : ks_path,
'ssl.keystore.password' : ks_password,
'ssl.key.password' : key_password,
'ssl.truststore.location' : ts_path,
'ssl.truststore.password' : ts_password
}
@staticmethod
def runcmd(cmd):
self.runcmd("keytool -genkeypair -alias ca -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -storepass %s -keypass %s -dname CN=SystemTestCA" % (self.ca_jks_path, self.ca_passwd, self.ca_passwd))
self.runcmd("keytool -export -alias ca -keystore %s -storepass %s -storetype JKS -rfc -file %s" % (self.ca_jks_path, self.ca_passwd, self.ca_crt_path))
def generate_truststore(self):
"""
Generate JKS truststore containing CA certificate.
"""
self.runcmd("keytool -importcert -alias ca -file %s -keystore %s -storepass %s -storetype JKS -noprompt" % (self.ca_crt_path, self.truststore_path, self.truststore_passwd))
def generate_and_copy_keystore(self, node):
"""
Generate JKS keystore with certificate signed by the test CA.
The generated certificate has the node's hostname as a DNS SubjectAlternativeName.
"""
ks_dir = mkdtemp(dir="/tmp")
ks_path = os.path.join(ks_dir, "test.keystore.jks")
csr_path = os.path.join(ks_dir, "test.kafka.csr")
crt_path = os.path.join(ks_dir, "test.kafka.crt")
self.runcmd("keytool -genkeypair -alias kafka -keyalg RSA -keysize 2048 -keystore %s -storepass %s -keypass %s -dname CN=systemtest -ext SAN=DNS:%s" % (ks_path, self.keystore_passwd, self.key_passwd, self.hostname(node)))
self.runcmd("keytool -certreq -keystore %s -storepass %s -keypass %s -alias kafka -file %s" % (ks_path, self.keystore_passwd, self.key_passwd, csr_path))
self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node)))
self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH)
rmtree(ks_dir)
def hostname(self, node):
""" Hostname which may be overridden for testing validation failures
"""
return node.account.hostname
def runcmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
proc.communicate()
if proc.returncode != 0:
@ -73,7 +97,9 @@ class SecurityConfig(TemplateRenderer): @@ -73,7 +97,9 @@ class SecurityConfig(TemplateRenderer):
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
ssl_stores = Keytool.generate_keystore_truststore('.')
ssl_stores = SslStores()
ssl_stores.generate_ca()
ssl_stores.generate_truststore()
def __init__(self, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
@ -102,10 +128,11 @@ class SecurityConfig(TemplateRenderer): @@ -102,10 +128,11 @@ class SecurityConfig(TemplateRenderer):
self.properties = {
'security.protocol' : security_protocol,
'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'],
'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
'ssl.keystore.password' : SecurityConfig.ssl_stores.keystore_passwd,
'ssl.key.password' : SecurityConfig.ssl_stores.key_passwd,
'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
'ssl.truststore.password' : SecurityConfig.ssl_stores.truststore_passwd,
'ssl.endpoint.identification.algorithm' : 'HTTPS',
'sasl.mechanism' : client_sasl_mechanism,
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
@ -117,8 +144,8 @@ class SecurityConfig(TemplateRenderer): @@ -117,8 +144,8 @@ class SecurityConfig(TemplateRenderer):
def setup_ssl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH)
node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH)
node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH)
SecurityConfig.ssl_stores.generate_and_copy_keystore(node)
def setup_sasl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)

106
tests/kafkatest/tests/core/security_test.py

@ -0,0 +1,106 @@ @@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.security_config import SslStores
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import time
class TestSslStores(SslStores):
def __init__(self):
super(TestSslStores, self).__init__()
self.invalid_hostname = False
self.generate_ca()
self.generate_truststore()
def hostname(self, node):
if (self.invalid_hostname):
return "invalidhost"
else:
return super(TestSslStores, self).hostname(node)
class SecurityTest(ProduceConsumeValidateTest):
"""
These tests validate security features.
"""
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(SecurityTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: {
"partitions": 2,
"replication-factor": 1}
})
self.num_partitions = 2
self.timeout_sec = 10000
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
def setUp(self):
self.zk.start()
@parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL')
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol):
"""
Test that invalid hostname in certificate results in connection failures.
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail
with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
"""
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = interbroker_security_protocol
SecurityConfig.ssl_stores = TestSslStores()
SecurityConfig.ssl_stores.invalid_hostname = True
self.kafka.start()
self.create_producer_and_consumer()
self.producer.log_level = "TRACE"
self.producer.start()
self.consumer.start()
time.sleep(10)
assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname"
error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE'
for node in self.producer.nodes:
node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE))
for node in self.consumer.nodes:
node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE))
self.producer.stop()
self.consumer.stop()
self.producer.log_level = "INFO"
SecurityConfig.ssl_stores.invalid_hostname = False
for node in self.kafka.nodes:
self.kafka.restart_node(node, clean_shutdown=True)
self.create_producer_and_consumer()
self.run_produce_consume_validate()
def create_producer_and_consumer(self):
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int)
Loading…
Cancel
Save