@ -17,7 +17,6 @@
@@ -17,7 +17,6 @@
package kafka.utils
import java.util.Collections
import java.util.concurrent.CountDownLatch
import kafka.admin._
@ -42,6 +41,7 @@ import scala.collection.JavaConverters._
@@ -42,6 +41,7 @@ import scala.collection.JavaConverters._
object ZkUtils {
private val UseDefaultAcls = new java . util . ArrayList [ ACL ]
// Important : it is necessary to add any new top level Zookeeper path here
val AdminPath = "/admin"
@ -107,7 +107,7 @@ object ZkUtils {
@@ -107,7 +107,7 @@ object ZkUtils {
}
def sensitivePath ( path : String ) : Boolean = {
path != null && ! SensitiveZkRootPaths . forall ( ! path . startsWith ( _ ) )
path != null && SensitiveZkRootPaths . exists ( path . startsWith ( _ ) )
}
@deprecated ( "This is deprecated, use defaultAcls(isSecure, path) which doesn't make sensitive data world readable" , since = "0.10.2.1" )
@ -235,10 +235,11 @@ class ZkUtils(val zkClient: ZkClient,
@@ -235,10 +235,11 @@ class ZkUtils(val zkClient: ZkClient,
IsrChangeNotificationPath ,
PidBlockPath )
import ZkUtils._
@deprecated ( "This is deprecated, use defaultAcls(path) which doesn't make sensitive data world readable" , since = "0.10.2.1" )
val DefaultAcls : java.util.List [ ACL ] = ZkUtils . defaultAcls ( isSecure , "" )
private val useDefaultAcl = Collections . emptyList [ ACL ]
def defaultAcls ( path : String ) : java.util.List [ ACL ] = ZkUtils . defaultAcls ( isSecure , path )
def getController ( ) : Int = {
@ -432,11 +433,11 @@ class ZkUtils(val zkClient: ZkClient,
@@ -432,11 +433,11 @@ class ZkUtils(val zkClient: ZkClient,
/* *
* make sure a persistent path exists in ZK . Create the path if not exist .
*/
def makeSurePersistentPathExists ( path : String , acls : java.util.List [ ACL ] = useDefaultAcl ) {
def makeSurePersistentPathExists ( path : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) {
// Consumer path is kept open as different consumers will write under this node .
val acl = if ( path == null || path . isEmpty || path . equals ( ConsumersPath ) ) {
ZooDefs . Ids . OPEN_ACL_UNSAFE
} else if ( acls == useDefaultAcl ) {
} else if ( acls eq UseDefaultAcls ) {
ZkUtils . defaultAcls ( isSecure , path )
} else {
acls
@ -449,8 +450,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -449,8 +450,8 @@ class ZkUtils(val zkClient: ZkClient,
/* *
* create the parent path
*/
private def createParentPath ( path : String , acls : java.util.List [ ACL ] = useDefaultAcl ) : Unit = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
private def createParentPath ( path : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Unit = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
val parentDir = path . substring ( 0 , path . lastIndexOf ( '/' ) )
if ( parentDir . length != 0 ) {
ZkPath . createPersistent ( zkClient , parentDir , createParents = true , acl )
@ -460,8 +461,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -460,8 +461,8 @@ class ZkUtils(val zkClient: ZkClient,
/* *
* Create an ephemeral node with the given path and data . Create parents if necessary .
*/
private def createEphemeralPath ( path : String , data : String , acls : java.util.List [ ACL ] = useDefaultAcl ) : Unit = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
private def createEphemeralPath ( path : String , data : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Unit = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
try {
ZkPath . createEphemeral ( zkClient , path , data , acl )
} catch {
@ -475,8 +476,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -475,8 +476,8 @@ class ZkUtils(val zkClient: ZkClient,
* Create an ephemeral node with the given path and data .
* Throw NodeExistException if node already exists .
*/
def createEphemeralPathExpectConflict ( path : String , data : String , acls : java.util.List [ ACL ] = useDefaultAcl ) : Unit = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def createEphemeralPathExpectConflict ( path : String , data : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Unit = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
try {
createEphemeralPath ( path , data , acl )
} catch {
@ -501,8 +502,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -501,8 +502,8 @@ class ZkUtils(val zkClient: ZkClient,
/* *
* Create a persistent node with the given path and data . Create parents if necessary .
*/
def createPersistentPath ( path : String , data : String = "" , acls : java.util.List [ ACL ] = useDefaultAcl ) : Unit = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def createPersistentPath ( path : String , data : String = "" , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Unit = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
try {
ZkPath . createPersistent ( zkClient , path , data , acl )
} catch {
@ -512,8 +513,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -512,8 +513,8 @@ class ZkUtils(val zkClient: ZkClient,
}
}
def createSequentialPersistentPath ( path : String , data : String = "" , acls : java.util.List [ ACL ] = useDefaultAcl ) : String = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def createSequentialPersistentPath ( path : String , data : String = "" , acls : java.util.List [ ACL ] = UseDefaultAcls ) : String = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
ZkPath . createPersistentSequential ( zkClient , path , data , acl )
}
@ -522,8 +523,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -522,8 +523,8 @@ class ZkUtils(val zkClient: ZkClient,
* create parent directory if necessary . Never throw NodeExistException .
* Return the updated path zkVersion
*/
def updatePersistentPath ( path : String , data : String , acls : java.util.List [ ACL ] = useDefaultAcl ) = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def updatePersistentPath ( path : String , data : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
try {
zkClient . writeData ( path , data )
} catch {
@ -593,8 +594,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -593,8 +594,8 @@ class ZkUtils(val zkClient: ZkClient,
* Update the value of a persistent node with the given path and data .
* create parent directory if necessary . Never throw NodeExistException .
*/
def updateEphemeralPath ( path : String , data : String , acls : java.util.List [ ACL ] = useDefaultAcl ) : Unit = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def updateEphemeralPath ( path : String , data : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Unit = {
val acl = if ( acls eq UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
try {
zkClient . writeData ( path , data )
} catch {
@ -871,8 +872,8 @@ class ZkUtils(val zkClient: ZkClient,
@@ -871,8 +872,8 @@ class ZkUtils(val zkClient: ZkClient,
* It uses the stat returned by the zookeeper and return the version . Every time
* client updates the path stat . version gets incremented . Starting value of sequence number is 1.
*/
def getSequenceId ( path : String , acls : java.util.List [ ACL ] = useDefaultAcl ) : Int = {
val acl = if ( acls == useDefaultAcl ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def getSequenceId ( path : String , acls : java.util.List [ ACL ] = UseDefaultAcls ) : Int = {
val acl = if ( acls == UseDefaultAcls ) ZkUtils . defaultAcls ( isSecure , path ) else acls
def writeToZk : Int = zkClient . writeDataReturnStat ( path , "" , - 1 ) . getVersion
try {
writeToZk