diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 1e9cc6e9d62..97a59951750 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,7 +17,6 @@ package kafka.utils -import java.util.Collections import java.util.concurrent.CountDownLatch import kafka.admin._ @@ -42,6 +41,7 @@ import scala.collection.JavaConverters._ object ZkUtils { + private val UseDefaultAcls = new java.util.ArrayList[ACL] // Important: it is necessary to add any new top level Zookeeper path here val AdminPath = "/admin" @@ -107,7 +107,7 @@ object ZkUtils { } def sensitivePath(path: String): Boolean = { - path != null && !SensitiveZkRootPaths.forall(!path.startsWith(_)) + path != null && SensitiveZkRootPaths.exists(path.startsWith(_)) } @deprecated("This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable", since = "0.10.2.1") @@ -235,10 +235,11 @@ class ZkUtils(val zkClient: ZkClient, IsrChangeNotificationPath, PidBlockPath) + import ZkUtils._ + @deprecated("This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable", since = "0.10.2.1") val DefaultAcls: java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, "") - private val useDefaultAcl = Collections.emptyList[ACL] def defaultAcls(path: String): java.util.List[ACL] = ZkUtils.defaultAcls(isSecure, path) def getController(): Int = { @@ -432,11 +433,11 @@ class ZkUtils(val zkClient: ZkClient, /** * make sure a persistent path exists in ZK. Create the path if not exist. */ - def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = useDefaultAcl) { + def makeSurePersistentPathExists(path: String, acls: java.util.List[ACL] = UseDefaultAcls) { //Consumer path is kept open as different consumers will write under this node. val acl = if (path == null || path.isEmpty || path.equals(ConsumersPath)) { ZooDefs.Ids.OPEN_ACL_UNSAFE - } else if (acls == useDefaultAcl) { + } else if (acls eq UseDefaultAcls) { ZkUtils.defaultAcls(isSecure, path) } else { acls @@ -449,8 +450,8 @@ class ZkUtils(val zkClient: ZkClient, /** * create the parent path */ - private def createParentPath(path: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + private def createParentPath(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls val parentDir = path.substring(0, path.lastIndexOf('/')) if (parentDir.length != 0) { ZkPath.createPersistent(zkClient, parentDir, createParents = true, acl) @@ -460,8 +461,8 @@ class ZkUtils(val zkClient: ZkClient, /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ - private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { ZkPath.createEphemeral(zkClient, path, data, acl) } catch { @@ -475,8 +476,8 @@ class ZkUtils(val zkClient: ZkClient, * Create an ephemeral node with the given path and data. * Throw NodeExistException if node already exists. */ - def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def createEphemeralPathExpectConflict(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { createEphemeralPath(path, data, acl) } catch { @@ -501,8 +502,8 @@ class ZkUtils(val zkClient: ZkClient, /** * Create a persistent node with the given path and data. Create parents if necessary. */ - def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): Unit = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def createPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { ZkPath.createPersistent(zkClient, path, data, acl) } catch { @@ -512,8 +513,8 @@ class ZkUtils(val zkClient: ZkClient, } } - def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = useDefaultAcl): String = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls ZkPath.createPersistentSequential(zkClient, path, data, acl) } @@ -522,8 +523,8 @@ class ZkUtils(val zkClient: ZkClient, * create parent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl) = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls) = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { zkClient.writeData(path, data) } catch { @@ -593,8 +594,8 @@ class ZkUtils(val zkClient: ZkClient, * Update the value of a persistent node with the given path and data. * create parent directory if necessary. Never throw NodeExistException. */ - def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = useDefaultAcl): Unit = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { zkClient.writeData(path, data) } catch { @@ -871,8 +872,8 @@ class ZkUtils(val zkClient: ZkClient, * It uses the stat returned by the zookeeper and return the version. Every time * client updates the path stat.version gets incremented. Starting value of sequence number is 1. */ - def getSequenceId(path: String, acls: java.util.List[ACL] = useDefaultAcl): Int = { - val acl = if (acls == useDefaultAcl) ZkUtils.defaultAcls(isSecure, path) else acls + def getSequenceId(path: String, acls: java.util.List[ACL] = UseDefaultAcls): Int = { + val acl = if (acls == UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion try { writeToZk