@ -17,7 +17,7 @@
package kafka.admin
package kafka.admin
import kafka.utils. { CommandDefaultOptions , CommandLineUtils , Logging }
import kafka.utils. { CommandDefaultOptions , CommandLineUtils , Exit , Logging }
import kafka.zk. { ControllerZNode , KafkaZkClient , ZkData , ZkSecurityMigratorUtils }
import kafka.zk. { ControllerZNode , KafkaZkClient , ZkData , ZkSecurityMigratorUtils }
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Time
@ -95,8 +95,9 @@ object ZkSecurityMigrator extends Logging {
val zkConnectionTimeout = opts . options . valueOf ( opts . zkConnectionTimeoutOpt ) . intValue
val zkConnectionTimeout = opts . options . valueOf ( opts . zkConnectionTimeoutOpt ) . intValue
val zkClient = KafkaZkClient ( zkUrl , zkAcl , zkSessionTimeout , zkConnectionTimeout ,
val zkClient = KafkaZkClient ( zkUrl , zkAcl , zkSessionTimeout , zkConnectionTimeout ,
Int . MaxValue , Time . SYSTEM )
Int . MaxValue , Time . SYSTEM )
val enablePathCheck = opts . options . has ( opts . enablePathCheckOpt )
val migrator = new ZkSecurityMigrator ( zkClient )
val migrator = new ZkSecurityMigrator ( zkClient )
migrator . run ( )
migrator . run ( enablePathCheck )
}
}
def main ( args : Array [ String ] ) : Unit = {
def main ( args : Array [ String ] ) : Unit = {
@ -118,6 +119,8 @@ object ZkSecurityMigrator extends Logging {
withRequiredArg ( ) . ofType ( classOf [ java . lang . Integer ] ) . defaultsTo ( 30000 )
withRequiredArg ( ) . ofType ( classOf [ java . lang . Integer ] ) . defaultsTo ( 30000 )
val zkConnectionTimeoutOpt = parser . accepts ( "zookeeper.connection.timeout" , "Sets the ZooKeeper connection timeout." ) .
val zkConnectionTimeoutOpt = parser . accepts ( "zookeeper.connection.timeout" , "Sets the ZooKeeper connection timeout." ) .
withRequiredArg ( ) . ofType ( classOf [ java . lang . Integer ] ) . defaultsTo ( 30000 )
withRequiredArg ( ) . ofType ( classOf [ java . lang . Integer ] ) . defaultsTo ( 30000 )
val enablePathCheckOpt = parser . accepts ( "enable.path.check" , "Checks if all the root paths exist in ZooKeeper " +
"before migration. If not, exit the command." )
options = parser . parse ( args : _ * )
options = parser . parse ( args : _ * )
}
}
}
}
@ -218,9 +221,10 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
}
}
}
}
private def run ( ) : Unit = {
private def run ( enablePathCheck : Boolean ) : Unit = {
try {
try {
setAclIndividually ( "/" )
setAclIndividually ( "/" )
checkPathExistenceAndMaybeExit ( enablePathCheck )
for ( path <- ZkData . SecureRootPaths ) {
for ( path <- ZkData . SecureRootPaths ) {
debug ( "Going to set ACL for %s" . format ( path ) )
debug ( "Going to set ACL for %s" . format ( path ) )
if ( path == ControllerZNode . path && ! zkClient . pathExists ( path ) ) {
if ( path == ControllerZNode . path && ! zkClient . pathExists ( path ) ) {
@ -250,4 +254,16 @@ class ZkSecurityMigrator(zkClient: KafkaZkClient) extends Logging {
zkClient . close
zkClient . close
}
}
}
}
private def checkPathExistenceAndMaybeExit ( enablePathCheck : Boolean ) : Unit = {
val nonExistingSecureRootPaths = ZkData . SecureRootPaths . filterNot ( zkClient . pathExists )
if ( nonExistingSecureRootPaths . nonEmpty ) {
println ( s" Warning: The following secure root paths do not exist in ZooKeeper: ${ nonExistingSecureRootPaths . mkString ( "," ) } " )
println ( "That might be due to an incorrect chroot is specified when executing the command." )
if ( enablePathCheck ) {
println ( "Exit the command." )
Exit . exit ( 0 )
}
}
}
}
}