keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkeyYou need to specify two parameters in the above command:
Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. CA works likes a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-certNote: If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" on the Kafka brokers config then you must provide a truststore for the Kafka brokers as well and it should have all the CA certificates that clients keys were signed by.
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-certIn contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-fileThen sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signedThe definitions of the parameters are the following:
#!/bin/bash #Step 1 keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey #Step 2 openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert #Step 3 keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234 keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
listenersIf SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
listeners=PLAINTEXT://host.name:port,SSL://host.name:portFollowing SSL configs are needed on the broker side
ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks ssl.keystore.password = test1234 ssl.key.password = test1234 ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks ssl.truststore.password = test1234Optional settings that are worth considering:
security.inter.broker.protocol = SSL
Due to import regulations in some countries, the Oracle implementation limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. See the JCA Providers Documentation for more information.
Once you start the broker you should be able to see in the server.logwith addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)To check quickly if the server keystore and truststore are setup properly you can run the following command
openssl s_client -debug -connect localhost:9093 -tls1(Note: TLSv1 should be listed under ssl.enabled.protocols)
-----BEGIN CERTIFICATE----- {variable sized random bytes} -----END CERTIFICATE----- subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.comIf the certificate does not show up or if there are any other error messages than your keystore is not setup properly.
security.protocol = SSL ssl.truststore.location = "/var/private/ssl/kafka.client.truststore.jks" ssl.truststore.password = "test1234"If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
ssl.keystore.location = "/var/private/ssl/kafka.client.keystore.jks" ssl.keystore.password = "test1234" ssl.key.password = "test1234"Other configuration settings that may also be needed depending on our requirements and the broker configuration:
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties
sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}' sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; }; # Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_server.keytab" principal="kafka/kafka1.hostname.com@EXAMPLE.COM"; };
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
listeners=SASL_PLAINTEXT://host.name:portIf SASL_SSL is used, then SSL must also be configured. If you are only configuring a SASL port (or if you want the Kafka brokers to authenticate each other using SASL) then make sure you set the same SASL protocol for inter-broker communication:
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
sasl.kerberos.service.name="kafka"Important notes:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/security/keytabs/kafka_client.keytab" principal="kafka-client-1@EXAMPLE.COM"; };For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with "useTicketCache=true" as in:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true; };
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
security.protocol=SASL_PLAINTEXT (or SASL_SSL) sasl.kerberos.service.name="kafka"
allow.everyone.if.no.acl.found=trueOne can also add super users in broker.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma).
super.users=User:Bob;User:AliceBy default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in broker.properties like the following.
principal.builder.class=CustomizedPrincipalBuilderClassBy default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting
sasl.kerberos.principal.to.local.rules
to a customized rule in broker.properties.
Option | Description | Default | Option type |
---|---|---|---|
--add | Indicates to the script that user is trying to add an acl. | Action | |
--remove | Indicates to the script that user is trying to remove an acl. | Action | |
--list | Indicates to the script that user is trying to list acls. | Action | |
--authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration |
--authorizer-properties | key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 | Configuration | |
--cluster | Specifies cluster as resource. | Resource | |
--topic [topic-name] | Specifies the topic as resource. | Resource | |
--consumer-group [group-name] | Specifies the consumer-group as resource. | Resource | |
--allow-principal | Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple --allow-principal in a single command. |
Principal | |
--deny-principal | Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple --deny-principal in a single command. |
Principal | |
--allow-host | Host from which principals listed in --allow-principal will have access. | if --allow-principal is specified defaults to * which translates to "all hosts" | Host |
--deny-host | Host from which principals listed in --deny-principal will be denied access. | if --deny-principal is specified defaults to * which translates to "all hosts" | Host |
--operation | Operation that will be allowed or denied. Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All |
All | Operation |
--producer | Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE on topic and CREATE on cluster. | Convenience | |
--consumer | Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. | Convenience |
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host Host1 --allow-host Host2 --operation Read --operation Write --topic Test-topicBy default all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal we will have to use the --deny-principal and --deny-host option. For example, if we want to allow all users to Read from Test-topic but only deny User:BadBob from host bad-host we can do so using following commands:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host bad-host --operation Read --topic Test-topicAbove examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --consumer-group [group-name].
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host Host1 --allow-host Host2 --operation Read --operation Write --topic Test-topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topicSimilarly to add Alice as a consumer of Test-topic with consumer group Group-1 we just have to pass --consumer option:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --consumer-group Group-1Note that for consumer option we must also specify the consumer group. In order to remove a principal from producer or consumer role we just need to pass --remove option.
It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:
./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181
Run this to see the full list of parameters:
./bin/zookeeper-security-migration --help