From 72df28fe8c0ca4c8e552c8cfb3f907b9969f47d2 Mon Sep 17 00:00:00 2001 From: huxihx Date: Tue, 10 Dec 2019 13:52:54 +0530 Subject: [PATCH] KAFKA-9025: Add a option for path existence check in ZkSecurityMigrator https://issues.apache.org/jira/browse/KAFKA-9025 If a chroot is configured, ZkSecurityMigrator should prompt a confirm to user to ensure whether chroot is specified correctly. Author: huxihx Author: huxi Reviewers: Manikumar Reddy Closes #7618 from huxihx/KAFKA-9025 --- .../kafka/admin/ZkSecurityMigrator.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 81fb8997087..b4096dfaed9 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -17,7 +17,7 @@ package kafka.admin -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging} +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging} import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Time @@ -95,8 +95,9 @@ object ZkSecurityMigrator extends Logging { val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, Time.SYSTEM) + val enablePathCheck = opts.options.has(opts.enablePathCheckOpt) val migrator = new ZkSecurityMigrator(zkClient) - migrator.run() + migrator.run(enablePathCheck) } def main(args: Array[String]): Unit = { @@ -118,6 +119,8 @@ object ZkSecurityMigrator extends Logging { withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout."). withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000) + val enablePathCheckOpt = parser.accepts("enable.path.check", "Checks if all the root paths exist in ZooKeeper " + + "before migration. If not, exit the command.") options = parser.parse(args : _*) } } @@ -218,9 +221,10 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { } } - private def run(): Unit = { + private def run(enablePathCheck: Boolean): Unit = { try { setAclIndividually("/") + checkPathExistenceAndMaybeExit(enablePathCheck) for (path <- ZkData.SecureRootPaths) { debug("Going to set ACL for %s".format(path)) if (path == ControllerZNode.path && !zkClient.pathExists(path)) { @@ -250,4 +254,16 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging { zkClient.close } } + + private def checkPathExistenceAndMaybeExit(enablePathCheck: Boolean): Unit = { + val nonExistingSecureRootPaths = ZkData.SecureRootPaths.filterNot(zkClient.pathExists) + if (nonExistingSecureRootPaths.nonEmpty) { + println(s"Warning: The following secure root paths do not exist in ZooKeeper: ${nonExistingSecureRootPaths.mkString(",")}") + println("That might be due to an incorrect chroot is specified when executing the command.") + if (enablePathCheck) { + println("Exit the command.") + Exit.exit(0) + } + } + } }