diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 7ec572caa66..892377cfe92 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 import kafka.network.RequestChannel.Session -import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} +import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} import scala.collection.JavaConverters._ -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -267,12 +267,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { ZkAclStore.stores.foreach(store => { val resourceTypes = zkClient.getResourceTypes(store.patternType) for (rType <- resourceTypes) { - val resourceType = ResourceType.fromString(rType) - val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) - for (resourceName <- resourceNames) { - val resource = new Resource(resourceType, resourceName, store.patternType) - val versionedAcls = getAclsFromZk(resource) - updateCache(resource, versionedAcls) + val resourceType = Try(ResourceType.fromString(rType)) + resourceType match { + case Success(resourceTypeObj) => { + val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) + for (resourceName <- resourceNames) { + val resource = new Resource(resourceTypeObj, resourceName, store.patternType) + val versionedAcls = getAclsFromZk(resource) + updateCache(resource, versionedAcls) + } + } + case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } })