$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey

You need to specify two parameters in the above command:

1. keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely.
2. validity: the valid time of the certificate in days.
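keytool prompts for the name and password details interactively. If you need a non-interactive run (e.g. for scripting), the identity and passwords can be supplied on the command line; a sketch, assuming a keystore password of test1234 and illustrative distinguished-name values:

keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey \
    -keyalg RSA -storepass test1234 -keypass test1234 \
    -dname "CN=kafka1.hostname.com, OU=org, O=org, L=Santa Clara, ST=CA, C=US"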
It is important to prevent forged certificates by signing each certificate in the cluster. A certificate authority (CA) is responsible for signing certificates. The CA works like a government that issues passports: the government stamps (signs) each passport so that the passport becomes difficult to forge, and other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.
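To sanity-check the generated CA certificate, you can print its subject and validity period (an optional check, not part of the required steps):

openssl x509 -in ca-cert -noout -subject -dates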
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert

Note: If you configure Kafka brokers to require client authentication by setting ssl.client.auth to "requested" or "required" in the Kafka broker config, then you must provide a truststore for the Kafka brokers as well, and it should contain all the CA certificates that clients' keys were signed by.
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

In contrast to the keystore in step 1, which stores each machine's own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one's truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.
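To see which CAs a truststore currently trusts, list its entries with keytool (an optional check; keytool prompts for the truststore password):

keytool -list -v -keystore client.truststore.jks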
The next step is to sign all certificates generated in step 1 with the CA. First, export the certificate from the keystore:

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file

Then sign it with the CA:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}

Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
$ keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
$ keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

The definitions of the parameters are the following:

1. keystore: the location of the keystore
2. ca-cert: the certificate of the CA
3. ca-key: the private key of the CA
4. ca-password: the passphrase of the CA
5. cert-file: the exported, unsigned certificate of the server
6. cert-signed: the signed certificate of the server
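At any point you can check that a signed certificate actually chains back to the CA with openssl verify (an optional sanity check, not one of the required steps):

openssl verify -CAfile ca-cert cert-signed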
All of the above steps can also be combined into a single bash script:

#!/bin/bash
# Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
# Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
# Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
The listeners property in server.properties can take one or more comma-separated values. If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary:
listeners=PLAINTEXT://host.name:port,SSL://host.name:port

The following SSL configs are needed on the broker side:
ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password = test1234
ssl.key.password = test1234
ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password = test1234

Optional settings that are worth considering:
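For example, the following broker-side options are commonly tuned (the names are standard Kafka SSL configs; the values shown are illustrative defaults):

ssl.client.auth=none                            # set to "requested" or "required" to enable client authentication
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1     # protocols offered to clients
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.cipher.suites=                              # empty means the JVM's default cipher suites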
If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT):

security.inter.broker.protocol = SSL

If you want to enable any cipher suites other than the defaults that come with the JVM, such as the ones listed here: https://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html, you will need to install the Unlimited Strength Policy files.
Once you start the broker, you should see the following in server.log:

with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)

To quickly check that the server keystore and truststore are set up properly, you can run the following command:
openssl s_client -debug -connect localhost:9093 -tls1

(Note: TLSv1 should be listed under ssl.enabled.protocols.)
In the output of this command you should see the server's certificate:

-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com

If the certificate does not show up, or if there are any other error messages, then your keystore is not set up properly.
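To extract just the subject and issuer from that handshake without the debug noise, you can pipe the same connection through openssl x509 (an optional convenience, using the SSL port from the example above):

openssl s_client -connect localhost:9093 -tls1 </dev/null 2>/dev/null | openssl x509 -noout -subject -issuer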
To configure Kafka clients (SSL is supported only for the new producer and consumer), the following is a minimal configuration example if client authentication is not required in the broker:

security.protocol = SSL
ssl.truststore.location = "/var/private/ssl/kafka.client.truststore.jks"
ssl.truststore.password = "test1234"

If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
ssl.keystore.location = "/var/private/ssl/kafka.client.keystore.jks"
ssl.keystore.password = "test1234"
ssl.key.password = "test1234"

Other configuration settings may also be needed depending on your requirements and the broker configuration:
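The client-side counterparts of the broker options above (ssl.provider, ssl.cipher.suites, ssl.enabled.protocols, ssl.keystore.type, ssl.truststore.type) are among them. For the console clients below, the SSL settings can also be collected into a properties file; the consumer command references one as client-ssl.properties. A minimal sketch, assuming the truststore created earlier:

# client-ssl.properties (a sketch; adjust paths and passwords to your setup)
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234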
Examples using console-producer and console-consumer:

kafka-console-producer.sh --broker-list localhost:9093 --topic test --new-producer --producer-property "security.protocol=SSL" --producer-property "ssl.truststore.location=client.truststore.jks" --producer-property "ssl.truststore.password=test1234"

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties
sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/hostname@domainname'
sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/kafka.keytab kafka/hostname@domainname"
Add a JAAS file like the following to each Kafka broker's config directory (e.g. /etc/kafka/kafka_jaas.conf, matching the JVM flag below):

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka1.keytab"
    principal="kafka/kafka1.hostname.com@DOMAIN.COM";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="zookeeper"
    keyTab="/etc/security/keytabs/kafka1.keytab"
    principal="kafka/kafka1.hostname.com@DOMAIN.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    serviceName="kafka";
};

Important notes:

1. The KafkaServer section tells the broker which principal and keytab to use for its own identity.
2. The Client section is used by the broker to authenticate its SASL connection to ZooKeeper (note serviceName="zookeeper").
3. The KafkaClient section configures clients such as the console producer and consumer; here it relies on the ticket cache populated by kinit instead of a keytab.
To use a keytab instead of the ticket cache on the client side, configure the KafkaClient section like this:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka1.keytab"
    principal="kafkaproducer/hostname@DOMAIN.COM";
};
Pass the name of the JAAS file as a JVM parameter to each Kafka broker:

-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf
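One common way to supply this flag, assuming the standard start scripts (which honor the KAFKA_OPTS environment variable):

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf" \
    bin/kafka-server-start.sh config/server.properties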
Configure a SASL port in server.properties by adding SASL_PLAINTEXT to the listeners parameter:

listeners=SASL_PLAINTEXT://host.name:port

If you are only configuring a SASL port (or if you are very paranoid and want the Kafka brokers to authenticate each other using SASL), then make sure you set the same SASL protocol for inter-broker communication:
security.inter.broker.protocol=SASL_PLAINTEXT
Pass the name of the JAAS file as a JVM parameter to each client JVM:

-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
Configure the following property in producer.properties or consumer.properties:

security.protocol=SASL_PLAINTEXT
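Putting the pieces together, a console-producer invocation might look like the following. This is a sketch: the port and topic are illustrative, and it assumes the standard scripts honor the KAFKA_OPTS environment variable:

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf" \
    kafka-console-producer.sh --broker-list localhost:9092 --topic test \
    --producer-property "security.protocol=SASL_PLAINTEXT"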
By default, if a resource has no associated acls, then no one other than the super users is allowed to access it. That behavior can be changed with the following setting:

allow.everyone.if.no.acl.found=true

One can also add super users in broker.properties like the following:
super.users=User:Bob;User:Alice
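For reference, a minimal broker-properties sketch combining these authorization settings (authorizer.class.name is the property that plugs in an authorizer; treat the exact combination as illustrative):

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:Bob;User:Alice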
The kafka-acls.sh CLI supports the following options:

Option | Description | Default | Option type
---|---|---|---
--add | Indicates to the script that the user is trying to add an acl. | | Action
--remove | Indicates to the script that the user is trying to remove an acl. | | Action
--list | Indicates to the script that the user is trying to list acls. | | Action
--authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration
--authorizer-properties | key=val pairs that will be passed to the authorizer for initialization. For the default authorizer, the example values are: zookeeper.connect=localhost:2181 | | Configuration
--cluster | Specifies the cluster as resource. | | Resource
--topic [topic-name] | Specifies the topic as resource. | | Resource
--consumer-group [group-name] | Specifies the consumer-group as resource. | | Resource
--allow-principal | Principal in PrincipalType:name format that will be added to the ACL with Allow permission. You can specify multiple --allow-principal options in a single command. | | Principal
--deny-principal | Principal in PrincipalType:name format that will be added to the ACL with Deny permission. You can specify multiple --deny-principal options in a single command. | | Principal
--allow-hosts | Comma-separated list of hosts from which the principals listed in --allow-principal will have access. | If --allow-principal is specified, defaults to *, which translates to "all hosts". | Host
--deny-hosts | Comma-separated list of hosts from which the principals listed in --deny-principal will be denied access. | If --deny-principal is specified, defaults to *, which translates to "all hosts". | Host
--operations | Comma-separated list of operations. Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, All | All | Operation
--producer | Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE and DESCRIBE on the topic and CREATE on the cluster. | | Convenience
--consumer | Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ and DESCRIBE on the topic and READ on the consumer-group. | | Convenience
For example, to add an acl that allows principals User:Bob and User:Alice to perform the Read and Write operations on Test-topic from Host1 and Host2, execute the CLI with the following options:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-hosts Host1,Host2 --operations Read,Write --topic Test-topic

By default, all principals that don't have an explicit acl allowing access for an operation to a resource are denied. In the rare cases where an allow acl is defined that grants access to all but some principal, you will have to use the --deny-principal and --deny-hosts options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from host bad-host, we can do so using the following command:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-hosts * --deny-principal User:BadBob --deny-hosts bad-host --operations Read --topic Test-topic

The above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly, a user can add acls to a cluster by specifying --cluster, and to a consumer group by specifying --consumer-group [group-name].
Removing acls is pretty much the same; the only difference is that instead of the --add option you specify --remove:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-hosts Host1,Host2 --operations Read,Write --topic Test-topic
To list the acls for a given resource, e.g. all the acls on Test-topic, use the --list option:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
To add User:Bob as a producer of Test-topic, we can use the --producer convenience option:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic

Similarly, to add Alice as a consumer of Test-topic with consumer group Group-1, we just have to pass the --consumer option:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --consumer --topic Test-topic --consumer-group Group-1

Note that for the consumer option we must also specify the consumer group. In order to remove a principal from a producer or consumer role, we just need to pass the --remove option.
It is also possible to turn off authentication in a secure cluster; both directions are handled by the ZooKeeper security migration tool via its zookeeper.acl option. Here is an example of how to run the migration tool:
./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181
Run this to see the full list of parameters:
./bin/zookeeper-security-migration --help