diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 910be0ab2b0..e164c917a6c 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -254,7 +254,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def alive(self, node): return len(self.pids(node)) > 0 - def start(self, add_principals="", use_zk_to_create_topic=True): + def start(self, add_principals=""): if self.zk_client_secure and not self.zk.zk_client_secure_port: raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled") self.open_port(self.security_protocol) @@ -281,7 +281,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): topic_cfg = {} topic_cfg["topic"] = topic - self.create_topic(topic_cfg, use_zk_to_create_topic=use_zk_to_create_topic) + self.create_topic(topic_cfg) def _ensure_zk_chroot(self): self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot) @@ -445,25 +445,61 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): clean_shutdown=False, allow_fail=True) node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False) - def _kafka_topics_cmd(self, node, use_zk_connection=True): + def _kafka_topics_cmd(self, node, force_use_zk_connection): """ Returns kafka-topics.sh command path with jaas configuration and krb5 environment variable set. If Admin client is not going to be used, don't set the environment variable. """ kafka_topic_script = self.path.script("kafka-topics.sh", node) - skip_security_settings = use_zk_connection or not node.version.topic_command_supports_bootstrap_server() + skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() return kafka_topic_script if skip_security_settings else \ "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_topic_script) - def _kafka_topics_cmd_config(self, node, use_zk_connection=True): + def _kafka_topics_cmd_config(self, node, force_use_zk_connection): """ Return --command-config parameter to the kafka-topics.sh command. The config parameter specifies the security settings that AdminClient uses to connect to a secure kafka server. """ - skip_command_config = use_zk_connection or not node.version.topic_command_supports_bootstrap_server() + skip_command_config = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server() return "" if skip_command_config else " --command-config <(echo '%s')" % (self.security_config.client_config()) - def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True): + def all_nodes_topic_command_supports_bootstrap_server(self): + for node in self.nodes: + if not node.version.topic_command_supports_bootstrap_server(): + return False + return True + + def all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server(self): + for node in self.nodes: + if not node.version.topic_command_supports_if_not_exists_with_bootstrap_server(): + return False + return True + + def all_nodes_configs_command_uses_bootstrap_server(self): + for node in self.nodes: + if not node.version.kafka_configs_command_uses_bootstrap_server(): + return False + return True + + def all_nodes_configs_command_uses_bootstrap_server_scram(self): + for node in self.nodes: + if not node.version.kafka_configs_command_uses_bootstrap_server_scram(): + return False + return True + + def all_nodes_acl_command_supports_bootstrap_server(self): + for node in self.nodes: + if not node.version.acl_command_supports_bootstrap_server(): + return False + return True + + def all_nodes_reassign_partitions_command_supports_bootstrap_server(self): + for node in self.nodes: + if not node.version.reassign_partitions_command_supports_bootstrap_server(): + return False + return True + + def create_topic(self, topic_cfg, node=None): """Run the admin tool create topic command. Specifying node is optional, and may be done if for different kafka nodes have different versions, and we care where command gets run. @@ -475,12 +511,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) - use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\ + (topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server()) cmd = fix_opts_for_new_jvm(node) cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % { - 'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection), - 'connection_string': self._topic_command_connect_setting(node, use_zk_connection), + 'kafka_topics_cmd': self._kafka_topics_cmd(node, force_use_zk_connection), + 'connection_string': self._topic_command_connect_setting(node, force_use_zk_connection), 'topic': topic_cfg.get("topic"), } if 'replica-assignment' in topic_cfg: @@ -500,12 +537,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): for config_name, config_value in topic_cfg["configs"].items(): cmd += " --config %s=%s" % (config_name, str(config_value)) - cmd += self._kafka_topics_cmd_config(node, use_zk_connection) + cmd += self._kafka_topics_cmd_config(node, force_use_zk_connection) self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) - def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False): + def delete_topic(self, topic, node=None): """ Delete a topic with the topics command :param topic: @@ -516,22 +553,27 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): node = self.nodes[0] self.logger.info("Deleting topic %s" % topic) + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s %s --topic %s --delete %s" % \ - (self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_delete_topic), - self._topic_command_connect_setting(node=node, use_zk_connection=use_zk_to_delete_topic), - topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_delete_topic)) + (self._kafka_topics_cmd(node, force_use_zk_connection), + self._topic_command_connect_setting(node, force_use_zk_connection), + topic, self._kafka_topics_cmd_config(node, force_use_zk_connection)) self.logger.info("Running topic delete command...\n%s" % cmd) node.account.ssh(cmd) - def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True): + def describe_topic(self, topic, node=None): if node is None: node = self.nodes[0] + + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) cmd += "%s %s --topic %s --describe %s" % \ - (self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic), - self._topic_command_connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic), - topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic)) + (self._kafka_topics_cmd(node, force_use_zk_connection), + self._topic_command_connect_setting(node, force_use_zk_connection), + topic, self._kafka_topics_cmd_config(node, force_use_zk_connection)) self.logger.info("Running topic describe command...\n%s" % cmd) output = "" @@ -539,14 +581,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): output += line return output - def list_topics(self, node=None, use_zk_to_list_topic=True): + def list_topics(self, node=None): if node is None: node = self.nodes[0] + force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() + cmd = fix_opts_for_new_jvm(node) - cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic), - self._topic_command_connect_setting(node, use_zk_to_list_topic), - self._kafka_topics_cmd_config(node, use_zk_to_list_topic)) + cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, force_use_zk_connection), + self._topic_command_connect_setting(node, force_use_zk_connection), + self._kafka_topics_cmd_config(node, force_use_zk_connection)) for line in node.account.ssh_capture(cmd): if not line.startswith("SLF4J"): yield line.rstrip() @@ -578,7 +622,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def _connect_setting_kafka_configs(self, node): # Use this for everything related to kafka-configs except User SCRAM Credentials - if node.version.kafka_configs_command_uses_bootstrap_server(): + if self.all_nodes_configs_command_uses_bootstrap_server(): return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol), self.security_config.client_config()) else: @@ -586,13 +630,32 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def _connect_setting_kafka_configs_scram(self, node): # Use this for kafka-configs when operating on User SCRAM Credentials - if node.version.kafka_configs_command_uses_bootstrap_server_scram(): + if self.all_nodes_configs_command_uses_bootstrap_server_scram(): return "--bootstrap-server %s --command-config <(echo '%s')" %\ (self.bootstrap_servers(self.security_protocol), self.security_config.client_config(use_inter_broker_mechanism_for_client = True)) else: return "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption()) + def kafka_acls_cmd(self, node, force_use_zk_connection): + """ + Returns kafka-acls.sh command path with jaas configuration and krb5 environment variable + set. If Admin client is not going to be used, don't set the environment variable. + """ + kafka_acls_script = self.path.script("kafka-acls.sh", node) + skip_security_settings = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server() + return kafka_acls_script if skip_security_settings else \ + "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_acls_script) + + def run_cli_tool(self, node, cmd): + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not line.startswith("SLF4J"): + output += line + self.logger.debug(output) + return output + def parse_describe_topic(self, topic_description): """Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form PartitionCount:2\tReplicationFactor:2\tConfigs: @@ -624,7 +687,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def _connect_setting_reassign_partitions(self, node): - if node.version.reassign_partitions_command_supports_bootstrap_server(): + if self.all_nodes_reassign_partitions_command_supports_bootstrap_server(): return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol) else: return "--zookeeper %s " % self.zk_connect_setting() @@ -748,37 +811,68 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.stop_node(node, clean_shutdown, timeout_sec) self.start_node(node, timeout_sec) + def _describe_topic_line_for_partition(self, partition, describe_topic_output): + # Lines look like this: Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 + grep_for = "Partition: %i\t" % (partition) # be sure to include trailing tab, otherwise 1 might match 10 (for example) + found_lines = [line for line in describe_topic_output.splitlines() if grep_for in line] + return None if not found_lines else found_lines[0] + def isr_idx_list(self, topic, partition=0): """ Get in-sync replica list the given topic and partition. """ - self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition)) - zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition) - partition_state = self.zk.query(zk_path, chroot=self.zk_chroot) + node = self.nodes[0] + if not self.all_nodes_topic_command_supports_bootstrap_server(): + self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition)) + zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition) + partition_state = self.zk.query(zk_path, chroot=self.zk_chroot) - if partition_state is None: - raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + if partition_state is None: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) - partition_state = json.loads(partition_state) - self.logger.info(partition_state) + partition_state = json.loads(partition_state) + self.logger.info(partition_state) + + isr_idx_list = partition_state["isr"] + else: + self.logger.debug("Querying Kafka Admin API to find in-sync replicas for topic %s and partition %d" % (topic, partition)) + describe_output = self.describe_topic(topic, node) + self.logger.debug(describe_output) + requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output) + # e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 + if not requested_partition_line: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + isr_csv = requested_partition_line.split()[9] # 10th column from above + isr_idx_list = [int(i) for i in isr_csv.split(",")] - isr_idx_list = partition_state["isr"] self.logger.info("Isr for topic %s and partition %d is now: %s" % (topic, partition, isr_idx_list)) return isr_idx_list def replicas(self, topic, partition=0): """ Get the assigned replicas for the given topic and partition. """ - self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and partition %d" % (topic, partition)) - zk_path = "/brokers/topics/%s" % (topic) - assignment = self.zk.query(zk_path, chroot=self.zk_chroot) + node = self.nodes[0] + if not self.all_nodes_topic_command_supports_bootstrap_server(): + self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and partition %d" % (topic, partition)) + zk_path = "/brokers/topics/%s" % (topic) + assignment = self.zk.query(zk_path, chroot=self.zk_chroot) - if assignment is None: - raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + if assignment is None: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) - assignment = json.loads(assignment) - self.logger.info(assignment) + assignment = json.loads(assignment) + self.logger.info(assignment) - replicas = assignment["partitions"][str(partition)] + replicas = assignment["partitions"][str(partition)] + else: + self.logger.debug("Querying Kafka Admin API to find replicas for topic %s and partition %d" % (topic, partition)) + describe_output = self.describe_topic(topic, node) + self.logger.debug(describe_output) + requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output) + # e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 + if not requested_partition_line: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + isr_csv = requested_partition_line.split()[7] # 8th column from above + replicas = [int(i) for i in isr_csv.split(",")] self.logger.info("Assigned replicas for topic %s and partition %d is now: %s" % (topic, partition, replicas)) return [self.get_node(replica) for replica in replicas] @@ -786,17 +880,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def leader(self, topic, partition=0): """ Get the leader replica for the given topic and partition. """ - self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition %d" % (topic, partition)) - zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition) - partition_state = self.zk.query(zk_path, chroot=self.zk_chroot) + node = self.nodes[0] + if not self.all_nodes_topic_command_supports_bootstrap_server(): + self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition %d" % (topic, partition)) + zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition) + partition_state = self.zk.query(zk_path, chroot=self.zk_chroot) + + if partition_state is None: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) - if partition_state is None: - raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + partition_state = json.loads(partition_state) + self.logger.info(partition_state) - partition_state = json.loads(partition_state) - self.logger.info(partition_state) + leader_idx = int(partition_state["leader"]) + else: + self.logger.debug("Querying Kafka Admin API to find leader for topic %s and partition %d" % (topic, partition)) + describe_output = self.describe_topic(topic, node) + self.logger.debug(describe_output) + requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output) + # e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 + if not requested_partition_line: + raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition)) + leader_idx = int(requested_partition_line.split()[5]) # 6th column from above - leader_idx = int(partition_state["leader"]) self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) @@ -840,13 +946,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): (consumer_group_script, self.bootstrap_servers(self.security_protocol), command_config) - output = "" - self.logger.debug(cmd) - for line in node.account.ssh_capture(cmd): - if not line.startswith("SLF4J"): - output += line - self.logger.debug(output) - return output + return self.run_cli_tool(node, cmd) def describe_consumer_group(self, group, node=None, command_config=None): """ Describe a consumer group. @@ -877,12 +977,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def zk_connect_setting(self): return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure) - def _topic_command_connect_setting(self, node, use_zk_connection=True): + def _topic_command_connect_setting(self, node, force_use_zk_connection): """ Checks if --bootstrap-server config is supported, if yes then returns a string with bootstrap server, otherwise returns zookeeper connection string. """ - if node.version.topic_command_supports_bootstrap_server() and not use_zk_connection: + if not force_use_zk_connection and self.all_nodes_topic_command_supports_bootstrap_server(): connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol)) else: connection_setting = "--zookeeper %s" % (self.zk_connect_setting()) diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py index 85d3c7d7371..3bb3e6fd1a8 100644 --- a/tests/kafkatest/services/security/kafka_acls.py +++ b/tests/kafkatest/services/security/kafka_acls.py @@ -14,62 +14,140 @@ # limitations under the License. from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin +from kafkatest.services.kafka.util import fix_opts_for_new_jvm class ACLs(KafkaPathResolverMixin): def __init__(self, context): self.context = context - def set_acls(self, protocol, kafka, topic, group): - node = kafka.nodes[0] - setting = kafka.zk_connect_setting() - + def set_acls(self, protocol, kafka, topic, group, force_use_zk_connection=False): # Set server ACLs kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka" - self.acls_command(node, ACLs.add_cluster_acl(setting, kafka_principal)) - self.acls_command(node, ACLs.broker_read_acl(setting, "*", kafka_principal)) + self.add_cluster_acl(kafka, kafka_principal, force_use_zk_connection=force_use_zk_connection) + self.add_read_acl(kafka, kafka_principal, "*", force_use_zk_connection=force_use_zk_connection) # Set client ACLs client_principal = "User:CN=systemtest" if protocol == "SSL" else "User:client" - self.acls_command(node, ACLs.produce_acl(setting, topic, client_principal)) - self.acls_command(node, ACLs.consume_acl(setting, topic, group, client_principal)) - - def acls_command(self, node, properties): - cmd = "%s %s" % (self.path.script("kafka-acls.sh", node), properties) - node.account.ssh(cmd) - - @staticmethod - def add_cluster_acl(zk_connect, principal="User:kafka"): - return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --cluster " \ - "--operation=ClusterAction --allow-principal=%(principal)s " % { - 'zk_connect': zk_connect, - 'principal': principal - } + self.add_produce_acl(kafka, client_principal, topic, force_use_zk_connection=force_use_zk_connection) + self.add_consume_acl(kafka, client_principal, topic, group, force_use_zk_connection=force_use_zk_connection) + + def _acl_command_connect_setting(self, kafka, node, force_use_zk_connection): + """ + Checks if --bootstrap-server config is supported, if yes then returns a string with + bootstrap server, otherwise returns authorizer properties for zookeeper connection. + """ + if not force_use_zk_connection and kafka.all_nodes_acl_command_supports_bootstrap_server(): + connection_setting = "--bootstrap-server %s" % (kafka.bootstrap_servers(kafka.security_protocol)) + else: + connection_setting = "--authorizer-properties zookeeper.connect=%s" % (kafka.zk_connect_setting()) + + return connection_setting - @staticmethod - def broker_read_acl(zk_connect, topic, principal="User:kafka"): - return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \ - "--operation=Read --allow-principal=%(principal)s " % { - 'zk_connect': zk_connect, + def _kafka_acls_cmd_config(self, kafka, node, force_use_zk_connection): + """ + Return --command-config parameter to the kafka-acls.sh command. The config parameter specifies + the security settings that AdminClient uses to connect to a secure kafka server. + """ + skip_command_config = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() + return "" if skip_command_config else " --command-config <(echo '%s')" % (kafka.security_config.client_config()) + + def _acl_cmd_prefix(self, kafka, node, force_use_zk_connection): + """ + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available + :return command prefix for running kafka-acls + """ + cmd = fix_opts_for_new_jvm(node) + cmd += "%s %s %s" % ( + kafka.kafka_acls_cmd(node, force_use_zk_connection), + self._acl_command_connect_setting(kafka, node, force_use_zk_connection), + self._kafka_acls_cmd_config(kafka, node, force_use_zk_connection)) + return cmd + + def _add_acl_on_topic(self, kafka, principal, topic, operation_flag, node, force_use_zk_connection): + """ + :param principal: principal for which ACL is created + :param topic: topic for which ACL is created + :param operation_flag: type of ACL created (e.g. --producer, --consumer, --operation=Read) + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available + """ + cmd = "%(cmd_prefix)s --add --topic=%(topic)s %(operation_flag)s --allow-principal=%(principal)s" % { + 'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection), 'topic': topic, + 'operation_flag': operation_flag, 'principal': principal } + kafka.run_cli_tool(node, cmd) - @staticmethod - def produce_acl(zk_connect, topic, principal="User:client"): - return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \ - "--producer --allow-principal=%(principal)s " % { - 'zk_connect': zk_connect, - 'topic': topic, + def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False): + """ + :param kafka: Kafka cluster upon which ClusterAction ACL is created + :param principal: principal for which ClusterAction ACL is created + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. + This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled + """ + node = kafka.nodes[0] + + force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() + + cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction --allow-principal=%(principal)s" % { + 'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection), 'principal': principal } + kafka.run_cli_tool(node, cmd) - @staticmethod - def consume_acl(zk_connect, topic, group, principal="User:client"): - return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \ - "--group=%(group)s --consumer --allow-principal=%(principal)s " % { - 'zk_connect': zk_connect, + def add_read_acl(self, kafka, principal, topic, force_use_zk_connection=False): + """ + :param kafka: Kafka cluster upon which Read ACL is created + :param principal: principal for which Read ACL is created + :param topic: topic for which Read ACL is created + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. + This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled + """ + node = kafka.nodes[0] + + force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() + + self._add_acl_on_topic(kafka, principal, topic, "--operation=Read", node, force_use_zk_connection) + + def add_produce_acl(self, kafka, principal, topic, force_use_zk_connection=False): + """ + :param kafka: Kafka cluster upon which Producer ACL is created + :param principal: principal for which Producer ACL is created + :param topic: topic for which Producer ACL is created + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. + This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled + """ + node = kafka.nodes[0] + + force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() + + self._add_acl_on_topic(kafka, principal, topic, "--producer", node, force_use_zk_connection) + + def add_consume_acl(self, kafka, principal, topic, group, force_use_zk_connection=False): + """ + :param kafka: Kafka cluster upon which Consumer ACL is created + :param principal: principal for which Consumer ACL is created + :param topic: topic for which Consumer ACL is created + :param group: consumewr group for which Consumer ACL is created + :param node: Node to use when determining connection settings + :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. + This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled + """ + node = kafka.nodes[0] + + force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server() + + cmd = "%(cmd_prefix)s --add --topic=%(topic)s --group=%(group)s --consumer --allow-principal=%(principal)s" % { + 'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection), 'topic': topic, 'group': group, 'principal': principal } + kafka.run_cli_tool(node, cmd) + diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index 429662be04e..d619630ee71 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -203,13 +203,16 @@ class SecurityConfig(TemplateRenderer): # If node is not specified, use static jaas config which will be created later. # Otherwise use static JAAS configuration files with SASL_SSL and sasl.jaas.config # property with SASL_PLAINTEXT so that both code paths are tested by existing tests. - # Note that this is an artibtrary choice and it is possible to run all tests with + # Note that this is an arbitrary choice and it is possible to run all tests with # either static or dynamic jaas config files if required. static_jaas_conf = node is None or (self.has_sasl and self.has_ssl) if use_inter_broker_mechanism_for_client: client_sasl_mechanism_to_use = self.interbroker_sasl_mechanism else: - client_sasl_mechanism_to_use = self.client_sasl_mechanism + # csv is supported here, but client configs only supports a single mechanism, + # so arbitrarily take the first one defined in case it has multiple values + client_sasl_mechanism_to_use = self.client_sasl_mechanism.split(',')[0].strip() + return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=client_sasl_mechanism_to_use, template_props=template_props, @@ -297,7 +300,8 @@ class SecurityConfig(TemplateRenderer): self.export_kafka_opts_for_admin_client_as_broker()) def maybe_create_scram_credentials(self, node, connect, path, mechanism, user_name, password, kafka_opts_for_admin_client_as_broker = ""): - if self.has_sasl and self.is_sasl_scram(mechanism): + # we only need to create these credentials when the client and broker mechanisms are both SASL/SCRAM + if self.has_sasl and self.is_sasl_scram(mechanism) and self.is_sasl_scram(self.interbroker_sasl_mechanism): cmd = "%s %s %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \ (kafka_opts_for_admin_client_as_broker, path.script("kafka-configs.sh", node), connect, user_name, mechanism, password) diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py index 853bed2c931..4a176b06145 100644 --- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py @@ -72,9 +72,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER): self.kafka.authorizer_class_name = authorizer_class_name - self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group) - self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group) - self.bounce() + # Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker. + self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True) + self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True) + self.bounce() # enables the authorizer def open_secured_port(self, client_protocol): self.kafka.security_protocol = client_protocol diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py index 5469df1ab22..7339873ed9f 100644 --- a/tests/kafkatest/tests/core/security_test.py +++ b/tests/kafkatest/tests/core/security_test.py @@ -67,8 +67,9 @@ class SecurityTest(EndToEndTest): with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE. """ + # Start Kafka with valid hostnames in the certs' SANs so that we can create the test topic via the admin client SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir, - valid_hostname=False) + valid_hostname=True) self.create_zookeeper() self.zk.start() @@ -77,6 +78,10 @@ class SecurityTest(EndToEndTest): interbroker_security_protocol=interbroker_security_protocol) self.kafka.start() + # now set the certs to have invalid hostnames so we can run the actual test + SecurityConfig.ssl_stores.valid_hostname = False + self.kafka.restart_cluster() + # We need more verbose logging to catch the expected errors self.create_and_start_clients(log_level="DEBUG") diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py index f1a5dd23525..7d610daf39b 100644 --- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py +++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py @@ -96,7 +96,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest): # set acls if self.is_secure: self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER - self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group) + # Force use of direct ZooKeeper access because Kafka is not yet started + self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True) if self.no_sasl: self.kafka.start() diff --git a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py index 4f1cf44ee3d..7e97a4cd685 100644 --- a/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py +++ b/tests/kafkatest/tests/core/zookeeper_tls_encrypt_only_test.py @@ -69,9 +69,7 @@ class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest): self.zk.start() self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT" - # Cannot use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled ZooKeeper quorum, - # so indicate that topics should be created via the Admin client - self.kafka.start(use_zk_to_create_topic=False) + self.kafka.start() self.perform_produce_consume_validation() diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 1537b35604a..adcad440f76 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -52,9 +52,15 @@ class KafkaVersion(LooseVersion): def supports_named_listeners(self): return self >= V_0_10_2_0 + def acl_command_supports_bootstrap_server(self): + return self >= V_2_1_0 + def topic_command_supports_bootstrap_server(self): return self >= V_2_3_0 + def topic_command_supports_if_not_exists_with_bootstrap_server(self): + return self >= V_2_6_0 + def supports_tls_to_zookeeper(self): # indicate if KIP-515 is available return self >= V_2_5_0