
KAFKA-8843: KIP-515: Zookeeper TLS support

Signed-off-by: Ron Dagostino <rdagostino@confluent.io>

Author: Ron Dagostino <rdagostino@confluent.io>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #8003 from rondagostino/KAFKA-8843
Authored by Ron Dagostino; committed 5 years ago by Manikumar Reddy (commit 342f13a838)
  1. bin/windows/zookeeper-shell.bat (4 changes)
  2. bin/zookeeper-shell.sh (4 changes)
  3. clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java (5 changes)
  4. clients/src/main/java/org/apache/kafka/common/utils/Utils.java (21 changes)
  5. clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java (23 changes)
  6. core/src/main/scala/kafka/admin/AclCommand.scala (23 changes)
  7. core/src/main/scala/kafka/admin/ConfigCommand.scala (13 changes)
  8. core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala (2 changes)
  9. core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (2 changes)
  10. core/src/main/scala/kafka/admin/TopicCommand.scala (2 changes)
  11. core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala (60 changes)
  12. core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala (39 changes)
  13. core/src/main/scala/kafka/server/KafkaConfig.scala (198 changes)
  14. core/src/main/scala/kafka/server/KafkaServer.scala (33 changes)
  15. core/src/main/scala/kafka/zk/KafkaZkClient.scala (6 changes)
  16. core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala (16 changes)
  17. core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala (100 changes)
  18. core/src/test/scala/unit/kafka/KafkaConfigTest.scala (157 changes)
  19. core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala (6 changes)
  20. core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala (146 changes)
  21. core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala (14 changes)
  22. core/src/test/scala/unit/kafka/server/KafkaServerTest.scala (62 changes)
  23. core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala (32 changes)
  24. core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (2 changes)
  25. core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala (28 changes)
  26. docs/security.html (143 changes)
  27. docs/toc.html (5 changes)
  28. tests/kafkatest/services/kafka/config_property.py (2 changes)
  29. tests/kafkatest/services/kafka/kafka.py (30 changes)
  30. tests/kafkatest/services/kafka/templates/kafka.properties (12 changes)
  31. tests/kafkatest/services/security/security_config.py (28 changes)
  32. tests/kafkatest/services/templates/zookeeper.properties (13 changes)
  33. tests/kafkatest/services/zookeeper.py (83 changes)
  34. tests/kafkatest/tests/core/security_rolling_upgrade_test.py (11 changes)
  35. tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py (15 changes)
  36. tests/kafkatest/tests/core/zookeeper_tls_test.py (156 changes)

4
bin/windows/zookeeper-shell.bat

@@ -15,8 +15,8 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
IF [%1] EQU [] (
echo USAGE: %0 zookeeper_host:port[/path] [args...]
echo USAGE: %0 zookeeper_host:port[/path] [-zk-tls-config-file file] [args...]
EXIT /B 1
)
"%~dp0kafka-run-class.bat" org.apache.zookeeper.ZooKeeperMain -server %*
"%~dp0kafka-run-class.bat" org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka -server %*

4
bin/zookeeper-shell.sh

@@ -16,8 +16,8 @@
if [ $# -lt 1 ];
then
echo "USAGE: $0 zookeeper_host:port[/path] [args...]"
echo "USAGE: $0 zookeeper_host:port[/path] [-zk-tls-config-file file] [args...]"
exit 1
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.ZooKeeperMain -server "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka -server "$@"

5
clients/src/main/java/org/apache/kafka/common/security/JaasUtils.java

@@ -49,7 +49,10 @@ public final class JaasUtils {
"]";
}
public static boolean isZkSecurityEnabled() {
public static boolean isZkSaslEnabled() {
// Technically a client must also check if TLS mutual authentication has been configured,
// but we will leave that up to the client code to determine since direct connectivity to ZooKeeper
// has been deprecated in many clients and we don't wish to re-introduce a ZooKeeper jar dependency here.
boolean zkSaslEnabled = Boolean.parseBoolean(System.getProperty(ZK_SASL_CLIENT, DEFAULT_ZK_SASL_CLIENT));
String zkLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME_KEY, DEFAULT_ZK_LOGIN_CONTEXT_NAME);

21
clients/src/main/java/org/apache/kafka/common/utils/Utils.java

@@ -553,8 +553,19 @@ public final class Utils {
/**
* Read a properties file from the given path
* @param filename The path of the file to read
* @return the loaded properties
*/
public static Properties loadProps(String filename) throws IOException {
return loadProps(filename, null);
}
/**
* Read a properties file from the given path
* @param filename The path of the file to read
* @param onlyIncludeKeys When non-null, only return values associated with these keys and ignore all others
* @return the loaded properties
*/
public static Properties loadProps(String filename, List<String> onlyIncludeKeys) throws IOException {
Properties props = new Properties();
if (filename != null) {
@@ -565,7 +576,15 @@ public final class Utils {
System.out.println("Did not load any properties since the property file is not specified");
}
return props;
if (onlyIncludeKeys == null || onlyIncludeKeys.isEmpty())
return props;
Properties requestedProps = new Properties();
onlyIncludeKeys.forEach(key -> {
String value = props.getProperty(key);
if (value != null)
requestedProps.setProperty(key, value);
});
return requestedProps;
}
/**

23
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

@@ -30,9 +30,11 @@ import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
@@ -452,6 +454,27 @@ public class UtilsTest {
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@Test
public void testLoadProps() throws IOException {
File tempFile = TestUtils.tempFile();
try {
String testContent = "a=1\nb=2\n#a comment\n\nc=3\nd=";
Files.write(tempFile.toPath(), testContent.getBytes());
Properties props = Utils.loadProps(tempFile.getPath());
assertEquals(4, props.size());
assertEquals("1", props.get("a"));
assertEquals("2", props.get("b"));
assertEquals("3", props.get("c"));
assertEquals("", props.get("d"));
Properties restrictedProps = Utils.loadProps(tempFile.getPath(), Arrays.asList("b", "d", "e"));
assertEquals(2, restrictedProps.size());
assertEquals("2", restrictedProps.get("b"));
assertEquals("", restrictedProps.get("d"));
} finally {
Files.deleteIfExists(tempFile.toPath());
}
}
/**
* Expectation setter for multiple reads where each one reads random bytes to the buffer.
*

23
core/src/main/scala/kafka/admin/AclCommand.scala

@@ -186,14 +186,26 @@ object AclCommand extends Logging {
class AuthorizerService(val authorizerClassName: String, val opts: AclCommandOptions) extends AclCommandService with Logging {
private def withAuthorizer()(f: Authorizer => Unit): Unit = {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
val authorizerProperties =
// It is possible that zookeeper.set.acl could be true without SASL if mutual certificate authentication is configured.
// We will default the value of zookeeper.set.acl to true or false based on whether SASL is configured,
// but if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication
// then it will be up to the user to explicitly specify zookeeper.set.acl=true in the authorizer-properties.
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
} else {
defaultProps
}
val authorizerProperties =
if (opts.options.has(opts.zkTlsConfigFile)) {
// load in TLS configs both with and without the "authorizer." prefix
val validKeys = (KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList ++ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.map("authorizer." + _).toList).asJava
authorizerPropertiesWithoutTls ++ Utils.loadProps(opts.options.valueOf(opts.zkTlsConfigFile), validKeys).asInstanceOf[java.util.Map[String, Any]].asScala
}
else
authorizerPropertiesWithoutTls
val authZ = AuthorizerUtils.createAuthorizer(authorizerClassName)
try {
@@ -585,6 +597,13 @@ object AclCommand extends Logging {
val forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
val zkTlsConfigFile = parser.accepts("zk-tls-config-file",
"Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an \"authorizer.\" prefix) are ignored: " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") +
". Note that if SASL is not configured and zookeeper.set.acl is supposed to be true due to mutual certificate authentication being used" +
" then it is necessary to explicitly specify --authorizer-properties zookeeper.set.acl=true")
.withRequiredArg().describedAs("Authorizer ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args: _*)
def checkArgs(): Unit = {

13
core/src/main/scala/kafka/admin/ConfigCommand.scala

@@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, Config => JConfig, ListTopicsOptions}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -36,6 +36,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.collection._
@@ -103,8 +104,10 @@ object ConfigCommand extends Config {
}
private def processCommandWithZk(zkConnectString: String, opts: ConfigCommandOptions): Unit = {
val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSecurityEnabled, 30000, 30000,
Int.MaxValue, Time.SYSTEM)
val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
.getOrElse(new ZKClientConfig())
val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
@@ -596,6 +599,10 @@ object ConfigCommand extends Config {
val brokerLogger = parser.accepts("broker-logger", "The broker's ID for its logger config.")
.withRequiredArg
.ofType(classOf[String])
val zkTlsConfigFile = parser.accepts("zk-tls-config-file",
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.sorted.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)
private val entityFlags = List((topic, ConfigType.Topic),

2
core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala

@@ -65,7 +65,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
println(s"Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.")
println(s"Use --bootstrap-server instead to specify a broker to connect to.")
new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt),
JaasUtils.isZkSecurityEnabled,
JaasUtils.isZkSaslEnabled,
timeout)
} else {
val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt))

2
core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala

@@ -52,7 +52,7 @@ object ReassignPartitionsCommand extends Logging {
val opts = validateAndParseArgs(args)
val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
val time = Time.SYSTEM
val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time)
val zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSaslEnabled, 30000, 30000, Int.MaxValue, time)
val adminClientOpt = createAdminClient(opts)

2
core/src/main/scala/kafka/admin/TopicCommand.scala

@@ -338,7 +338,7 @@ object TopicCommand extends Logging {
object ZookeeperTopicService {
def apply(zkConnect: Option[String]): ZookeeperTopicService =
new ZookeeperTopicService(KafkaZkClient(zkConnect.get, JaasUtils.isZkSecurityEnabled, 30000, 30000,
new ZookeeperTopicService(KafkaZkClient(zkConnect.get, JaasUtils.isZkSaslEnabled, 30000, 30000,
Int.MaxValue, Time.SYSTEM))
}

60
core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala

@@ -17,13 +17,16 @@
package kafka.admin
import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
import kafka.server.KafkaConfig
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.data.Stat
import scala.annotation.tailrec
@@ -60,6 +63,7 @@ object ZkSecurityMigrator extends Logging {
val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of "
+ "znodes as part of the process of setting up ZooKeeper "
+ "authentication.")
val tlsConfigFileOption = "zk-tls-config-file"
def run(args: Array[String]): Unit = {
val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
@@ -67,17 +71,23 @@ object ZkSecurityMigrator extends Logging {
CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage)
if (jaasFile == null) {
val errorMsg = "No JAAS configuration file has been specified. Please make sure that you have set " +
"the system property %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
// Must have either SASL or TLS mutual authentication enabled to use this tool.
// Instantiate the client config we will use so that we take into account config provided via the CLI option
// and system properties passed via -D parameters if no CLI option is given.
val zkClientConfig = createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile).getOrElse(new ZKClientConfig())
val tlsClientAuthEnabled = KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
if (jaasFile == null && !tlsClientAuthEnabled) {
val errorMsg = s"No JAAS configuration file has been specified and no TLS client certificate has been specified. Please make sure that you set " +
s"the system property ${JaasUtils.JAVA_LOGIN_CONFIG_PARAM} or provide a ZooKeeper client TLS configuration via --$tlsConfigFileOption <filename> " +
s"identifying at least ${KafkaConfig.ZkSslClientEnableProp}, ${KafkaConfig.ZkClientCnxnSocketProp}, and ${KafkaConfig.ZkSslKeyStoreLocationProp}"
System.err.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}
if (!JaasUtils.isZkSecurityEnabled()) {
if (!tlsClientAuthEnabled && !JaasUtils.isZkSaslEnabled()) {
val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
throw new IllegalArgumentException("Incorrect configuration")
}
val zkAcl: Boolean = opts.options.valueOf(opts.zkAclOpt) match {
@@ -94,7 +104,7 @@ object ZkSecurityMigrator extends Logging {
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
Int.MaxValue, Time.SYSTEM)
Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
val migrator = new ZkSecurityMigrator(zkClient)
migrator.run(enablePathCheck)
@@ -104,11 +114,34 @@ object ZkSecurityMigrator extends Logging {
try {
run(args)
} catch {
case e: Exception =>
case e: Exception => {
e.printStackTrace()
// must exit with non-zero status so system tests will know we failed
Exit.exit(1)
}
}
}
def createZkClientConfigFromFile(filename: String) : ZKClientConfig = {
val zkTlsConfigFileProps = Utils.loadProps(filename, KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.toList.asJava)
val zkClientConfig = new ZKClientConfig() // Initializes based on any system properties that have been set
// Now override any set system properties with explicitly-provided values from the config file
// Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
zkTlsConfigFileProps.entrySet().asScala.foreach(entry => {
val key = entry.getKey.toString
info(s"Setting $key")
KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, entry.getValue.toString)
})
zkClientConfig
}
private[admin] def createZkClientConfigFromOption(options: OptionSet, option: ArgumentAcceptingOptionSpec[String]) : Option[ZKClientConfig] =
if (!options.has(option))
None
else
Some(createZkClientConfigFromFile(options.valueOf(option)))
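For illustration, a minimal sketch of what a file consumed by createZkClientConfigFromFile might contain and how it is translated. The paths, password and unrelated key are hypothetical, only keys from ZkSslConfigToSystemPropertyMap are honored, and the final check assumes code inside the kafka package (getZooKeeperClientProperty is private[kafka]):

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import kafka.admin.ZkSecurityMigrator
import kafka.server.KafkaConfig

// Hypothetical TLS config file using the Kafka-style (all-lowercase) property names.
val contents =
  """zookeeper.ssl.client.enable=true
    |zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
    |zookeeper.ssl.keystore.location=/path/to/kafka-keystore.jks
    |zookeeper.ssl.keystore.password=keystore-password
    |some.unrelated.property=ignored
    |""".stripMargin
val file = Files.createTempFile("zk-tls", ".properties")
Files.write(file, contents.getBytes(StandardCharsets.UTF_8))

val clientConfig = ZkSecurityMigrator.createZkClientConfigFromFile(file.toString)
// Unrelated keys are dropped; recognized keys are re-mapped to ZooKeeper's camelCase
// system-property names, e.g. zookeeper.ssl.keystore.location -> zookeeper.ssl.keyStore.location.
assert(KafkaConfig.getZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp).contains("/path/to/kafka-keystore.jks"))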
class ZkSecurityMigratorOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
@@ -121,6 +154,10 @@ object ZkSecurityMigrator extends Logging {
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
val enablePathCheckOpt = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " +
"before migration. If not, exit the command.")
val zkTlsConfigFile = parser.accepts(tlsConfigFileOption,
"Identifies the file where ZooKeeper client TLS connectivity properties are defined. Any properties other than " +
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.mkString(", ") + " are ignored.")
.withRequiredArg().describedAs("ZooKeeper TLS configuration").ofType(classOf[String])
options = parser.parse(args : _*)
}
}
@@ -262,7 +299,8 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
println("That might be due to an incorrect chroot is specified when executing the command.")
if (enablePathCheck) {
println("Exit the command.")
Exit.exit(0)
// must exit with non-zero status so system tests will know we failed
Exit.exit(1)
}
}
}

39
core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala

@@ -24,7 +24,7 @@ import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.VersionedAcls
import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.KafkaConfig
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zk._
@@ -39,6 +39,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, SecurityUtils}
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer._
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.mutable
import scala.collection.JavaConverters._
@@ -47,10 +48,11 @@ import scala.util.{Failure, Random, Success, Try}
object AclAuthorizer {
// Optional override zookeeper cluster configuration where acls will be stored. If not specified,
// acls will be stored in the same zookeeper where all other kafka broker metadata is stored.
val ZkUrlProp = "authorizer.zookeeper.url"
val ZkConnectionTimeOutProp = "authorizer.zookeeper.connection.timeout.ms"
val ZkSessionTimeOutProp = "authorizer.zookeeper.session.timeout.ms"
val ZkMaxInFlightRequests = "authorizer.zookeeper.max.in.flight.requests"
val configPrefix = "authorizer."
val ZkUrlProp = s"${configPrefix}zookeeper.url"
val ZkConnectionTimeOutProp = s"${configPrefix}zookeeper.connection.timeout.ms"
val ZkSessionTimeOutProp = s"${configPrefix}zookeeper.session.timeout.ms"
val ZkMaxInFlightRequests = s"${configPrefix}zookeeper.max.in.flight.requests"
// Semi-colon separated list of users that will be treated as super users and will have access to all the resources
// for all actions from all hosts, defaults to no super users.
@@ -80,6 +82,29 @@ object AclAuthorizer {
}
}
}
private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
None
else {
// start with the base config from the Kafka configuration
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
// add in any prefixed overlays
KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
if (prefixedValue.isDefined)
zkClientConfig.get.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
(prefixedValue.get.toString.toUpperCase == "HTTPS").toString
else
prefixedValue.get.toString)
}}
zkClientConfig
}
}
}
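As a rough sketch of the prefixed-override behaviour introduced above (the values are hypothetical, and the call assumes code in the kafka.security.authorizer package since the helper is private[authorizer]):

import java.util.Properties
import scala.collection.mutable
import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig

// Broker-level config that does not enable TLS to ZooKeeper.
val brokerProps = new Properties()
brokerProps.put("zookeeper.connect", "localhost:2181")
val kafkaConfig = KafkaConfig.fromProps(brokerProps)

// The authorizer's configs may override with "authorizer."-prefixed keys (hypothetical values).
val authorizerConfigs = mutable.Map[String, Any](
  "authorizer.zookeeper.ssl.client.enable" -> "true",
  "authorizer.zookeeper.ssl.truststore.location" -> "/path/to/authorizer-truststore.jks")

// Returns Some(ZKClientConfig) because the prefixed zookeeper.ssl.client.enable is "true",
// even though the broker-level zookeeper.ssl.client.enable defaults to false.
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, authorizerConfigs)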
class AclAuthorizer extends Authorizer with Logging {
@@ -124,9 +149,11 @@ class AclAuthorizer extends Authorizer with Logging {
val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
val time = Time.SYSTEM
zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"))
zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"),
zkClientConfig = zkClientConfig)
zkClient.createAclPaths()
extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1

198
core/src/main/scala/kafka/server/KafkaConfig.scala

@@ -33,6 +33,7 @@ import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.SecurityConfig
import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslClientAuth, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
@@ -40,6 +41,7 @@ import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
@@ -50,6 +52,11 @@ object Defaults {
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false
val ZkMaxInFlightRequests = 10
val ZkSslClientEnable = false
val ZkSslProtocol = "TLSv1.2"
val ZkSslEndpointIdentificationAlgorithm = "HTTPS"
val ZkSslCrlEnable = false
val ZkSslOcspEnable = false
/** ********* General Configuration ***********/
val BrokerIdGenerationEnable = true
@@ -269,6 +276,62 @@ object KafkaConfig {
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
val ZkSslClientEnableProp = "zookeeper.ssl.client.enable"
val ZkClientCnxnSocketProp = "zookeeper.clientCnxnSocket"
val ZkSslKeyStoreLocationProp = "zookeeper.ssl.keystore.location"
val ZkSslKeyStorePasswordProp = "zookeeper.ssl.keystore.password"
val ZkSslKeyStoreTypeProp = "zookeeper.ssl.keystore.type"
val ZkSslTrustStoreLocationProp = "zookeeper.ssl.truststore.location"
val ZkSslTrustStorePasswordProp = "zookeeper.ssl.truststore.password"
val ZkSslTrustStoreTypeProp = "zookeeper.ssl.truststore.type"
val ZkSslProtocolProp = "zookeeper.ssl.protocol"
val ZkSslEnabledProtocolsProp = "zookeeper.ssl.enabled.protocols"
val ZkSslCipherSuitesProp = "zookeeper.ssl.cipher.suites"
val ZkSslEndpointIdentificationAlgorithmProp = "zookeeper.ssl.endpoint.identification.algorithm"
val ZkSslCrlEnableProp = "zookeeper.ssl.crl.enable"
val ZkSslOcspEnableProp = "zookeeper.ssl.ocsp.enable"
// a map from the Kafka config to the corresponding ZooKeeper Java system property
private[kafka] val ZkSslConfigToSystemPropertyMap: Map[String, String] = Map(
ZkSslClientEnableProp -> ZKClientConfig.SECURE_CLIENT,
ZkClientCnxnSocketProp -> ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
ZkSslKeyStoreLocationProp -> "zookeeper.ssl.keyStore.location",
ZkSslKeyStorePasswordProp -> "zookeeper.ssl.keyStore.password",
ZkSslKeyStoreTypeProp -> "zookeeper.ssl.keyStore.type",
ZkSslTrustStoreLocationProp -> "zookeeper.ssl.trustStore.location",
ZkSslTrustStorePasswordProp -> "zookeeper.ssl.trustStore.password",
ZkSslTrustStoreTypeProp -> "zookeeper.ssl.trustStore.type",
ZkSslProtocolProp -> "zookeeper.ssl.protocol",
ZkSslEnabledProtocolsProp -> "zookeeper.ssl.enabledProtocols",
ZkSslCipherSuitesProp -> "zookeeper.ssl.ciphersuites",
ZkSslEndpointIdentificationAlgorithmProp -> "zookeeper.ssl.hostnameVerification",
ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
}
private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String, kafkaPropValue: Any): Unit = {
clientConfig.setProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName),
kafkaPropName match {
case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",")
case _ => kafkaPropValue.toString
}
case _ => kafkaPropValue.toString
})
}
// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
}
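A minimal sketch of the "at a minimum" requirement stated in the comment above, assuming code within the kafka package (the keystore path is hypothetical):

import org.apache.zookeeper.client.ZKClientConfig
import kafka.server.KafkaConfig

val clientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, "org.apache.zookeeper.ClientCnxnSocketNetty")
// Without the keystore location the check below would still return false.
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, "/path/to/kafka-keystore.jks")
assert(KafkaConfig.zkTlsClientAuthEnabled(clientConfig))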
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
@@ -503,6 +566,41 @@ object KafkaConfig {
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking."
val ZkSslClientEnableDoc = "Set client to use TLS when connecting to ZooKeeper." +
" An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." +
s" Defaults to false if neither is set; when true, <code>$ZkClientCnxnSocketProp</code> must be set (typically to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code>); other values to set may include " +
ZkSslConfigToSystemPropertyMap.keys.toList.sorted.filter(x => x != ZkSslClientEnableProp && x != ZkClientCnxnSocketProp).mkString("<code>", "</code>, <code>", "</code>")
val ZkClientCnxnSocketDoc = "Typically set to <code>org.apache.zookeeper.ClientCnxnSocketNetty</code> when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkClientCnxnSocketProp)}</code> system property."
val ZkSslKeyStoreLocationDoc = "Keystore location when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreLocationProp)}</code> system property (note the camelCase)."
val ZkSslKeyStorePasswordDoc = "Keystore password when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStorePasswordProp)}</code> system property (note the camelCase)." +
" Note that ZooKeeper does not support a key password different from the keystore password, so be sure to set the key password in the keystore to be identical to the keystore password; otherwise the connection attempt to Zookeeper will fail."
val ZkSslKeyStoreTypeDoc = "Keystore type when using a client-side certificate with TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslKeyStoreTypeProp)}</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the keystore."
val ZkSslTrustStoreLocationDoc = "Truststore location when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreLocationProp)}</code> system property (note the camelCase)."
val ZkSslTrustStorePasswordDoc = "Truststore password when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStorePasswordProp)}</code> system property (note the camelCase)."
val ZkSslTrustStoreTypeDoc = "Truststore type when using TLS connectivity to ZooKeeper." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslTrustStoreTypeProp)}</code> system property (note the camelCase)." +
" The default value of <code>null</code> means the type will be auto-detected based on the filename extension of the truststore."
val ZkSslProtocolDoc = "Specifies the protocol to be used in ZooKeeper TLS negotiation." +
s" An explicit value overrides any value set via the same-named <code>${ZkSslConfigToSystemPropertyMap(ZkSslProtocolProp)}</code> system property."
val ZkSslEnabledProtocolsDoc = "Specifies the enabled protocol(s) in ZooKeeper TLS negotiation (csv)." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEnabledProtocolsProp)}</code> system property (note the camelCase)." +
s" The default value of <code>null</code> means the enabled protocol will be the value of the <code>${KafkaConfig.ZkSslProtocolProp}</code> configuration property."
val ZkSslCipherSuitesDoc = "Specifies the enabled cipher suites to be used in ZooKeeper TLS negotiation (csv)." +
s""" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCipherSuitesProp)}</code> system property (note the single word \"ciphersuites\").""" +
" The default value of <code>null</code> means the list of enabled cipher suites is determined by the Java runtime being used."
val ZkSslEndpointIdentificationAlgorithmDoc = "Specifies whether to enable hostname verification in the ZooKeeper TLS negotiation process, with (case-insensitively) \"https\" meaning ZooKeeper hostname verification is enabled and an explicit blank value meaning it is disabled (disabling it is only recommended for testing purposes)." +
s""" An explicit value overrides any \"true\" or \"false\" value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslEndpointIdentificationAlgorithmProp)}</code> system property (note the different name and values; true implies https and false implies blank)."""
val ZkSslCrlEnableDoc = "Specifies whether to enable Certificate Revocation List in the ZooKeeper TLS protocols." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslCrlEnableProp)}</code> system property (note the shorter name)."
val ZkSslOcspEnableDoc = "Specifies whether to enable Online Certificate Status Protocol in the ZooKeeper TLS protocols." +
s" Overrides any explicit value set via the <code>${ZkSslConfigToSystemPropertyMap(ZkSslOcspEnableProp)}</code> system property (note the shorter name)."
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@@ -734,7 +832,7 @@ object KafkaConfig {
val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden"
/** ********* Transaction management configuration ***********/
val TransactionalIdExpirationMsDoc = "The time in ms that the transaction coordinator will wait without receiving any transaction status updates " +
"for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired " +
"for the current transaction before expiring its transactional id. This setting also influences producer id expiration - producer ids are expired " +
"once this time has elapsed after the last write with the given producer id. Note that producer ids may expire sooner if the last write from the producer id is deleted due to the topic's retention settings."
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for transactions. " +
"If a client’s requested transaction time exceed this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction."
@@ -865,6 +963,20 @@ object KafkaConfig {
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
.define(ZkMaxInFlightRequestsProp, INT, Defaults.ZkMaxInFlightRequests, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
.define(ZkSslClientEnableProp, BOOLEAN, Defaults.ZkSslClientEnable, MEDIUM, ZkSslClientEnableDoc)
.define(ZkClientCnxnSocketProp, STRING, null, MEDIUM, ZkClientCnxnSocketDoc)
.define(ZkSslKeyStoreLocationProp, STRING, null, MEDIUM, ZkSslKeyStoreLocationDoc)
.define(ZkSslKeyStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslKeyStorePasswordDoc)
.define(ZkSslKeyStoreTypeProp, STRING, null, MEDIUM, ZkSslKeyStoreTypeDoc)
.define(ZkSslTrustStoreLocationProp, STRING, null, MEDIUM, ZkSslTrustStoreLocationDoc)
.define(ZkSslTrustStorePasswordProp, PASSWORD, null, MEDIUM, ZkSslTrustStorePasswordDoc)
.define(ZkSslTrustStoreTypeProp, STRING, null, MEDIUM, ZkSslTrustStoreTypeDoc)
.define(ZkSslProtocolProp, STRING, Defaults.ZkSslProtocol, LOW, ZkSslProtocolDoc)
.define(ZkSslEnabledProtocolsProp, LIST, null, LOW, ZkSslEnabledProtocolsDoc)
.define(ZkSslCipherSuitesProp, LIST, null, LOW, ZkSslCipherSuitesDoc)
.define(ZkSslEndpointIdentificationAlgorithmProp, STRING, Defaults.ZkSslEndpointIdentificationAlgorithm, LOW, ZkSslEndpointIdentificationAlgorithmDoc)
.define(ZkSslCrlEnableProp, BOOLEAN, Defaults.ZkSslCrlEnable, LOW, ZkSslCrlEnableDoc)
.define(ZkSslOcspEnableProp, BOOLEAN, Defaults.ZkSslOcspEnable, LOW, ZkSslOcspEnableDoc)
/** ********* General Configuration ***********/
.define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
@@ -1132,6 +1244,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
this.currentConfig = newConfig
}
// The following captures any system properties impacting ZooKeeper TLS configuration
// and defines the default values this instance will use if no explicit config is given.
// We make it part of each instance rather than the object to facilitate testing.
private val zkClientConfigViaSystemProperties = new ZKClientConfig()
override def originals: util.Map[String, AnyRef] =
if (this eq currentConfig) super.originals else currentConfig.originals
override def values: util.Map[String, _] =
@@ -1159,6 +1276,85 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false (String) to true/false (Boolean)
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getBoolean(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some("true") => true
case Some(_) => false
case _ => getBoolean(propKey) // not specified so use the default value
}
}
}
private def zkStringConfigOrSystemPropertyWithDefaultValue(propKey: String): String = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getString(propKey) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some(_) => sysPropValue.get
case _ => getString(propKey) // not specified so use the default value
}
}
}
private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
Option(getString(propKey)) match {
case config: Some[String] => config
case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
}
}
private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
Option(getPassword(propKey)) match {
case config: Some[Password] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None
}
}
}
private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
Option(getList(propKey)) match {
case config: Some[util.List[String]] => config
case _ => {
val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None
}
}
}
val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslClientEnableProp)
val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkClientCnxnSocketProp)
val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreLocationProp)
val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslKeyStorePasswordProp)
val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslKeyStoreTypeProp)
val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreLocationProp)
val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(KafkaConfig.ZkSslTrustStorePasswordProp)
val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(KafkaConfig.ZkSslTrustStoreTypeProp)
val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslProtocolProp)
val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(KafkaConfig.ZkSslEnabledProtocolsProp)
val ZkSslCipherSuites = zkListConfigOrSystemProperty(KafkaConfig.ZkSslCipherSuitesProp)
val ZkSslEndpointIdentificationAlgorithm = {
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
// Need to translate any system property value from true/false to HTTPS/<blank>
val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val actuallyProvided = originals.containsKey(kafkaProp)
if (actuallyProvided) getString(kafkaProp) else {
val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp)
sysPropValue match {
case Some("true") => "HTTPS"
case Some(_) => ""
case _ => getString(kafkaProp) // not specified so use the default value
}
}
}
val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslCrlEnableProp)
val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(KafkaConfig.ZkSslOcspEnableProp)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)

33
core/src/main/scala/kafka/server/KafkaServer.scala

@@ -48,6 +48,7 @@ import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, mutable}
@@ -91,6 +92,28 @@ object KafkaServer {
.timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
}
def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
if (!config.zkSslClientEnable && !forceZkSslClientEnable)
None
else {
val clientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
config.zkSslKeyStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStorePasswordProp, x.value))
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreTypeProp, _))
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreLocationProp, _))
config.zkSslTrustStorePassword.foreach(x => KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStorePasswordProp, x.value))
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslTrustStoreTypeProp, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslProtocolProp, config.ZkSslProtocol)
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEnabledProtocolsProp, _))
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCipherSuitesProp, _))
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
Some(clientConfig)
}
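A rough sketch of how a broker's ZooKeeper TLS settings flow into a ZKClientConfig via the helper above; the port, paths and passwords are hypothetical:

import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

val props = new Properties()
props.put("zookeeper.connect", "zk-host:2182")   // hypothetical TLS-enabled ZooKeeper port
props.put("zookeeper.ssl.client.enable", "true")
props.put("zookeeper.clientCnxnSocket", "org.apache.zookeeper.ClientCnxnSocketNetty")
props.put("zookeeper.ssl.keystore.location", "/path/to/kafka-keystore.jks")
props.put("zookeeper.ssl.keystore.password", "keystore-password")
props.put("zookeeper.ssl.truststore.location", "/path/to/kafka-truststore.jks")
props.put("zookeeper.ssl.truststore.password", "truststore-password")
val config = KafkaConfig.fromProps(props)

// Some(ZKClientConfig) because zookeeper.ssl.client.enable is true; each configured
// Kafka property is copied onto the corresponding ZooKeeper system-property name.
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)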
val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000
}
@@ -145,6 +168,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var metadataCache: MetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
private var _zkClient: KafkaZkClient = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
@@ -349,9 +373,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
private def initZkClient(time: Time): Unit = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
def createZkClient(zkConnect: String, isSecure: Boolean) =
def createZkClient(zkConnect: String, isSecure: Boolean) = {
KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
config.zkMaxInFlightRequests, time, name = Some("Kafka server"))
config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig))
}
val chrootIndex = config.zkConnect.indexOf("/")
val chrootOption = {
@@ -360,10 +385,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
val secureAclsEnabled = config.zkEnableSecureAcls
val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled()
val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
if (secureAclsEnabled && !isZkSecurityEnabled)
throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the " +
throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but ZooKeeper client TLS configuration identifying at least $KafkaConfig.ZkSslClientEnableProp, $KafkaConfig.ZkClientCnxnSocketProp, and $KafkaConfig.ZkSslKeyStoreLocationProp was not present and the " +
s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")
// make sure chroot path exists

6
core/src/main/scala/kafka/zk/KafkaZkClient.scala

@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
@@ -1856,9 +1857,10 @@ object KafkaZkClient {
time: Time,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener",
name: Option[String] = None) = {
name: Option[String] = None,
zkClientConfig: Option[ZKClientConfig] = None) = {
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
time, metricGroup, metricType, name)
time, metricGroup, metricType, name, zkClientConfig)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}
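For completeness, a hedged sketch of passing an explicit client config through the new zkClientConfig parameter; the host and port are hypothetical, and the config would normally carry the TLS properties shown in the earlier sketches:

import kafka.zk.KafkaZkClient
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.client.ZKClientConfig

// Assume tlsClientConfig already carries zookeeper.ssl.* settings (e.g. built as in the
// ZkSecurityMigrator or KafkaServer sketches); here it only picks up system properties.
val tlsClientConfig = new ZKClientConfig()
val zkClient = KafkaZkClient("zk-host:2182", false, 30000, 30000,
  Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(tlsClientConfig))
// ... use zkClient, then release the underlying session.
zkClient.close()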

16
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala

@@ -33,6 +33,7 @@ import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
import scala.collection.Seq
@@ -46,6 +47,7 @@ import scala.collection.mutable.Set
* @param connectionTimeoutMs connection timeout in milliseconds
* @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
* @param name name of the client instance
* @param zkClientConfig ZooKeeper client configuration, for TLS configs if desired
*/
class ZooKeeperClient(connectString: String,
sessionTimeoutMs: Int,
@@ -54,7 +56,8 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
name: Option[String]) extends Logging with KafkaMetricsGroup {
name: Option[String],
zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup {
def this(connectString: String,
sessionTimeoutMs: Int,
@@ -63,7 +66,8 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String) = {
this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None)
this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None,
None)
}
this.logIdent = name match {
@@ -99,9 +103,13 @@ class ZooKeeperClient(connectString: String,
}
}
private val clientConfig = zkClientConfig getOrElse new ZKClientConfig()
info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
private[zookeeper] def getClientConfig = clientConfig
newGauge("SessionState", () => connectionState.toString)
@@ -370,7 +378,7 @@ class ZooKeeperClient(connectString: String,
var connected = false
while (!connected) {
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
connected = true
} catch {
case e: Exception =>

100
core/src/main/scala/org/apache/zookeeper/ZooKeeperMainWithTlsSupportForKafka.scala

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper
import kafka.admin.ZkSecurityMigrator
import org.apache.zookeeper.admin.ZooKeeperAdmin
import org.apache.zookeeper.cli.CommandNotFoundException
import org.apache.zookeeper.cli.MalformedCommandException
import org.apache.zookeeper.client.ZKClientConfig
import scala.collection.JavaConverters._
object ZooKeeperMainWithTlsSupportForKafka {
val zkTlsConfigFileOption = "-zk-tls-config-file"
def main(args: Array[String]): Unit = {
val zkTlsConfigFileIndex = args.indexOf(zkTlsConfigFileOption)
val zooKeeperMain: ZooKeeperMain =
if (zkTlsConfigFileIndex < 0)
// no TLS config, so just pass args directly
new ZooKeeperMainWithTlsSupportForKafka(args, None)
else if (zkTlsConfigFileIndex == args.length - 1)
throw new IllegalArgumentException(s"Error: no filename provided with option $zkTlsConfigFileOption")
else
// found TLS config, so instantiate it and pass args without the two TLS config-related arguments
new ZooKeeperMainWithTlsSupportForKafka(
args.slice(0, zkTlsConfigFileIndex) ++ args.slice(zkTlsConfigFileIndex + 2, args.length),
Some(ZkSecurityMigrator.createZkClientConfigFromFile(args(zkTlsConfigFileIndex + 1))))
// The run method of ZooKeeperMain is package-private,
// therefore this code unfortunately must reside in the same org.apache.zookeeper package.
zooKeeperMain.run
}
}
class ZooKeeperMainWithTlsSupportForKafka(args: Array[String], val zkClientConfig: Option[ZKClientConfig])
extends ZooKeeperMain(args) with Watcher {
override def processZKCmd (co: ZooKeeperMain.MyCommandOptions): Boolean = {
// Unfortunately the usage() method is static, so it can't be overridden.
// This method is where usage() gets called. We don't cover all possible calls
// to usage() -- we would have to implement the entire method to do that -- but
// the short implementation below covers most cases.
val args = co.getArgArray
val cmd = co.getCommand
if (args.length < 1) {
kafkaTlsUsage
throw new MalformedCommandException("No command entered")
}
if (!ZooKeeperMain.commandMap.containsKey(cmd)) {
kafkaTlsUsage
throw new CommandNotFoundException(s"Command not found $cmd")
}
super.processZKCmd(co)
}
def kafkaTlsUsage(): Unit = {
System.err.println("ZooKeeper -server host:port [-zk-tls-config-file <file>] cmd args")
asScalaSet(ZooKeeperMain.commandMap.keySet).toList.sorted.foreach(cmd =>
System.err.println(s"\t$cmd ${ZooKeeperMain.commandMap.get(cmd)}"))
}
override def connectToZK(newHost: String) = {
// ZooKeeperAdmin has no constructor that supports passing in both readOnly and ZkClientConfig,
// and readOnly ends up being set to false when passing in a ZkClientConfig instance;
// therefore it is currently not possible for us to construct a ZooKeeperAdmin instance with
// both an explicit ZkClientConfig instance and a readOnly value of true.
val readOnlyRequested = cl.getOption("readonly") != null
if (readOnlyRequested && zkClientConfig.isDefined)
throw new IllegalArgumentException(
s"read-only mode (-r) is not supported with an explicit TLS config (${ZooKeeperMainWithTlsSupportForKafka.zkTlsConfigFileOption})")
if (zk != null && zk.getState.isAlive) zk.close()
host = newHost
zk = if (zkClientConfig.isDefined)
new ZooKeeperAdmin(host, cl.getOption("timeout").toInt, this, zkClientConfig.get)
else
new ZooKeeperAdmin(host, cl.getOption("timeout").toInt, this, readOnlyRequested)
}
override def process(event: WatchedEvent): Unit = {
if (getPrintWatches) {
ZooKeeperMain.printMessage("WATCHER::")
ZooKeeperMain.printMessage(event.toString)
}
}
}
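For illustration (connect string, file path, and znode are hypothetical), the new single-dash option may appear anywhere in the argument list; the option and its value are stripped before the remaining arguments are handed to ZooKeeperMain, and the ZKClientConfig is built from the named file:
  zookeeper-shell.sh zk1:2182 -zk-tls-config-file /path/to/zk-client.properties get /controller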

157
core/src/test/scala/unit/kafka/KafkaConfigTest.scala

@ -28,6 +28,8 @@ import org.junit.{After, Before, Test} @@ -28,6 +28,8 @@ import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import scala.collection.JavaConverters._
class KafkaTest {
@Before
@ -109,6 +111,118 @@ class KafkaTest { @@ -109,6 +111,118 @@ class KafkaTest {
assertEquals(password, config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
private val booleanPropValueToSet = true
private val stringPropValueToSet = "foo"
private val passwordPropValueToSet = "ThePa$$word!"
private val listPropValueToSet = List("A", "B")
@Test
def testZkSslClientEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslClientEnableProp, "zookeeper.ssl.client.enable",
"zookeeper.client.secure", booleanPropValueToSet, config => Some(config.zkSslClientEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslKeyStoreLocation(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStoreLocationProp, "zookeeper.ssl.keystore.location",
"zookeeper.ssl.keyStore.location", stringPropValueToSet, config => config.zkSslKeyStoreLocation, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreLocation(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStoreLocationProp, "zookeeper.ssl.truststore.location",
"zookeeper.ssl.trustStore.location", stringPropValueToSet, config => config.zkSslTrustStoreLocation, stringPropValueToSet)
}
@Test
def testZookeeperKeyStorePassword(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStorePasswordProp, "zookeeper.ssl.keystore.password",
"zookeeper.ssl.keyStore.password", passwordPropValueToSet, config => config.zkSslKeyStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZookeeperTrustStorePassword(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStorePasswordProp, "zookeeper.ssl.truststore.password",
"zookeeper.ssl.trustStore.password", passwordPropValueToSet, config => config.zkSslTrustStorePassword, new Password(passwordPropValueToSet))
}
@Test
def testZkSslKeyStoreType(): Unit = {
testZkConfig(KafkaConfig.ZkSslKeyStoreTypeProp, "zookeeper.ssl.keystore.type",
"zookeeper.ssl.keyStore.type", stringPropValueToSet, config => config.zkSslKeyStoreType, stringPropValueToSet)
}
@Test
def testZkSslTrustStoreType(): Unit = {
testZkConfig(KafkaConfig.ZkSslTrustStoreTypeProp, "zookeeper.ssl.truststore.type",
"zookeeper.ssl.trustStore.type", stringPropValueToSet, config => config.zkSslTrustStoreType, stringPropValueToSet)
}
@Test
def testZkSslProtocol(): Unit = {
testZkConfig(KafkaConfig.ZkSslProtocolProp, "zookeeper.ssl.protocol",
"zookeeper.ssl.protocol", stringPropValueToSet, config => Some(config.ZkSslProtocol), stringPropValueToSet, Some("TLSv1.2"))
}
@Test
def testZkSslEnabledProtocols(): Unit = {
testZkConfig(KafkaConfig.ZkSslEnabledProtocolsProp, "zookeeper.ssl.enabled.protocols",
"zookeeper.ssl.enabledProtocols", listPropValueToSet.mkString(","), config => config.ZkSslEnabledProtocols, listPropValueToSet.asJava)
}
@Test
def testZkSslCipherSuites(): Unit = {
testZkConfig(KafkaConfig.ZkSslCipherSuitesProp, "zookeeper.ssl.cipher.suites",
"zookeeper.ssl.ciphersuites", listPropValueToSet.mkString(","), config => config.ZkSslCipherSuites, listPropValueToSet.asJava)
}
@Test
def testZkSslEndpointIdentificationAlgorithm(): Unit = {
// this property is different than the others
// because the system property values and the Kafka property values don't match
val kafkaPropName = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
assertEquals("zookeeper.ssl.endpoint.identification.algorithm", kafkaPropName)
val sysProp = "zookeeper.ssl.hostnameVerification"
val expectedDefaultValue = "HTTPS"
val propertiesFile = prepareDefaultConfig()
// first make sure there is the correct default value
val emptyConfig = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in the originals
assertEquals(expectedDefaultValue, emptyConfig.values.get(kafkaPropName)) // but default value appears in the values
assertEquals(expectedDefaultValue, emptyConfig.ZkSslEndpointIdentificationAlgorithm) // and has the correct default value
// next set system property alone
Map("true" -> "HTTPS", "false" -> "").foreach { case (sysPropValue, expected) => {
try {
System.setProperty(sysProp, sysPropValue)
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertNull(config.originals.get(kafkaPropName)) // doesn't appear in the originals
assertEquals(expectedDefaultValue, config.values.get(kafkaPropName)) // default value appears in the values
assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) // system property impacts the ultimate value of the property
} finally {
System.clearProperty(sysProp)
}
}}
// finally set Kafka config alone
List("https", "").foreach(expected => {
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", s"$kafkaPropName=${expected}")))
assertEquals(expected, config.originals.get(kafkaPropName)) // appears in the originals
assertEquals(expected, config.values.get(kafkaPropName)) // appears in the values
assertEquals(expected, config.ZkSslEndpointIdentificationAlgorithm) // is the ultimate value
})
}
@Test
def testZkSslCrlEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslCrlEnableProp, "zookeeper.ssl.crl.enable",
"zookeeper.ssl.crl", booleanPropValueToSet, config => Some(config.ZkSslCrlEnable), booleanPropValueToSet, Some(false))
}
@Test
def testZkSslOcspEnable(): Unit = {
testZkConfig(KafkaConfig.ZkSslOcspEnableProp, "zookeeper.ssl.ocsp.enable",
"zookeeper.ssl.ocsp", booleanPropValueToSet, config => Some(config.ZkSslOcspEnable), booleanPropValueToSet, Some(false))
}
@Test
def testConnectionsMaxReauthMsDefault(): Unit = {
val propertiesFile = prepareDefaultConfig()
@ -124,6 +238,49 @@ class KafkaTest { @@ -124,6 +238,49 @@ class KafkaTest {
assertEquals(expected, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
}
private def testZkConfig[T, U](kafkaPropName: String,
expectedKafkaPropName: String,
sysPropName: String,
propValueToSet: T,
getPropValueFrom: (KafkaConfig) => Option[T],
expectedPropertyValue: U,
expectedDefaultValue: Option[T] = None): Unit = {
assertEquals(expectedKafkaPropName, kafkaPropName)
val propertiesFile = prepareDefaultConfig()
// first make sure there is the correct default value (if any)
val emptyConfig = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertNull(emptyConfig.originals.get(kafkaPropName)) // doesn't appear in the originals
if (expectedDefaultValue.isDefined) {
// confirm default value behavior
assertEquals(expectedDefaultValue.get, emptyConfig.values.get(kafkaPropName)) // default value appears in the values
assertEquals(expectedDefaultValue.get, getPropValueFrom(emptyConfig).get) // default value appears in the property
} else {
// confirm no default value behavior
assertNull(emptyConfig.values.get(kafkaPropName)) // doesn't appear in the values
assertEquals(None, getPropValueFrom(emptyConfig)) // has no default value
}
// next set system property alone
try {
System.setProperty(sysPropName, s"$propValueToSet")
// need to create a new Kafka config for the system property to be recognized
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertNull(config.originals.get(kafkaPropName)) // doesn't appear in the originals
// confirm default value (if any) overridden by system property
if (expectedDefaultValue.isDefined)
assertEquals(expectedDefaultValue.get, config.values.get(kafkaPropName)) // default value (different from system property) appears in the values
else
assertNull(config.values.get(kafkaPropName)) // doesn't appear in the values
// confirm system property appears in the property
assertEquals(Some(expectedPropertyValue), getPropValueFrom(config))
} finally {
System.clearProperty(sysPropName)
}
// finally set Kafka config alone
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", s"$kafkaPropName=${propValueToSet}")))
assertEquals(expectedPropertyValue, config.values.get(kafkaPropName)) // appears in the values
assertEquals(Some(expectedPropertyValue), getPropValueFrom(config)) // appears in the property
}
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}

6
core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala

@ -66,14 +66,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { @@ -66,14 +66,14 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
*/
@Test
def testIsZkSecurityEnabled(): Unit = {
assertTrue(JaasUtils.isZkSecurityEnabled())
assertTrue(JaasUtils.isZkSaslEnabled())
Configuration.setConfiguration(null)
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
assertFalse(JaasUtils.isZkSecurityEnabled())
assertFalse(JaasUtils.isZkSaslEnabled())
try {
Configuration.setConfiguration(null)
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
JaasUtils.isZkSecurityEnabled()
JaasUtils.isZkSaslEnabled()
fail("Should have thrown an exception")
} catch {
case _: KafkaException => // Expected

146
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala

@ -16,11 +16,14 @@ @@ -16,11 +16,14 @@
*/
package kafka.security.authorizer
import java.io.File
import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import kafka.Kafka
import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.KafkaConfig
@ -48,6 +51,7 @@ import org.junit.{After, Before, Test} @@ -48,6 +51,7 @@ import org.junit.{After, Before, Test}
import org.scalatest.Assertions.intercept
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.compat.java8.OptionConverters._
class AclAuthorizerTest extends ZooKeeperTestHarness {
@ -785,6 +789,130 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { @@ -785,6 +789,130 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
assertEquals(expected, actual)
}
@Test
def testAuthorizerNoZkConfig(): Unit = {
val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig))
assertEquals(None, AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(noTlsProps),
mutable.Map(noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala.toSeq: _*)))
}
@Test
def testAuthorizerZkConfigFromKafkaConfigWithDefaults(): Unit = {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "true",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslProtocolProp =>
assertEquals("TLSv1.2", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
})
}
@Test
def testAuthorizerZkConfigFromKafkaConfig(): Unit = {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "true",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslProtocolProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
KafkaConfig.ZkSslCrlEnableProp -> "false",
KafkaConfig.ZkSslOcspEnableProp -> "false")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
})
}
@Test
def testAuthorizerZkConfigFromPrefixOverrides(): Unit = {
val props = new java.util.Properties()
val kafkaValue = "kafkaValue"
val prefixedValue = "prefixedValue"
val prefix = "authorizer."
val configs = Map("zookeeper.connect" -> "somewhere", // required, otherwise we would omit it
KafkaConfig.ZkSslClientEnableProp -> "false",
KafkaConfig.ZkClientCnxnSocketProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslKeyStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslKeyStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreLocationProp -> kafkaValue,
KafkaConfig.ZkSslTrustStorePasswordProp -> kafkaValue,
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslProtocolProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue,
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "HTTPS",
KafkaConfig.ZkSslCrlEnableProp -> "false",
KafkaConfig.ZkSslOcspEnableProp -> "false",
prefix + KafkaConfig.ZkSslClientEnableProp -> "true",
prefix + KafkaConfig.ZkClientCnxnSocketProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStoreLocationProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStorePasswordProp -> prefixedValue,
prefix + KafkaConfig.ZkSslKeyStoreTypeProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStoreLocationProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStorePasswordProp -> prefixedValue,
prefix + KafkaConfig.ZkSslTrustStoreTypeProp -> prefixedValue,
prefix + KafkaConfig.ZkSslProtocolProp -> prefixedValue,
prefix + KafkaConfig.ZkSslEnabledProtocolsProp -> prefixedValue,
prefix + KafkaConfig.ZkSslCipherSuitesProp -> prefixedValue,
prefix + KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp -> "",
prefix + KafkaConfig.ZkSslCrlEnableProp -> "true",
prefix + KafkaConfig.ZkSslOcspEnableProp -> "true")
configs.foreach{case (key, value) => props.put(key, value.toString) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
case _ => assertEquals(prefixedValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
})
}
private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit = {
aclAuthorizer.close()
@ -883,4 +1011,22 @@ class AclAuthorizerTest extends ZooKeeperTestHarness { @@ -883,4 +1011,22 @@ class AclAuthorizerTest extends ZooKeeperTestHarness {
private def validOp(op: AclOperation): Boolean = {
op != AclOperation.ANY && op != AclOperation.UNKNOWN
}
private def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
private def prepareConfig(lines : Array[String]): String = {
val file = File.createTempFile("kafkatest", ".properties")
file.deleteOnExit()
val writer = Files.newOutputStream(file.toPath)
try {
lines.foreach { l =>
writer.write(l.getBytes)
writer.write("\n".getBytes)
}
file.getAbsolutePath
} finally writer.close()
}
}

14
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

@ -590,6 +590,20 @@ class KafkaConfigTest { @@ -590,6 +590,20 @@ class KafkaConfigTest {
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.ZkSslClientEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkClientCnxnSocketProp => //ignore string
case KafkaConfig.ZkSslKeyStoreLocationProp => //ignore string
case KafkaConfig.ZkSslKeyStorePasswordProp => //ignore string
case KafkaConfig.ZkSslKeyStoreTypeProp => //ignore string
case KafkaConfig.ZkSslTrustStoreLocationProp => //ignore string
case KafkaConfig.ZkSslTrustStorePasswordProp => //ignore string
case KafkaConfig.ZkSslTrustStoreTypeProp => //ignore string
case KafkaConfig.ZkSslProtocolProp => //ignore string
case KafkaConfig.ZkSslEnabledProtocolsProp => //ignore string
case KafkaConfig.ZkSslCipherSuitesProp => //ignore string
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => //ignore string
case KafkaConfig.ZkSslCrlEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.ZkSslOcspEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

62
core/src/test/scala/unit/kafka/server/KafkaServerTest.scala

@ -17,9 +17,13 @@ @@ -17,9 +17,13 @@
package kafka.server
import java.util.Properties
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.zookeeper.client.ZKClientConfig
import org.junit.Test
import org.junit.Assert.assertEquals
import org.scalatest.Assertions.intercept
class KafkaServerTest extends ZooKeeperTestHarness {
@ -40,6 +44,64 @@ class KafkaServerTest extends ZooKeeperTestHarness { @@ -40,6 +44,64 @@ class KafkaServerTest extends ZooKeeperTestHarness {
TestUtils.shutdownServers(Seq(server1, server2))
}
@Test
def testCreatesProperZkTlsConfigWhenDisabled(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(KafkaConfig.ZkSslClientEnableProp, "false")
assertEquals(None, KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)))
}
@Test
def testCreatesProperZkTlsConfigWithTrueValues(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "HTTPS"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "true"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
@Test
def testCreatesProperZkTlsConfigWithFalseAndListValues(): Unit = {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
// should get correct config for all properties if TLS is enabled
val someValue = "some_value"
def kafkaConfigValueToSet(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp => "true"
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => ""
case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp => "true"
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "false"
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp => "false"
case KafkaConfig.ZkSslEnabledProtocolsProp | KafkaConfig.ZkSslCipherSuitesProp => "A,B"
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")

32
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala

@ -23,7 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit} @@ -23,7 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.server.{ConfigType, KafkaConfig}
import kafka.utils.CoreUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
@ -50,6 +50,7 @@ import org.apache.kafka.common.resource.ResourcePattern @@ -50,6 +50,7 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.data.Stat
class KafkaZkClientTest extends ZooKeeperTestHarness {
@ -71,9 +72,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -71,9 +72,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
override def setUp(): Unit = {
super.setUp()
zkClient.createControllerEpochRaw(1)
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled),
expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@ -89,6 +90,31 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -89,6 +90,31 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private val topicPartition = new TopicPartition("topic", 0)
@Test
def testConnectionViaNettyClient(): Unit = {
// Confirm that we can explicitly set client connection configuration, which is necessary for TLS.
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
val propKey = KafkaConfig.ZkClientCnxnSocketProp
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig))
try {
assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey))
// For a sanity check, make sure a bad client connection socket class name generates an exception
val badClientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
intercept[Exception] {
KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(badClientConfig))
}
} finally {
client.close()
}
}
@Test
def testSetAndGetConsumerOffset(): Unit = {
val offset = 123L

2
core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala

@ -55,7 +55,7 @@ abstract class ZooKeeperTestHarness extends Logging { @@ -55,7 +55,7 @@ abstract class ZooKeeperTestHarness extends Logging {
@Before
def setUp(): Unit = {
zookeeper = new EmbeddedZookeeper()
zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
adminZkClient = new AdminZkClient(zkClient)
}

28
core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala

@ -22,15 +22,16 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} @@ -22,15 +22,16 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, Executors, Semaphore, TimeUnit}
import scala.collection.Seq
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.server.KafkaConfig
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertTrue}
import org.junit.{After, Before, Test}
@ -99,6 +100,31 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @@ -99,6 +100,31 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
@Test
def testConnectionViaNettyClient(): Unit = {
// Confirm that we can explicitly set client connection configuration, which is necessary for TLS.
// TLS connectivity itself is tested in system tests rather than here to avoid having to add TLS support
// to kafka.zk.EmbeddedZookeeper
val clientConfig = new ZKClientConfig()
val propKey = KafkaConfig.ZkClientCnxnSocketProp
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
"testMetricType", None, Some(clientConfig))
try {
assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.getClientConfig, propKey))
// For a sanity check, make sure a bad client connection socket class name generates an exception
val badClientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
intercept[Exception] {
new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
"testMetricType", None, Some(badClientConfig))
}
} finally {
client.close()
}
}
@Test
def testDeleteNonExistentZNode(): Unit = {
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))

143
docs/security.html

@ -1014,8 +1014,8 @@ @@ -1014,8 +1014,8 @@
</ol>
<h3><a id="security_authz" href="#security_authz">7.4 Authorization and ACLs</a></h3>
Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting <code>authorizer.class.name</code> in server.properties. To enable the out of the box implementation use:
<pre>authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer</pre>
Kafka ships with a pluggable Authorizer and an out-of-box authorizer implementation that uses zookeeper to store all the acls. The Authorizer is configured by setting <tt>authorizer.class.name</tt> in server.properties. To enable the out of the box implementation use:
<pre>authorizer.class.name=kafka.security.authorizer.AclAuthorizer</pre>
Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
<pre>allow.everyone.if.no.acl.found=true</pre>
One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
@ -1097,7 +1097,7 @@ @@ -1097,7 +1097,7 @@
<tr>
<td>--authorizer</td>
<td>Fully qualified class name of the authorizer.</td>
<td>kafka.security.auth.SimpleAclAuthorizer</td>
<td>kafka.security.authorizer.AclAuthorizer</td>
<td>Configuration</td>
</tr>
<tr>
@ -1239,6 +1239,19 @@ @@ -1239,6 +1239,19 @@
<td></td>
<td>Convenience</td>
</tr>
<tr>
<td>--zk-tls-config-file</td>
<td> Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined.
Any properties other than the following (with or without an "authorizer." prefix) are ignored:
zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm,
zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type,
zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location,
zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
</td>
<td></td>
<td>Configuration</td>
</tr>
</tbody></table>
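<p>For illustration only (paths and passwords are hypothetical), a file passed via <tt>--zk-tls-config-file</tt> would contain some subset of the recognized properties, for example:</p>
<pre>
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/client/keystore.jks
zookeeper.ssl.keystore.password=client-ks-passwd
zookeeper.ssl.truststore.location=/path/to/client/truststore.jks
zookeeper.ssl.truststore.password=client-ts-passwd
</pre>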
<h4><a id="security_authz_examples" href="#security_authz_examples">Examples</a></h4>
@ -1858,8 +1871,48 @@ @@ -1858,8 +1871,48 @@
<h3><a id="zk_authz" href="#zk_authz">7.6 ZooKeeper Authentication</a></h3>
ZooKeeper supports mutual TLS (mTLS) authentication beginning with the 3.5.x versions.
Kafka supports authenticating to ZooKeeper with SASL and mTLS -- either individually or both together --
beginning with version 2.5. See
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-515%3A+Enable+ZK+client+to+use+the+new+TLS+supported+authentication">KIP-515: Enable ZK client to use the new TLS supported authentication</a>
for more details.
<p>When using mTLS alone, every broker and any CLI tool (such as the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a>)
should identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed.
This can be changed as described below, but it involves writing and deploying a custom ZooKeeper authentication provider.
Generally each certificate should have the same DN but a different Subject Alternative Name (SAN)
so that hostname verification of the brokers and any CLI tools by ZooKeeper will succeed.
<p>
When using SASL authentication to ZooKeeper together with mTLS, both the SASL identity and
either the DN that created the znode (i.e. the creating broker's certificate)
or the DN of the Security Migration Tool (if migration was performed after the znode was created)
will be ACL'ed, and all brokers and CLI tools will be authorized even if they all use different DNs
because they will all use the same ACL'ed SASL identity.
It is only when using mTLS authentication alone that all the DNs must match (and SANs become critical --
again, in the absence of writing and deploying a custom ZooKeeper authentication provider as described below).
</p>
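<p>For illustration only (identities abbreviated, output format as printed by the ZooKeeper shell's <tt>getAcl</tt> command), a znode created under SASL plus mTLS would therefore carry ACL entries for both schemes:</p>
<pre>
'sasl,'kafka
: cdrwa
'x509,'CN=kafka
: cdrwa
</pre>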
<p>Note that ZooKeeper version 3.5.6 <strong>requires</strong> clients that connect via TLS to present their own certificate
(i.e. encryption and mTLS authentication are linked rather than independent of each other in this ZooKeeper version).
There is a ZooKeeper server-side config <tt>ssl.clientAuth</tt> that is recognized
(case-insensitively: <tt>want</tt>/<tt>need</tt>/<tt>none</tt> are the valid options),
but this config <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-3674">is not acted upon in version 3.5.6</a>.
A future version of ZooKeeper will both recognize and act upon this config and allow clients to connect via a TLS-encrypted connection
without presenting their own certificate -- i.e. encryption and authentication will become independent of each other.
But for now this is not the case.
</p>
<p>
Use the broker properties file to set TLS configs for brokers as described below.
</p>
<p>
Use the <tt>--zk-tls-config-file &lt;file&gt;</tt> option to set TLS configs in the ZooKeeper Security Migration Tool.
The <tt>kafka-acls.sh</tt> and <tt>kafka-configs.sh</tt> CLI tools also support the <tt>--zk-tls-config-file &lt;file&gt;</tt> option.
</p>
<p>
Use the <tt>-zk-tls-config-file &lt;file&gt;</tt> option (note the single-dash rather than double-dash)
to set TLS configs for the <tt>zookeeper-shell.sh</tt> CLI tool.
</p>
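<p>A minimal sketch of invoking these tools with TLS to ZooKeeper (host names and file paths are illustrative):</p>
<pre class="brush: bash;">
./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=zk1:2182 --zk-tls-config-file /path/to/zk-tls.properties
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2182 --list --zk-tls-config-file /path/to/zk-tls.properties
</pre>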
<h4><a id="zk_authz_new" href="#zk_authz_new">7.6.1 New clusters</a></h4>
To enable ZooKeeper authentication on brokers, there are two necessary steps:
<h5><a id="zk_authz_new_sasl" href="#zk_authz_new_sasl">7.6.1.1 ZooKeeper SASL Authentication</a></h5>
To enable ZooKeeper SASL authentication on brokers, there are two necessary steps:
<ol>
<li> Create a JAAS login file and set the appropriate system property to point to it as described above</li>
<li> Set the configuration property <tt>zookeeper.set.acl</tt> in each broker to true</li>
@ -1867,18 +1920,83 @@ @@ -1867,18 +1920,83 @@
The metadata stored in ZooKeeper for the Kafka cluster is world-readable, but can only be modified by the brokers. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of that data can cause cluster disruption. We also recommend limiting the access to ZooKeeper via network segmentation (only brokers and some admin tools need access to ZooKeeper).
<h5><a id="zk_authz_new_mtls" href="#zk_authz_new_mtls">7.6.1.2 ZooKeeper Mutual TLS Authentication</a></h5>
ZooKeeper mTLS authentication can be enabled with or without SASL authentication. As mentioned above,
when using mTLS alone, every broker and any CLI tool (such as the <a href="#zk_authz_migration">ZooKeeper Security Migration Tool</a>)
must generally identify itself with the same Distinguished Name (DN) because it is the DN that is ACL'ed, which means
each certificate should have an appropriate Subject Alternative Name (SAN) so that
hostname verification of the brokers and any CLI tool by ZooKeeper will succeed.
<p>
It is possible to use something other than the DN for the identity of mTLS clients by writing a class that
extends <tt>org.apache.zookeeper.server.auth.X509AuthenticationProvider</tt> and overrides the method
<tt>protected String getClientId(X509Certificate clientCert)</tt>.
Choose a scheme name and set <tt>authProvider.[scheme]</tt> in ZooKeeper to be the fully-qualified class name
of the custom implementation; then set <tt>ssl.authProvider=[scheme]</tt> to use it.
</p>
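<p>A minimal sketch of such a provider (class name, scheme name, and the CN-based identity are hypothetical; assumes ZooKeeper 3.5.x on the server classpath) is shown below. It would be registered by setting <tt>authProvider.x509cn</tt> to its fully-qualified class name and selected via <tt>ssl.authProvider=x509cn</tt>:</p>
<pre>
import java.security.cert.X509Certificate
import org.apache.zookeeper.server.auth.X509AuthenticationProvider

// Hypothetical provider: identify mTLS clients by the CN component of the DN
// instead of the full DN, so certificates may differ in their other DN fields.
class CnBasedX509AuthenticationProvider extends X509AuthenticationProvider {
  override protected def getClientId(clientCert: X509Certificate): String = {
    val dn = clientCert.getSubjectX500Principal.getName
    // naive CN extraction for illustration; production code should parse the DN robustly
    dn.split(",").map(_.trim).find(_.startsWith("CN=")).map(_.stripPrefix("CN=")).getOrElse(dn)
  }
}
</pre>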
Here is a sample (partial) ZooKeeper configuration for enabling TLS authentication.
These configurations are described in the
<a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#sc_authOptions">ZooKeeper Admin Guide</a>.
<pre>
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
</pre>
<strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper server keystore
to a value different from the keystore password itself.
Be sure to set the key password to be the same as the keystore password.
Here is a sample (partial) Kafka Broker configuration for connecting to ZooKeeper with mTLS authentication.
These configurations are described above in <a href="#brokerconfigs">Broker Configs</a>.
<pre>
# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true
</pre>
<strong>IMPORTANT</strong>: ZooKeeper does not support setting the key password in the ZooKeeper client (i.e. broker) keystore
to a value different from the keystore password itself.
Be sure to set the key password to be the same as the keystore password.
<h4><a id="zk_authz_migration" href="#zk_authz_migration">7.6.2 Migrating clusters</a></h4>
If you are running a version of Kafka that does not support security, or simply have security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
<ol>
<li>Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
<li>Enable SASL and/or mTLS authentication on ZooKeeper. If enabling mTLS, you would now have both a non-TLS port and a TLS port, like this:
<pre>
clientPort=2181
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd
</pre>
</li>
<li>Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations (including connecting to the TLS-enabled ZooKeeper port) as required, which enables brokers to authenticate to ZooKeeper. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
<li>If you enabled mTLS, disable the non-TLS port in ZooKeeper</li>
<li>Perform a second rolling restart of brokers, this time setting the configuration parameter <tt>zookeeper.set.acl</tt> to true, which enables the use of secure ACLs when creating znodes</li>
<li>Execute the ZkSecurityMigrator tool. To execute the tool, there is this script: <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
<li>Execute the ZkSecurityMigrator tool. To execute the tool, there is this script: <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes. Use the <code>--zk-tls-config-file &lt;file&gt;</code> option if you enable mTLS.</li>
</ol>
<p>It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:</p>
<ol>
<li>Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting <tt>zookeeper.set.acl</tt> to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes</li>
<li>Execute the ZkSecurityMigrator tool. To execute the tool, run this script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
<li>Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file</li>
<li>Perform a rolling restart of brokers setting the JAAS login file and/or defining ZooKeeper mutual TLS configurations, which enables brokers to authenticate, but setting <tt>zookeeper.set.acl</tt> to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes</li>
<li>Execute the ZkSecurityMigrator tool. To execute the tool, run this script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes. Use the <code>--zk-tls-config-file &lt;file&gt;</code> option if you need to set TLS configuration.</li>
<li>If you are disabling mTLS, enable the non-TLS port in ZooKeeper</li>
<li>Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file and/or removing ZooKeeper mutual TLS configuration (including connecting to the non-TLS-enabled ZooKeeper port) as required</li>
<li>If you are disabling mTLS, disable the TLS port in ZooKeeper</li>
</ol>
Here is an example of how to run the migration tool:
<pre class="brush: bash;">
@ -1889,11 +2007,14 @@ @@ -1889,11 +2007,14 @@
./bin/zookeeper-security-migration.sh --help
</pre>
<h4><a id="zk_authz_ensemble" href="#zk_authz_ensemble">7.6.3 Migrating the ZooKeeper ensemble</a></h4>
It is also necessary to enable authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. Please refer to the ZooKeeper documentation for more detail:
It is also necessary to enable SASL and/or mTLS authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the server and set a few properties. See above for mTLS information. Please refer to the ZooKeeper documentation for more detail:
<ol>
<li><a href="http://zookeeper.apache.org/doc/r3.5.5/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
<li><a href="http://zookeeper.apache.org/doc/r3.5.6/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
</ol>
<h4><a id="zk_authz_quorum" href="#zk_authz_quorum">7.6.4 ZooKeeper Quorum Mutial TLS Authentication</a></h4>
It is possible to enable mTLS authentication between the ZooKeeper servers themselves.
Please refer to the <a href="https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html#Quorum+TLS">ZooKeeper documentation</a> for more detail.
</script>
<div class="p-security"></div>

5
docs/toc.html

@ -127,8 +127,13 @@ @@ -127,8 +127,13 @@
<li><a href="#zk_authz">7.6 ZooKeeper Authentication</a></li>
<ul>
<li><a href="#zk_authz_new">New Clusters</a></li>
<ul>
<li><a href="#zk_authz_new_sasl">ZooKeeper SASL Authentication</a></li>
<li><a href="#zk_authz_new_mtls">ZooKeeper Mutual TLS Authentication</a></li>
</ul>
<li><a href="#zk_authz_migration">Migrating Clusters</a></li>
<li><a href="#zk_authz_ensemble">Migrating the ZooKeeper Ensemble</a></li>
<li><a href="#zk_authz_quorum">ZooKeeper Quorum Mutual TLS Authentication</a></li>
</ul>
</ul>
</li>

2
tests/kafkatest/services/kafka/config_property.py

@ -39,6 +39,8 @@ LOG_CLEANER_ENABLE = "log.cleaner.enable" @@ -39,6 +39,8 @@ LOG_CLEANER_ENABLE = "log.cleaner.enable"
AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
ZOOKEEPER_CONNECT = "zookeeper.connect"
ZOOKEEPER_SSL_CLIENT_ENABLE = "zookeeper.ssl.client.enable"
ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket"
ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
MESSAGE_FORMAT_VERSION = "log.message.format.version"

30
tests/kafkatest/services/kafka/kafka.py

@ -96,6 +96,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -96,6 +96,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
zk_client_secure=False,
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""):
"""
:param context: test context
@ -113,6 +114,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -113,6 +114,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param int zk_session_timeout:
:param dict server_prop_overides: overrides for kafka.properties file
:param zk_chroot:
:param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True
:param ListenerSecurityConfig listener_security_config: listener config to use
:param dict per_node_server_prop_overrides:
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
@ -139,6 +141,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -139,6 +141,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.per_node_server_prop_overrides = per_node_server_prop_overrides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
self.zk_client_secure = zk_client_secure
self.listener_security_config = listener_security_config
self.extra_kafka_opts = extra_kafka_opts
@ -206,7 +209,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -206,7 +209,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
@property
def security_config(self):
config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol,
zk_sasl=self.zk.zk_sasl,
zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
client_sasl_mechanism=self.client_sasl_mechanism,
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
listener_security_config=self.listener_security_config)
@ -221,7 +224,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -221,7 +224,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def close_port(self, listener_name):
self.port_mappings[listener_name].open = False
def start_minikdc(self, add_principals=""):
def start_minikdc_if_necessary(self, add_principals=""):
if self.security_config.has_sasl:
if self.minikdc is None:
self.minikdc = MiniKdc(self.context, self.nodes, extra_principals = add_principals)
@ -233,10 +236,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -233,10 +236,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return len(self.pids(node)) > 0
def start(self, add_principals=""):
if self.zk_client_secure and not self.zk.client_secure_port:
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
self.open_port(self.security_protocol)
self.interbroker_listener.open = True
self.start_minikdc(add_principals)
self.start_minikdc_if_necessary(add_principals)
self._ensure_zk_chroot()
Service.start(self)
@ -300,6 +305,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -300,6 +305,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
override_configs = KafkaConfig(**node.config)
override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
if self.zk_client_secure:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'true'
override_configs[config_property.ZOOKEEPER_CLIENT_CNXN_SOCKET] = 'org.apache.zookeeper.ClientCnxnSocketNetty'
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
for prop in self.server_prop_overides:
override_configs[prop[0]] = prop[1]
@ -643,14 +653,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -643,14 +653,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return missing
def restart_cluster(self, clean_shutdown=True):
def restart_cluster(self, clean_shutdown=True, timeout_sec=60, after_each_broker_restart=None, *args):
for node in self.nodes:
self.restart_node(node, clean_shutdown=clean_shutdown)
self.restart_node(node, clean_shutdown=clean_shutdown, timeout_sec=timeout_sec)
if after_each_broker_restart is not None:
after_each_broker_restart(*args)
def restart_node(self, node, clean_shutdown=True):
def restart_node(self, node, clean_shutdown=True, timeout_sec=60):
"""Restart the given node."""
self.stop_node(node, clean_shutdown)
self.start_node(node)
self.stop_node(node, clean_shutdown, timeout_sec)
self.start_node(node, timeout_sec)
def isr_idx_list(self, topic, partition=0):
""" Get in-sync replica list the given topic and partition.
@ -777,7 +789,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): @@ -777,7 +789,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return output
def zk_connect_setting(self):
return self.zk.connect_setting(self.zk_chroot)
return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:

12
tests/kafkatest/services/kafka/templates/kafka.properties

@ -47,12 +47,22 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }} @@ -47,12 +47,22 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
ssl.key.password=test-key-passwd
ssl.key.password=test-ks-passwd
ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
# Zookeeper TLS settings
#
# Note that zookeeper.ssl.client.enable will be set to true or false elsewhere, as appropriate.
# If it is false then these ZK keystore/truststore settings will have no effect. If it is true then
# zookeeper.clientCnxnSocket will also be set elsewhere (to org.apache.zookeeper.ClientCnxnSocketNetty)
zookeeper.ssl.keystore.location=/mnt/security/test.keystore.jks
zookeeper.ssl.keystore.password=test-ks-passwd
zookeeper.ssl.truststore.location=/mnt/security/test.truststore.jks
zookeeper.ssl.truststore.password=test-ts-passwd
#
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka

28
tests/kafkatest/services/security/security_config.py

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import subprocess
from tempfile import mkdtemp
@ -24,7 +25,8 @@ import itertools @@ -24,7 +25,8 @@ import itertools
class SslStores(object):
def __init__(self, local_scratch_dir):
def __init__(self, local_scratch_dir, logger=None):
self.logger = logger
self.ca_crt_path = os.path.join(local_scratch_dir, "test.ca.crt")
self.ca_jks_path = os.path.join(local_scratch_dir, "test.ca.jks")
self.ca_passwd = "test-ca-passwd"
@ -32,7 +34,8 @@ class SslStores(object): @@ -32,7 +34,8 @@ class SslStores(object):
self.truststore_path = os.path.join(local_scratch_dir, "test.truststore.jks")
self.truststore_passwd = "test-ts-passwd"
self.keystore_passwd = "test-ks-passwd"
self.key_passwd = "test-key-passwd"
# Zookeeper TLS (as of v3.5.6) does not support a key password different than the keystore password
self.key_passwd = self.keystore_passwd
# Allow up to one hour of clock skew between host and VMs
self.startdate = "-1H"
@@ -72,6 +75,17 @@ class SslStores(object):
self.runcmd("keytool -importcert -keystore %s -storepass %s -storetype JKS -alias ca -file %s -noprompt" % (ks_path, self.keystore_passwd, self.ca_crt_path))
self.runcmd("keytool -importcert -keystore %s -storepass %s -storetype JKS -keypass %s -alias kafka -file %s -noprompt" % (ks_path, self.keystore_passwd, self.key_passwd, crt_path))
node.account.copy_to(ks_path, SecurityConfig.KEYSTORE_PATH)
# also generate ZooKeeper client TLS config file for mutual authentication use case
str = """zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.client.enable=true
zookeeper.ssl.truststore.location=%s
zookeeper.ssl.truststore.password=%s
zookeeper.ssl.keystore.location=%s
zookeeper.ssl.keystore.password=%s
""" % (SecurityConfig.TRUSTSTORE_PATH, self.truststore_passwd, SecurityConfig.KEYSTORE_PATH, self.keystore_passwd)
node.account.create_file(SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH, str)
rmtree(ks_dir)
def hostname(self, node):
@@ -80,6 +94,8 @@ class SslStores(object):
return node.account.hostname
def runcmd(self, cmd):
if self.logger:
self.logger.log(logging.DEBUG, cmd)
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = proc.communicate()
@@ -104,6 +120,7 @@ class SecurityConfig(TemplateRenderer):
CONFIG_DIR = "/mnt/security"
KEYSTORE_PATH = "/mnt/security/test.keystore.jks"
TRUSTSTORE_PATH = "/mnt/security/test.truststore.jks"
ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH = "/mnt/security/zk_client_mutual_auth_config.properties"
JAAS_CONF_PATH = "/mnt/security/jaas.conf"
KRB5CONF_PATH = "/mnt/security/krb5.conf"
KEYTAB_PATH = "/mnt/security/keytab"
@@ -113,7 +130,7 @@ class SecurityConfig(TemplateRenderer):
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
listener_security_config=ListenerSecurityConfig()):
"""
Initialize the security properties for the node and copy
@@ -128,7 +145,7 @@ class SecurityConfig(TemplateRenderer):
# This generates keystore/trustore files in a local scratch directory which gets
# automatically destroyed after the test is run
# Creating within the scratch directory allows us to run tests in parallel without fear of collision
SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir)
SecurityConfig.ssl_stores = SslStores(context.local_scratch_dir, context.logger)
SecurityConfig.ssl_stores.generate_ca()
SecurityConfig.ssl_stores.generate_truststore()
@@ -143,8 +160,9 @@ class SecurityConfig(TemplateRenderer):
interbroker_security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol) or zk_tls
self.zk_sasl = zk_sasl
self.zk_tls = zk_tls
self.static_jaas_conf = static_jaas_conf
self.listener_security_config = listener_security_config
self.properties = {

13
tests/kafkatest/services/templates/zookeeper.properties

@@ -14,7 +14,20 @@
# limitations under the License.
dataDir=/mnt/zookeeper/data
{% if zk_client_port %}
clientPort=2181
{% endif %}
{% if zk_client_secure_port %}
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/mnt/security/test.keystore.jks
ssl.keyStore.password=test-ks-passwd
ssl.keyStore.type=JKS
ssl.trustStore.location=/mnt/security/test.truststore.jks
ssl.trustStore.password=test-ts-passwd
ssl.trustStore.type=JKS
{% endif %}
maxClientCnxns=0
initLimit=5
syncLimit=2

83
tests/kafkatest/services/zookeeper.py

@@ -44,21 +44,25 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
"collect_default": True}
}
def __init__(self, context, num_nodes, zk_sasl = False):
def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False):
"""
:type context
"""
self.kafka_opts = ""
self.zk_sasl = zk_sasl
if not zk_client_port and not zk_client_secure_port:
raise Exception("Cannot disable both ZK clientPort and clientSecurePort")
self.zk_client_port = zk_client_port
self.zk_client_secure_port = zk_client_secure_port
super(ZookeeperService, self).__init__(context, num_nodes)
@property
def security_config(self):
return SecurityConfig(self.context, zk_sasl=self.zk_sasl)
return SecurityConfig(self.context, zk_sasl=self.zk_sasl, zk_tls=self.zk_client_secure_port)
@property
def security_system_properties(self):
return "-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider " \
return "-Dzookeeper.authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider " \
"-DjaasLoginRenew=3600000 " \
"-Djava.security.auth.login.config=%s " \
"-Djava.security.krb5.conf=%s " % (self.security_config.JAAS_CONF_PATH, self.security_config.KRB5CONF_PATH)
@@ -67,6 +71,15 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
def zk_principals(self):
return " zkclient " + ' '.join(['zookeeper/' + zk_node.account.hostname for zk_node in self.nodes])
def restart_cluster(self):
for node in self.nodes:
self.restart_node(node)
def restart_node(self, node):
"""Restart the given node."""
self.stop_node(node)
self.start_node(node)
def start_node(self, node):
idx = self.idx(node)
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
@@ -92,9 +105,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
def listening(self, node):
try:
cmd = "nc -z %s %s" % (node.account.hostname, 2181)
port = 2181 if self.zk_client_port else 2182
cmd = "nc -z %s %s" % (node.account.hostname, port)
node.account.ssh_output(cmd, allow_fail=False)
self.logger.debug("Zookeeper started accepting connections at: '%s:%s')", node.account.hostname, 2181)
self.logger.debug("Zookeeper started accepting connections at: '%s:%s')", node.account.hostname, port)
return True
except (RemoteCommandError, ValueError) as e:
return False
@@ -124,20 +138,27 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, allow_fail=False)
def connect_setting(self, chroot=None):
# force_tls is a necessary option for the case where we define both encrypted and non-encrypted ports
def connect_setting(self, chroot=None, force_tls=False):
if chroot and not chroot.startswith("/"):
raise Exception("ZK chroot must start with '/', invalid chroot: %s" % chroot)
chroot = '' if chroot is None else chroot
return ','.join([node.account.hostname + ':2181' + chroot for node in self.nodes])
return ','.join([node.account.hostname + (':2182' if not self.zk_client_port or force_tls else ':2181') + chroot
for node in self.nodes])
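A quick sketch of what connect_setting now returns, assuming a hypothetical three-node ensemble (zk1..zk3) with both the plaintext and TLS ports enabled:
    # connect_setting()                 -> "zk1:2181,zk2:2181,zk3:2181"
    # connect_setting(force_tls=True)   -> "zk1:2182,zk2:2182,zk3:2182"
    # connect_setting(chroot="/kafka")  -> "zk1:2181/kafka,zk2:2181/kafka,zk3:2181/kafka"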
#
# This call is used to simulate a rolling upgrade to enable/disable
# the use of ZooKeeper ACLs.
#
def zookeeper_migration(self, node, zk_acl):
la_migra_cmd = "%s --zookeeper.acl=%s --zookeeper.connect=%s" % \
(self.path.script("zookeeper-security-migration.sh", node), zk_acl, self.connect_setting())
la_migra_cmd = "export KAFKA_OPTS=\"%s\";" % \
self.security_system_properties if self.security_config.zk_sasl else ""
la_migra_cmd += "%s --zookeeper.acl=%s --zookeeper.connect=%s %s" % \
(self.path.script("zookeeper-security-migration.sh", node), zk_acl,
self.connect_setting(force_tls=self.zk_client_secure_port),
"--zk-tls-config-file=" + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH \
if self.zk_client_secure_port else "")
node.account.ssh(la_migra_cmd)
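A minimal usage sketch (zk assumed to be a running ZookeeperService), matching how the upgrade tests below drive the migration tool:
    zk.zookeeper_migration(zk.nodes[0], "secure")    # start enforcing ZooKeeper ACLs
    zk.zookeeper_migration(zk.nodes[0], "unsecure")  # revert to open ACLs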
def _check_chroot(self, chroot):
@@ -153,8 +174,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
chroot_path = ('' if chroot is None else chroot) + path
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s %s -server %s get %s" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
cmd = "%s %s -server %s %s get %s" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
"-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
chroot_path)
self.logger.debug(cmd)
node = self.nodes[0]
@@ -167,7 +190,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
result = match.groups()[0]
return result
def create(self, path, chroot=None):
def create(self, path, chroot=None, value=""):
"""
Create a znode at the given path
"""
@@ -176,8 +199,38 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
chroot_path = ('' if chroot is None else chroot) + path
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s %s -server %s create %s ''" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(), chroot_path)
cmd = "%s %s -server %s %s create %s '%s'" % \
(kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
"-zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
chroot_path, value)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
self.logger.debug(output)
def describe(self, topic):
"""
Describe the given topic using the ConfigCommand CLI
"""
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s kafka.admin.ConfigCommand --zookeeper %s %s --describe --topic %s" % \
(kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
"--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
topic)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
self.logger.debug(output)
def list_acls(self, topic):
"""
List ACLs for the given topic using the AclCommand CLI
"""
kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
cmd = "%s kafka.admin.AclCommand --authorizer-properties zookeeper.connect=%s %s --list --topic %s" % \
(kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
"--zk-tls-config-file " + SecurityConfig.ZK_CLIENT_MUTUAL_AUTH_CONFIG_PATH if self.zk_client_secure_port else "",
topic)
self.logger.debug(cmd)
output = self.nodes[0].account.ssh_output(cmd)
self.logger.debug(output)
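For orientation, with zk_client_secure_port enabled the assembled AclCommand invocation should look roughly like the following (hostname and topic illustrative; the TLS config path is the constant defined in security_config.py above):
    # kafka-run-class.sh kafka.admin.AclCommand \
    #     --authorizer-properties zookeeper.connect=zk1:2182 \
    #     --zk-tls-config-file /mnt/security/zk_client_mutual_auth_config.properties \
    #     --list --topic test_topic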
@@ -188,4 +241,4 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
def java_cli_class_name(self):
""" The class name of the Zookeeper tool within Kafka. """
return "org.apache.zookeeper.ZooKeeperMain"
return "org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka"

11
tests/kafkatest/tests/core/security_rolling_upgrade_test.py

@@ -58,11 +58,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.consumer.group_id = "group"
def bounce(self):
self.kafka.start_minikdc()
for node in self.kafka.nodes:
self.kafka.stop_node(node)
self.kafka.start_node(node)
time.sleep(10)
self.kafka.start_minikdc_if_necessary()
self.kafka.restart_cluster(after_each_broker_restart = lambda: time.sleep(10))
def roll_in_secured_settings(self, client_protocol, broker_protocol):
# Roll cluster to include inter broker security protocol.
@@ -82,12 +79,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
def open_secured_port(self, client_protocol):
self.kafka.security_protocol = client_protocol
self.kafka.open_port(client_protocol)
self.kafka.start_minikdc()
self.kafka.start_minikdc_if_necessary()
self.bounce()
def add_sasl_mechanism(self, new_client_sasl_mechanism):
self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
self.kafka.start_minikdc()
self.kafka.start_minikdc_if_necessary()
self.bounce()
def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):

15
tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py

@@ -69,19 +69,14 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
def run_zk_migration(self):
# change zk config (auth provider + jaas login)
self.zk.kafka_opts = self.zk.security_system_properties
self.zk.zk_sasl = True
if self.no_sasl:
self.kafka.start_minikdc(self.zk.zk_principals)
self.kafka.start_minikdc_if_necessary(self.zk.zk_principals)
# restart zk
for node in self.zk.nodes:
self.zk.stop_node(node)
self.zk.start_node(node)
self.zk.restart_cluster()
# restart broker with jaas login
for node in self.kafka.nodes:
self.kafka.stop_node(node)
self.kafka.start_node(node)
self.kafka.restart_cluster()
# run migration tool
for node in self.zk.nodes:
@@ -89,9 +84,7 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
# restart broker with zookeeper.set.acl=true and acls
self.kafka.zk_set_acl = True
for node in self.kafka.nodes:
self.kafka.stop_node(node)
self.kafka.start_node(node)
self.kafka.restart_cluster()
@cluster(num_nodes=9)
@matrix(security_protocol=["PLAINTEXT", "SSL", "SASL_SSL", "SASL_PLAINTEXT"])

156
tests/kafkatest/tests/core/zookeeper_tls_test.py

@@ -0,0 +1,156 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import matrix, ignore
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
import logging
class ZookeeperTlsTest(ProduceConsumeValidateTest):
"""Tests TLS connectivity to zookeeper.
"""
def __init__(self, test_context):
super(ZookeeperTlsTest, self).__init__(test_context=test_context)
def setUp(self):
self.topic = "test_topic"
self.group = "group"
self.producer_throughput = 100
self.num_producers = 1
self.num_consumers = 1
self.zk = ZookeeperService(self.test_context, num_nodes=3)
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}})
def create_producer_and_consumer(self):
self.producer = VerifiableProducer(
self.test_context, self.num_producers, self.kafka, self.topic,
throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(
self.test_context, self.num_consumers, self.kafka, self.topic,
consumer_timeout_ms=60000, message_validator=is_int)
self.consumer.group_id = self.group
def perform_produce_consume_validation(self):
self.create_producer_and_consumer()
self.run_produce_consume_validate()
self.producer.free()
self.consumer.free()
def enable_zk_tls(self):
self.test_context.logger.debug("Enabling the TLS port in Zookeeper (we won't use it from Kafka yet)")
# change zk config (enable TLS, but also keep non-TLS)
self.zk.zk_client_secure_port = True
self.zk.restart_cluster()
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
self.kafka.stop_node(self.kafka.nodes[0])
self.kafka.start_node(self.kafka.nodes[0])
def enable_kafka_zk_tls(self):
self.test_context.logger.debug("Configuring Kafka to use the TLS port in Zookeeper")
# change Kafka config (enable TLS to Zookeeper) and restart the Kafka cluster
self.kafka.zk_client_secure = True
self.kafka.restart_cluster()
def disable_zk_non_tls(self):
self.test_context.logger.debug("Disabling the non-TLS port in Zookeeper (as a simple sanity check)")
# change zk config (disable non-TLS, keep TLS) and restart the ZooKeeper cluster
self.zk.zk_client_port = False
self.zk.restart_cluster()
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
self.kafka.stop_node(self.kafka.nodes[0])
self.kafka.start_node(self.kafka.nodes[0])
@cluster(num_nodes=9)
def test_zk_tls(self):
self.zk.start()
self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
self.kafka.start()
# Enable TLS port in Zookeeper in addition to the regular non-TLS port
# Bounces the ZooKeeper cluster (and a single broker as a sanity check)
self.enable_zk_tls()
# Leverage ZooKeeper TLS port in Kafka
# Bounces the Kafka cluster
self.enable_kafka_zk_tls()
self.perform_produce_consume_validation()
# Disable ZooKeeper non-TLS port to make sure we aren't using it
# Bounces the ZooKeeper cluster (and a single broker as a sanity check)
self.disable_zk_non_tls()
# Make sure the ZooKeeper command line is able to talk to a TLS-enabled ZooKeeper quorum
# Test both create() and query(), each of which leverages the ZooKeeper command line
# This tests the code in org.apache.zookeeper.ZooKeeperMainWithTlsSupportForKafka
path="/foo"
value="{\"bar\": 0}"
self.zk.create(path, value=value)
if self.zk.query(path) != value:
raise Exception("Error creating and then querying a znode using the CLI with a TLS-enabled ZooKeeper quorum")
# Make sure the ConfigCommand CLI is able to talk to a TLS-enabled ZooKeeper quorum
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
self.zk.describe(self.topic)
# Make sure the AclCommand CLI is able to talk to a TLS-enabled ZooKeeper quorum
# This is necessary for the bootstrap use case despite direct ZooKeeper connectivity being deprecated
self.zk.list_acls(self.topic)
#
# Test zookeeper.set.acl with just TLS mutual authentication (no SASL)
#
# Step 1: run migration tool
self.zk.zookeeper_migration(self.zk.nodes[0], "secure")
# Step 2: restart brokers with zookeeper.set.acl=true and acls (with TLS but no SASL)
self.kafka.zk_set_acl = True
self.kafka.restart_cluster()
self.perform_produce_consume_validation()
#
# Test zookeeper.set.acl with both SASL and TLS mutual authentication
#
# Step 1: remove ACLs created previously
self.kafka.zk_set_acl = False
self.kafka.restart_cluster()
self.zk.zookeeper_migration(self.zk.nodes[0], "unsecure")
# Step 2: enable ZooKeeper SASL authentication, but don't take advantage of it in Kafka yet
self.zk.zk_sasl = True
self.kafka.start_minikdc_if_necessary(self.zk.zk_principals)
self.zk.restart_cluster()
# bounce a Kafka broker -- allows us to detect a broker restart failure as a simple sanity check
self.kafka.stop_node(self.kafka.nodes[0])
self.kafka.start_node(self.kafka.nodes[0])
# Step 3: run migration tool
self.zk.zookeeper_migration(self.zk.nodes[0], "secure")
# Step 4: restart brokers with zookeeper.set.acl=true and acls (with both TLS and SASL)
self.kafka.zk_set_acl = True
self.kafka.restart_cluster()
self.perform_produce_consume_validation()