Browse Source

KAFKA-10109: Fix double AdminClient creation in AclCommand

Author: Tom Bentley <tbentley@redhat.com>

Reviewers: David Jacot <djacot@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #8808 from tombentley/KAFKA-10109-AclComment-multiple-AdminClients
pull/8703/head
Tom Bentley 4 years ago committed by Manikumar Reddy
parent
commit
f5e970df1d
  1. 40
      core/src/main/scala/kafka/admin/AclCommand.scala
  2. 92
      core/src/test/scala/unit/kafka/admin/AclCommandTest.scala

40
core/src/main/scala/kafka/admin/AclCommand.scala

@ -112,7 +112,7 @@ object AclCommand extends Logging { @@ -112,7 +112,7 @@ object AclCommand extends Logging {
adminClient.createAcls(aclBindings).all().get()
}
listAcls()
listAcls(adminClient)
}
}
@ -130,33 +130,39 @@ object AclCommand extends Logging { @@ -130,33 +130,39 @@ object AclCommand extends Logging {
}
}
listAcls()
listAcls(adminClient)
}
}
def listAcls(): Unit = {
withAdminClient(opts) { adminClient =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(adminClient, filters)
listAcls(adminClient)
}
}
if (listPrincipals.isEmpty) {
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }
private def listAcls(adminClient: Admin): Unit = {
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(adminClient, filters)
for ((resource, acls) <- filteredResourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
})
if (listPrincipals.isEmpty) {
printResourceAcls(resourceToAcls)
} else {
listPrincipals.foreach{principal =>
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }
printResourceAcls(filteredResourceToAcls)
}
}
}
private def printResourceAcls(resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]]): Unit = {
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
}
private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
if (acls.isEmpty)
adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()

92
core/src/test/scala/unit/kafka/admin/AclCommandTest.scala

@ -16,12 +16,14 @@ @@ -16,12 +16,14 @@
*/
package kafka.admin
import java.io.{File, PrintWriter}
import java.util.Properties
import javax.management.InstanceAlreadyExistsException
import kafka.admin.AclCommand.AclCommandOptions
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{Exit, Logging, TestUtils}
import kafka.utils.{Exit, LogCaptureAppender, Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.acl.AclOperation._
@ -31,9 +33,11 @@ import org.apache.kafka.common.resource.ResourceType._ @@ -31,9 +33,11 @@ import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.utils.{AppInfoParser, SecurityUtils}
import org.apache.kafka.server.authorizer.Authorizer
import org.junit.{After, Before, Test}
import org.apache.log4j.Level
import org.junit.Assert.assertFalse
import org.junit.{After, Assert, Before, Test}
import org.scalatest.Assertions.intercept
class AclCommandTest extends ZooKeeperTestHarness with Logging {
@ -127,10 +131,19 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -127,10 +131,19 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
testAclCli(adminArgs)
}
private def createServer(): Unit = {
private def createServer(commandConfig: Option[File] = None): Unit = {
servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
var adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
if (commandConfig.isDefined) {
adminArgs ++= Array("--command-config", commandConfig.get.getAbsolutePath)
}
this.adminArgs = adminArgs
}
private def callMain(args: Array[String]): (String, String) = {
TestUtils.grabConsoleOutputAndError(AclCommand.main(args))
}
private def testAclCli(cmdArgs: Array[String]): Unit = {
@ -138,14 +151,33 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -138,14 +151,33 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
for (permissionType <- Set(ALLOW, DENY)) {
val operationToCmd = ResourceToOperations(resources)
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
for (resource <- resources) {
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
}
val (addOut, addErr) = callMain(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
assertOutputContains("Adding ACLs", resources, resourceCmd, addOut)
assertOutputContains("Current ACLs", resources, resourceCmd, addOut)
Assert.assertEquals("", addErr)
for (resource <- resources) {
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
}
}
testRemove(cmdArgs, resources, resourceCmd)
val (listOut, listErr) = callMain(cmdArgs :+ "--list")
assertOutputContains("Current ACLs", resources, resourceCmd, listOut)
Assert.assertEquals("", listErr)
testRemove(cmdArgs, resources, resourceCmd)
}
}
}
private def assertOutputContains(prefix: String, resources: Set[ResourcePattern], resourceCmd: Array[String], output: String): Unit = {
resources.foreach { resource =>
val resourceType = resource.resourceType.toString
(if (resource == ClusterResource) Array("kafka-cluster") else resourceCmd.filter(!_.startsWith("--"))).foreach { name =>
val expected = s"$prefix for resource `ResourcePattern(resourceType=$resourceType, name=$name, patternType=LITERAL)`:"
Assert.assertTrue(s"Substring ${expected} not in output:\n$output",
output.contains(expected))
}
}
}
@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -161,10 +193,34 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
testProducerConsumerCli(adminArgs)
}
@Test
def testAclCliWithClientId(): Unit = {
val adminClientConfig = TestUtils.tempFile()
val pw = new PrintWriter(adminClientConfig)
pw.println("client.id=my-client")
pw.close()
createServer(Some(adminClientConfig))
val appender = LogCaptureAppender.createAndRegister()
val previousLevel = LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], Level.WARN)
try {
testAclCli(adminArgs)
} finally {
LogCaptureAppender.setClassLoggerLevel(classOf[AppInfoParser], previousLevel)
LogCaptureAppender.unregister(appender)
}
val warning = appender.getMessages.find(e => e.getLevel == Level.WARN &&
e.getThrowableInformation != null &&
e.getThrowableInformation.getThrowable.getClass.getName == classOf[InstanceAlreadyExistsException].getName)
assertFalse("There should be no warnings about multiple registration of mbeans", warning.isDefined)
}
private def testProducerConsumerCli(cmdArgs: Array[String]): Unit = {
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
AclCommand.main(cmdArgs ++ getCmd(ALLOW) ++ resourceCommand ++ cmd :+ "--add")
callMain(cmdArgs ++ getCmd(ALLOW) ++ resourceCommand ++ cmd :+ "--add")
for ((resources, acls) <- resourcesToAcls) {
for (resource <- resources) {
withAuthorizer() { authorizer =>
@ -190,7 +246,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -190,7 +246,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-pattern-type", "Prefixed")
AclCommand.main(cmdArgs ++ cmd :+ "--add")
callMain(cmdArgs ++ cmd :+ "--add")
withAuthorizer() { authorizer =>
val writeAcl = new AccessControlEntry(principal.toString, AclEntry.WildcardHost, WRITE, ALLOW)
@ -200,7 +256,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -200,7 +256,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
new ResourcePattern(TOPIC, "Test-", PREFIXED))
}
AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
callMain(cmdArgs ++ cmd :+ "--remove" :+ "--force")
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, new ResourcePattern(CLUSTER, "kafka-cluster", LITERAL))
@ -226,9 +282,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -226,9 +282,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
}
def verifyPatternType(cmd: Array[String], isValid: Boolean): Unit = {
if (isValid)
AclCommand.main(cmd)
callMain(cmd)
else
intercept[RuntimeException](AclCommand.main(cmd))
intercept[RuntimeException](callMain(cmd))
}
try {
PatternType.values.foreach { patternType =>
@ -247,8 +303,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { @@ -247,8 +303,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
}
private def testRemove(cmdArgs: Array[String], resources: Set[ResourcePattern], resourceCmd: Array[String]): Unit = {
val (out, err) = callMain(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
Assert.assertEquals("", out)
Assert.assertEquals("", err)
for (resource <- resources) {
AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer, resource)
}

Loading…
Cancel
Save