keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkeyYou need to specify two parameters in the above command:
Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-certNote: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" on the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients keys were signed by.
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-certIn contrast to the keystore in step 1 that stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-fileThen sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signedThe definitions of the parameters are the following:
#!/bin/bash #Step 1 keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey #Step 2 openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert #Step 3 keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234 keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
listenersIf SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
listeners=PLAINTEXT://host.name:port,SSL://host.name:portFollowing SSL configs are needed on the broker side
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234 ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks ssl.truststore.password=test1234Optional settings that are worth considering:
security.inter.broker.protocol=SSL
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.
Once you start the broker you should be able to see in the server.logwith addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)To check quickly if the server keystore and truststore are setup properly you can run the following command
openssl s_client -debug -connect localhost:9093 -tls1(Note: TLSv1 should be listed under ssl.enabled.protocols)
-----BEGIN CERTIFICATE----- {variable sized random bytes} -----END CERTIFICATE----- subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.comIf the certificate does not show up or if there are any other error messages then your keystore is not setup properly.
security.protocol=SSL ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks ssl.truststore.password=test1234If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks ssl.keystore.password=test1234 ssl.key.password=test1234Other configuration settings that may also be needed depending on our requirements and the broker configuration:
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
listeners=SASL_PLAINTEXT://host.name:portIf SASL_SSL is used, then SSL must also be configured. If you are only configuring a SASL port (or if you want the Kafka brokers to authenticate each other using SASL) then make sure you set the same SASL protocol for inter-broker communication:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.enabled.mechanisms=GSSAPI (,PLAIN)
sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
security.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism=GSSAPI (or PLAIN)
sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}' sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; }; // Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
-Djava.security.krb5.conf=/etc/kafka/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="kafka-client-1@EXAMPLE.COM"; };For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with "useTicketCache=true" as in:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true; };
-Djava.security.krb5.conf=/etc/kafka/krb5.conf -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
security.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka
SASL/PLAIN is a simple username/password authentication mechanism that is typically used with TLS for encryption to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN which can be extended for production use as described here.
The username is used as the authenticatedPrincipal
for configuration of ACLs etc.
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };This configuration defines two users (admin and alice). The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. The set of properties user_userName defines the passwords for all users that connect to the broker and the broker validates all client connections including those from other brokers using these properties.
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
listeners=SASL_SSL://host.name:port security.inter.broker.protocol=SASL_SSL sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret"; };The properties username and password in the KafkaClient section are used by clients to configure the user for client connections. In this example, clients connect to the broker as user alice.
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
security.protocol=SASL_SSL sasl.mechanism=PLAIN
javax.security.auth.spi.LoginModule
that provides usernames and passwords from an external source. The login module implementation should
provide username as the public credential and password as the private credential of the Subject
. The default implementation
org.apache.kafka.common.security.plain.PlainLoginModule
can be used as an example.javax.security.sasl.SaslServer
. The default implementation included in Kafka in the package
org.apache.kafka.common.security.plain
can be used as an example to get started.
security.provider.n=providerClassNamewhere providerClassName is the fully qualified name of the new provider and n is the preference order with lower numbers indicating higher preference.
Security.addProvider
at the beginning of the client
application or in a static initializer in the login module. For example:
Security.addProvider(new PlainSaslServerProvider());
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };
sasl.enabled.mechanisms=GSSAPI,PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.mechanism.inter.broker.protocol=GSSAPI (or PLAIN)
SASL mechanism can be modified in a running cluster using the following sequence:
allow.everyone.if.no.acl.found=trueOne can also add super users in broker.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma).
super.users=User:Bob;User:AliceBy default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in broker.properties like the following.
principal.builder.class=CustomizedPrincipalBuilderClassBy default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting
sasl.kerberos.principal.to.local.rules
to a customized rule in broker.properties.
The format of sasl.kerberos.principal.to.local.rules
is a list where each rule works in the same way as the auth_to_local in Kerberos configuration file (krb5.conf). Each rules starts with RULE: and contains an expression in the format [n:string](regexp)s/pattern/replacement/g. See the kerberos documentation for more details. An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
Option | Description | Default | Option type |
---|---|---|---|
--add | Indicates to the script that user is trying to add an acl. | Action | |
--remove | Indicates to the script that user is trying to remove an acl. | Action | |
--list | Indicates to the script that user is trying to list acls. | Action | |
--authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration |
--authorizer-properties | key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 | Configuration | |
--cluster | Specifies cluster as resource. | Resource | |
--topic [topic-name] | Specifies the topic as resource. | Resource | |
--group [group-name] | Specifies the consumer-group as resource. | Resource | |
--allow-principal | Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple --allow-principal in a single command. |
Principal | |
--deny-principal | Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple --deny-principal in a single command. |
Principal | |
--allow-host | IP address from which principals listed in --allow-principal will have access. | if --allow-principal is specified defaults to * which translates to "all hosts" | Host |
--deny-host | IP address from which principals listed in --deny-principal will be denied access. | if --deny-principal is specified defaults to * which translates to "all hosts" | Host |
--operation | Operation that will be allowed or denied. Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All |
All | Operation |
--producer | Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE on topic and CREATE on cluster. | Convenience | |
--consumer | Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. | Convenience | |
--force | Convenience option to assume yes to all queries and do not prompt. | Convenience |
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topicBy default all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the --deny-principal and --deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we can do so using following commands:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topicNote that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported). Above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name].
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topicSimilarly to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass --consumer option:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1Note that for consumer option we must also specify the consumer group. In order to remove a principal from producer or consumer role we just need to pass --remove option.
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092We then restart the clients, changing their config to point at the newly opened, secured port:
bootstrap.servers = [broker1:9092,...] security.protocol = SSL ...etcIn the second incremental server bounce we instruct Kafka to use SSL as the broker-broker protocol (which will use the same SSL port):
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092 security.inter.broker.protocol=SSLIn the final bounce we secure the cluster by closing the PLAINTEXT port:
listeners=SSL://broker1:9092 security.inter.broker.protocol=SSLAlternatively we might choose to open multiple ports so that different protocols can be used for broker-broker and broker-client communication. Say we wished to use SSL encryption throughout (i.e. for broker-broker and broker-client communication) but we'd like to add SASL authentication to the broker-client connection also. We would achieve this by opening two additional ports during the first bounce:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093We would then restart the clients, changing their config to point at the newly opened, SASL & SSL secured port:
bootstrap.servers = [broker1:9093,...] security.protocol = SASL_SSL ...etcThe second server bounce would switch the cluster to use encrypted broker-broker communication via the SSL port we previously opened on port 9092:
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093 security.inter.broker.protocol=SSLThe final bounce secures the cluster by closing the PLAINTEXT port.
listeners=SSL://broker1:9092,SASL_SSL://broker1:9093 security.inter.broker.protocol=SSLZooKeeper can be secured independently of the Kafka cluster. The steps for doing this are covered in section 7.6.2.
It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:
./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181
Run this to see the full list of parameters:
./bin/zookeeper-security-migration --help