|
|
|
@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
|
|
|
|
|
import com.typesafe.scalalogging.Logger |
|
|
|
|
import kafka.api.KAFKA_2_0_IV1 |
|
|
|
|
import kafka.network.RequestChannel.Session |
|
|
|
|
import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} |
|
|
|
|
import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} |
|
|
|
|
import kafka.server.KafkaConfig |
|
|
|
|
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} |
|
|
|
|
import kafka.utils._ |
|
|
|
@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
|
|
|
|
|
import org.apache.kafka.common.utils.{SecurityUtils, Time} |
|
|
|
|
|
|
|
|
|
import scala.collection.JavaConverters._ |
|
|
|
|
import scala.util.Random |
|
|
|
|
import scala.util.{Failure, Random, Success, Try} |
|
|
|
|
|
|
|
|
|
object SimpleAclAuthorizer { |
|
|
|
|
//optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in |
|
|
|
@ -267,12 +267,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
@@ -267,12 +267,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
|
|
|
|
|
ZkAclStore.stores.foreach(store => { |
|
|
|
|
val resourceTypes = zkClient.getResourceTypes(store.patternType) |
|
|
|
|
for (rType <- resourceTypes) { |
|
|
|
|
val resourceType = ResourceType.fromString(rType) |
|
|
|
|
val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) |
|
|
|
|
for (resourceName <- resourceNames) { |
|
|
|
|
val resource = new Resource(resourceType, resourceName, store.patternType) |
|
|
|
|
val versionedAcls = getAclsFromZk(resource) |
|
|
|
|
updateCache(resource, versionedAcls) |
|
|
|
|
val resourceType = Try(ResourceType.fromString(rType)) |
|
|
|
|
resourceType match { |
|
|
|
|
case Success(resourceTypeObj) => { |
|
|
|
|
val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) |
|
|
|
|
for (resourceName <- resourceNames) { |
|
|
|
|
val resource = new Resource(resourceTypeObj, resourceName, store.patternType) |
|
|
|
|
val versionedAcls = getAclsFromZk(resource) |
|
|
|
|
updateCache(resource, versionedAcls) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|