7.1 Security Overview

In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster. The following security measures are currently supported:
  1. Authentication of connections to brokers from clients (producers and consumers), using either SSL or SASL (Kerberos)
  2. Authorization of read/write operations by clients
  3. Encryption of data transferred between brokers and clients, or between brokers, using SSL
  4. Authentication of brokers connecting to ZooKeeper
  5. Security is optional: non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients
  6. Authorization is pluggable and supports integration with external authorization services
The guides below explain how to configure and use the security features in both clients and brokers.

7.2 Encryption and Authentication using SSL

Apache Kafka allows clients to connect over SSL. By default SSL is disabled but can be turned on as needed.
  1. Generate SSL key and certificate for each Kafka broker

    The first step of deploying SSL is to generate the key and the certificate for each machine in the cluster. You can use Java’s keytool utility to accomplish this task. We will generate the key into a temporary keystore initially so that we can export and sign it later with the CA.
    $ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey
    You need to specify two parameters in the above command:
    1. keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely.
    2. validity: the valid time of the certificate in days.
    Ensure that the common name (CN) matches exactly the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one.
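    For example, a non-interactive variant of the command above (a sketch, assuming a bash shell, that `hostname -f` returns the machine's FQDN, and the `test1234` password used in the example script later in this section) would be:
        # -dname sets the certificate's CN to this machine's FQDN without interactive prompts
        $ keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey \
            -keyalg RSA -storepass test1234 -keypass test1234 \
            -dname "CN=$(hostname -f)"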
  2. Creating your own CA

    After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.

    Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
    The next step is to add the generated CA to the **clients’ truststore** so that the clients can trust this CA:
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    Note: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the Kafka broker config, then you must provide a truststore for the Kafka brokers as well, and it should have all the CA certificates that clients' keys were signed by.
    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
    In contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
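    To confirm that the CA certificate was imported correctly, you can list the truststore contents (keytool ships with the JDK):
        $ keytool -keystore client.truststore.jks -list
    The output should include an entry for the CARoot alias with type trustedCertEntry.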
  3. Signing the certificate

    The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    Then sign it with the CA:
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
    Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
                $ keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
                $ keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
            
    The definitions of the parameters are the following:
    1. keystore: the location of the keystore
    2. ca-cert: the certificate of the CA
    3. ca-key: the private key of the CA
    4. ca-password: the passphrase of the CA
    5. cert-file: the exported, unsigned certificate of the server
    6. cert-signed: the signed certificate of the server
    Here is an example of a bash script with all above steps. Note that one of the commands assumes a password of `test1234`, so either use that password or edit the command before running it.
            #!/bin/bash
            #Step 1
            keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
            #Step 2
            openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
            keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
            keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
            #Step 3
            keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
            openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
            keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
            keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
                    
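    After the script completes, you can verify that the server keystore now holds both certificates:
        $ keytool -keystore server.keystore.jks -list
    You should see two entries: CARoot (a trustedCertEntry) and localhost (a PrivateKeyEntry).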
  4. Configuring Kafka Broker

    Kafka brokers support listening for connections on multiple ports thanks to [KAFKA-1809](https://issues.apache.org/jira/browse/KAFKA-1809). We need to configure the following property in server.properties, which must have one or more comma-separated values:
    listeners
    If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
    listeners=PLAINTEXT://host.name:port,SSL://host.name:port
    The following SSL configs are needed on the broker side:
            ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks
            ssl.keystore.password = test1234
            ssl.key.password = test1234
            ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks
            ssl.truststore.password = test1234
            
    Optional settings that are worth considering:
    1. ssl.client.auth = none ("required" => client authentication is required, "requested" => client authentication is requested, and a client without certs can still connect when this option is chosen)
    2. ssl.cipher.suites = A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. (Default is an empty list)
    3. ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients; note that SSL is deprecated and using it in production is not recommended)
    4. ssl.keystore.type = JKS
    5. ssl.truststore.type = JKS
    If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT):
    security.inter.broker.protocol = SSL
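    Putting these settings together, a broker's server.properties might contain a block like the following (a sketch that reuses the example paths, ports and password from above; adjust them for your deployment):
            listeners=PLAINTEXT://host.name:9092,SSL://host.name:9093
            security.inter.broker.protocol = SSL
            ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks
            ssl.keystore.password = test1234
            ssl.key.password = test1234
            ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks
            ssl.truststore.password = test1234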
    If you want to enable any cipher suites other than the defaults that come with the JVM (like the ones listed here: https://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html), you will need to install the JCE Unlimited Strength Jurisdiction Policy Files.
    Once you start the broker you should be able to see the following in server.log:
    with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
    To quickly check whether the server keystore and truststore are set up properly, you can run the following command:
    openssl s_client -debug -connect localhost:9093 -tls1
    (Note: TLSv1 should be listed under ssl.enabled.protocols)
    In the output of this command you should see the server's certificate:
            -----BEGIN CERTIFICATE-----
            {variable sized random bytes}
            -----END CERTIFICATE-----
            subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
            issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
                
    If the certificate does not show up, or if there are any other error messages, then your keystore is not set up properly.
  5. Configuring Kafka Clients

    SSL is supported only for the new Kafka producer and consumer; the older APIs are not supported. The SSL configs are the same for both producer and consumer.
    If client authentication is not required in the broker, then the following is a minimal configuration example:
            security.protocol = SSL
            ssl.truststore.location = /var/private/ssl/kafka.client.truststore.jks
            ssl.truststore.password = test1234
                
    If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
            ssl.keystore.location = /var/private/ssl/kafka.client.keystore.jks
            ssl.keystore.password = test1234
            ssl.key.password = test1234
                    
    Other configuration settings that may also be needed depending on your requirements and the broker configuration:
    1. ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.
    2. ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol.
    3. ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side.
    4. ssl.truststore.type = JKS
    5. ssl.keystore.type = JKS

    Examples using console-producer and console-consumer (client-ssl.properties holds the client configs shown above; an example follows the commands):
                kafka-console-producer.sh --broker-list localhost:9093 --topic test --new-producer --producer-property "security.protocol=SSL"  --producer-property "ssl.truststore.location=client.truststore.jks" --producer-property "ssl.truststore.password=test1234"
    
                kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties
                
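    The client-ssl.properties file referenced above is simply a properties file containing the client SSL configs from this section, for example:
            security.protocol=SSL
            ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
            ssl.truststore.password=test1234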

7.3 Authentication using SASL

  1. Prerequisites


    1. Kerberos
      If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (Ubuntu, Redhat). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.
    2. Create Kerberos Principals
      If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every Linux user that will access Kafka with Kerberos authentication.
      If you installed your own Kerberos, you will need to create these principals yourself:
      sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/hostname@domainname'
      sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/kafka.keytab kafka/hostname@domainname"
    3. Make sure all hosts are reachable using hostnames; with Kerberos it is important that all your hosts can be resolved with their FQDNs.
    4. Creating JAAS Config File
      Each node in the cluster should have a JAAS file similar to the example below. Add this file to the kafka/config directory:
                  KafkaServer {
                      com.sun.security.auth.module.Krb5LoginModule required
                      useKeyTab=true
                      storeKey=true
                      serviceName="kafka"
                      keyTab="/etc/security/keytabs/kafka1.keytab"
                      principal="kafka/kafka1.hostname.com@DOMAIN.COM";
                  };
      
                  Client {
                     com.sun.security.auth.module.Krb5LoginModule required
                     useKeyTab=true
                     storeKey=true
                     serviceName="zookeeper"
                     keyTab="/etc/security/keytabs/kafka1.keytab"
                     principal="kafka/kafka1.hostname.com@DOMAIN.COM";
                  };
      
                  KafkaClient {
                     com.sun.security.auth.module.Krb5LoginModule required
                     useTicketCache=true
                     serviceName="kafka";
                  };
              
      Important notes:
      1. KafkaServer is the section name in the JAAS file used by the Kafka broker. This section tells the broker which principal to use and the keytab in which this principal is stored, and allows the broker to log in using that keytab.
      2. The Client section is used to authenticate a SASL connection with ZooKeeper. It also allows the brokers to set a SASL ACL on ZooKeeper nodes, which locks these nodes down so that only the brokers can modify them. It is necessary to have the same principal name across all the brokers. If you want to use a section name other than Client, then you need to set the system property zookeeper.sasl.client to the appropriate name (e.g., -Dzookeeper.sasl.client=ZkClient).
      3. The KafkaClient section describes how clients like the producer and consumer can connect to the Kafka broker. Here we specified useTicketCache=true rather than a keytab, which allows a user to run kinit and then use kafka-console-consumer or kafka-console-producer to connect to the broker. For a long-running process, one should create a KafkaClient section similar to KafkaServer.
      4. In the KafkaServer and KafkaClient sections we have "serviceName"; this should match the principal name the Kafka broker is running with. In the above example the principal is "kafka/kafka1.hostname.com@DOMAIN.COM", so serviceName is set to the matching "kafka".
    5. Creating Client Side JAAS Config
      Clients (producers, consumers, copycat workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user used for running the client), so obtain or create these principals as needed. Then create a JAAS file as follows:
                      KafkaClient {
                          com.sun.security.auth.module.Krb5LoginModule required
                          useKeyTab=true
                          storeKey=true
                          serviceName="kafka"
                          keyTab="/etc/security/keytabs/kafka1.keytab"
                          principal="kafkaproducer/hostname@DOMAIN.COM";
                      };
                  
  2. Configuring Kafka Brokers

    1. Pass the name of the JAAS file you created in "Creating JAAS Config File" as a JVM parameter to the Kafka broker:
      -Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
    2. Make sure the keytabs configured in the kafka_jaas.conf are readable by the Linux user who is starting the Kafka broker.
    3. Configure a SASL port in server.properties, by adding the following to the listeners parameter, which contains one or more comma-separated values:
      listeners=SASL_PLAINTEXT://host.name:port
      If you are only configuring a SASL port (or if you are very paranoid and want the Kafka brokers to authenticate each other using SASL), then make sure you set the same SASL protocol for inter-broker communication:
      security.inter.broker.protocol=SASL_PLAINTEXT
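    As an example of passing the JVM parameter from point 1, the stock start scripts honor the KAFKA_OPTS environment variable, so the broker can be started like this (a sketch assuming the shell scripts shipped with Kafka):
      export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
      bin/kafka-server-start.sh config/server.properties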
  3. Configuring Kafka Clients

    SASL authentication is only supported for the new Kafka producer and consumer; the older APIs are not supported. To configure SASL authentication on the clients:
    1. Pass the name of the JAAS file you created in "Creating Client Side JAAS Config" as a JVM parameter to the client JVM:
      -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    2. Make sure the keytabs configured in the kafka_client_jaas.conf are readable by the Linux user who is starting the Kafka client.
    3. Configure the following property in producer.properties or consumer.properties:
      security.protocol=SASL_PLAINTEXT
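    For example, after obtaining a ticket with kinit (per the KafkaClient section above), the console consumer can be run against the SASL port like this (a sketch; client-sasl.properties is a hypothetical file containing only the security.protocol line above):
      export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
      kafka-console-consumer.sh --bootstrap-server host.name:port --topic test --new-consumer --consumer.config client-sasl.properties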

7.4 Authorization and ACLs

7.5 ZooKeeper Authentication

7.5.1 New clusters

To enable ZooKeeper authentication on brokers, there are two necessary steps:
  1. Create a JAAS login file and set the appropriate system property to point to it as described above
  2. Set the configuration property zookeeper.set.acl in each broker to true
The metadata stored in ZooKeeper is such that only brokers will be able to modify the corresponding znodes, but znodes are world readable. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of znodes can cause cluster disruption.
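For a new cluster, the two steps amount to something like the following (a sketch that reuses the JAAS file path from section 7.3; the stock start scripts pass KAFKA_OPTS through to the JVM):
        # in server.properties
        zookeeper.set.acl=true

        # JVM parameter pointing at a JAAS file that contains a Client section
        export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"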

7.5.2 Migrating clusters

If you are running a version of Kafka that does not support security, or simply one with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
  1. Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs
  2. Perform a second rolling restart of brokers, this time setting the configuration parameter zookeeper.set.acl to true, which enables the use of secure ACLs when creating znodes
  3. Execute the ZkSecurityMigrator tool, running the script ./bin/zookeeper-security-migration.sh with zookeeper.acl set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes

It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:

  1. Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting zookeeper.set.acl to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes
  2. Execute the ZkSecurityMigrator tool, running the script ./bin/zookeeper-security-migration.sh with zookeeper.acl set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes
  3. Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file
Here is an example of how to run the migration tool:
./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

Run this to see the full list of parameters:

./bin/zookeeper-security-migration.sh --help

7.5.3 Migrating the ZooKeeper ensemble

It is also necessary to enable authentication on the ZooKeeper ensemble. To do so, we need to perform a rolling restart of the ZooKeeper servers and set a few properties. Please refer to the ZooKeeper documentation for more detail:
  1. Apache ZooKeeper documentation
  2. Apache ZooKeeper wiki
  3. Cloudera ZooKeeper security configuration