From 2e731a9ee002298b4b90f97e9c876b330b005539 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 31 Aug 2016 09:14:59 -0700 Subject: [PATCH] KAFKA-3799: Enable SSL endpoint validation in system tests Generate certificates with hostname in SubjectAlternativeName and enable hostname validation. Author: Rajini Sivaram Reviewers: Sriharsha Chintalapani , Ismael Juma , Ewen Cheslack-Postava Closes #1483 from rajinisivaram/KAFKA-3799 --- .../services/kafka/templates/kafka.properties | 1 + .../services/kafka_log4j_appender.py | 2 +- .../services/security/security_config.py | 97 ++++++++++------ tests/kafkatest/tests/core/security_test.py | 106 ++++++++++++++++++ 4 files changed, 170 insertions(+), 36 deletions(-) create mode 100644 tests/kafkatest/tests/core/security_test.py diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 9924aebb2dc..c02c64f875a 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -58,6 +58,7 @@ ssl.keystore.type=JKS ssl.truststore.location=/mnt/security/test.truststore.jks ssl.truststore.password=test-ts-passwd ssl.truststore.type=JKS +ssl.endpoint.identification.algorithm=HTTPS sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }} sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }} sasl.kerberos.service.name=kafka diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index c50cab423eb..b25d8be92e4 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -53,7 +53,7 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): cmd += " --security-protocol %s" % str(self.security_protocol) if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL: cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH) - cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password']) + cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd) if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \ self.security_protocol == SecurityConfig.SASL_SSL or \ self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \ diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index 59a0ed4f528..40674e85d08 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -15,43 +15,67 @@ import os import subprocess +from tempfile import mkdtemp +from shutil import rmtree from ducktape.template import TemplateRenderer from kafkatest.services.security.minikdc import MiniKdc import itertools -class Keytool(object): +class SslStores(object): + def __init__(self): + self.ca_crt_path = "/tmp/test.ca.crt" + self.ca_jks_path = "/tmp/test.ca.jks" + self.ca_passwd = "test-ca-passwd" - @staticmethod - def generate_keystore_truststore(ssl_dir='.'): + self.truststore_path = "/tmp/test.truststore.jks" + self.truststore_passwd = "test-ts-passwd" + self.keystore_passwd = "test-ks-passwd" + self.key_passwd = "test-key-passwd" + + for file in [self.ca_crt_path, self.ca_jks_path, self.truststore_path]: + if os.path.exists(file): + os.remove(file) + + def generate_ca(self): """ - Generate JKS keystore and truststore and return - Kafka SSL properties with these stores. + Generate CA private key and certificate. """ - ks_path = os.path.join(ssl_dir, 'test.keystore.jks') - ks_password = 'test-ks-passwd' - key_password = 'test-key-passwd' - ts_path = os.path.join(ssl_dir, 'test.truststore.jks') - ts_password = 'test-ts-passwd' - if os.path.exists(ks_path): - os.remove(ks_path) - if os.path.exists(ts_path): - os.remove(ts_path) - - Keytool.runcmd("keytool -genkeypair -alias test -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -keypass %s -storepass %s -dname CN=systemtest" % (ks_path, key_password, ks_password)) - Keytool.runcmd("keytool -export -alias test -keystore %s -storepass %s -storetype JKS -rfc -file test.crt" % (ks_path, ks_password)) - Keytool.runcmd("keytool -import -alias test -file test.crt -keystore %s -storepass %s -storetype JKS -noprompt" % (ts_path, ts_password)) - os.remove('test.crt') - - return { - 'ssl.keystore.location' : ks_path, - 'ssl.keystore.password' : ks_password, - 'ssl.key.password' : key_password, - 'ssl.truststore.location' : ts_path, - 'ssl.truststore.password' : ts_password - } - @staticmethod - def runcmd(cmd): + self.runcmd("keytool -genkeypair -alias ca -keyalg RSA -keysize 2048 -keystore %s -storetype JKS -storepass %s -keypass %s -dname CN=SystemTestCA" % (self.ca_jks_path, self.ca_passwd, self.ca_passwd)) + self.runcmd("keytool -export -alias ca -keystore %s -storepass %s -storetype JKS -rfc -file %s" % (self.ca_jks_path, self.ca_passwd, self.ca_crt_path)) + + def generate_truststore(self): + """ + Generate JKS truststore containing CA certificate. + """ + + self.runcmd("keytool -importcert -alias ca -file %s -keystore %s -storepass %s -storetype JKS -noprompt" % (self.ca_crt_path, self.truststore_path, self.truststore_passwd)) + + def generate_and_copy_keystore(self, node): + """ + Generate JKS keystore with certificate signed by the test CA. + The generated certificate has the node's hostname as a DNS SubjectAlternativeName. + """ + + ks_dir = mkdtemp(dir="/tmp") + ks_path = os.path.join(ks_dir, "test.keystore.jks") + csr_path = os.path.join(ks_dir, "test.kafka.csr") + crt_path = os.path.join(ks_dir, "test.kafka.crt") + + self.runcmd("keytool -genkeypair -alias kafka -keyalg RSA -keysize 2048 -keystore %s -storepass %s -keypass %s -dname CN=systemtest -ext SAN=DNS:%s" % (ks_path, self.keystore_passwd, self.key_passwd, self.hostname(node))) + self.runcmd("keytool -certreq -keystore %s -storepass %s -keypass %s -alias kafka -file %s" % (ks_path, self.keystore_passwd, self.key_passwd, csr_path)) + self.runcmd("keytool -gencert -keystore %s -storepass %s -alias ca -infile %s -outfile %s -dname CN=systemtest -ext SAN=DNS:%s" % (self.ca_jks_path, self.ca_passwd, csr_path, crt_path, self.hostname(node))) + self.runcmd("keytool -importcert -keystore %s -storepass %s -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path)) + self.runcmd("keytool -importcert -keystore %s -storepass %s -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path)) + node.account.scp_to(ks_path, SecurityConfig.KEYSTORE_PATH) + rmtree(ks_dir) + + def hostname(self, node): + """ Hostname which may be overridden for testing validation failures + """ + return node.account.hostname + + def runcmd(self, cmd): proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) proc.communicate() if proc.returncode != 0: @@ -73,7 +97,9 @@ class SecurityConfig(TemplateRenderer): KRB5CONF_PATH = "/mnt/security/krb5.conf" KEYTAB_PATH = "/mnt/security/keytab" - ssl_stores = Keytool.generate_keystore_truststore('.') + ssl_stores = SslStores() + ssl_stores.generate_ca() + ssl_stores.generate_truststore() def __init__(self, security_protocol=None, interbroker_security_protocol=None, client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI, @@ -102,10 +128,11 @@ class SecurityConfig(TemplateRenderer): self.properties = { 'security.protocol' : security_protocol, 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH, - 'ssl.keystore.password' : SecurityConfig.ssl_stores['ssl.keystore.password'], - 'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'], + 'ssl.keystore.password' : SecurityConfig.ssl_stores.keystore_passwd, + 'ssl.key.password' : SecurityConfig.ssl_stores.key_passwd, 'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH, - 'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'], + 'ssl.truststore.password' : SecurityConfig.ssl_stores.truststore_passwd, + 'ssl.endpoint.identification.algorithm' : 'HTTPS', 'sasl.mechanism' : client_sasl_mechanism, 'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism, 'sasl.kerberos.service.name' : 'kafka' @@ -117,8 +144,8 @@ class SecurityConfig(TemplateRenderer): def setup_ssl(self, node): node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False) - node.account.scp_to(SecurityConfig.ssl_stores['ssl.keystore.location'], SecurityConfig.KEYSTORE_PATH) - node.account.scp_to(SecurityConfig.ssl_stores['ssl.truststore.location'], SecurityConfig.TRUSTSTORE_PATH) + node.account.scp_to(SecurityConfig.ssl_stores.truststore_path, SecurityConfig.TRUSTSTORE_PATH) + SecurityConfig.ssl_stores.generate_and_copy_keystore(node) def setup_sasl(self, node): node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False) diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py new file mode 100644 index 00000000000..8c150a26715 --- /dev/null +++ b/tests/kafkatest/tests/core/security_test.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import parametrize + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.security.security_config import SecurityConfig +from kafkatest.services.security.security_config import SslStores +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.utils import is_int +import time + +class TestSslStores(SslStores): + def __init__(self): + super(TestSslStores, self).__init__() + self.invalid_hostname = False + self.generate_ca() + self.generate_truststore() + + def hostname(self, node): + if (self.invalid_hostname): + return "invalidhost" + else: + return super(TestSslStores, self).hostname(node) + +class SecurityTest(ProduceConsumeValidateTest): + """ + These tests validate security features. + """ + + def __init__(self, test_context): + """:type test_context: ducktape.tests.test.TestContext""" + super(SecurityTest, self).__init__(test_context=test_context) + + self.topic = "test_topic" + self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, topics={self.topic: { + "partitions": 2, + "replication-factor": 1} + }) + self.num_partitions = 2 + self.timeout_sec = 10000 + self.producer_throughput = 1000 + self.num_producers = 1 + self.num_consumers = 1 + + def setUp(self): + self.zk.start() + + @parametrize(security_protocol='PLAINTEXT', interbroker_security_protocol='SSL') + @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') + def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol): + """ + Test that invalid hostname in certificate results in connection failures. + When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure. + When security_protocol=PLAINTEXT and interbroker_security_protocol=SSL, controller connections fail + with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE. + """ + + self.kafka.security_protocol = security_protocol + self.kafka.interbroker_security_protocol = interbroker_security_protocol + SecurityConfig.ssl_stores = TestSslStores() + + SecurityConfig.ssl_stores.invalid_hostname = True + self.kafka.start() + self.create_producer_and_consumer() + self.producer.log_level = "TRACE" + self.producer.start() + self.consumer.start() + time.sleep(10) + assert self.producer.num_acked == 0, "Messages published successfully, endpoint validation did not fail with invalid hostname" + error = 'SSLHandshakeException' if security_protocol is 'SSL' else 'LEADER_NOT_AVAILABLE' + for node in self.producer.nodes: + node.account.ssh("grep %s %s" % (error, self.producer.LOG_FILE)) + for node in self.consumer.nodes: + node.account.ssh("grep %s %s" % (error, self.consumer.LOG_FILE)) + + self.producer.stop() + self.consumer.stop() + self.producer.log_level = "INFO" + + SecurityConfig.ssl_stores.invalid_hostname = False + for node in self.kafka.nodes: + self.kafka.restart_node(node, clean_shutdown=True) + self.create_producer_and_consumer() + self.run_produce_consume_validate() + + def create_producer_and_consumer(self): + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=10000, message_validator=is_int) +