From 523562c109b29cc5a5e56313f16f1b1ff6c5dd9c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 27 Jan 2016 08:23:25 -0800 Subject: [PATCH] KAFKA-3152; kafka-acl doesn't allow space in principal name * Add quotes to `$` in shell scripts This is necessary for correct processing of quotes in the user command. * Minor improvements to AclCommand messages * Use a principal with a space in `SslEndToEndAuthorizationTest` This passed without any other changes, but good avoid regressions. * Clean-up `TestSslUtils`: Remove unused methods, fix unnecessary verbosity and don't set security.protocol (it should be done at a higher-level). Author: Ismael Juma Reviewers: Grant Henke , Jun Rao threads; private final List sockets; - private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT; - private SslFactory sslFactory; + private final SslFactory sslFactory; private final AtomicBoolean renegotiate = new AtomicBoolean(); - public EchoServer(Map configs) throws Exception { - this.protocol = configs.containsKey("security.protocol") ? - SecurityProtocol.forName((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; - if (protocol == SecurityProtocol.SSL) { - this.sslFactory = new SslFactory(Mode.SERVER); - this.sslFactory.configure(configs); - SSLContext sslContext = this.sslFactory.sslContext(); - this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0); - } else { - this.serverSocket = new ServerSocket(0); + public EchoServer(SecurityProtocol securityProtocol, Map configs) throws Exception { + switch (securityProtocol) { + case SSL: + this.sslFactory = new SslFactory(Mode.SERVER); + this.sslFactory.configure(configs); + SSLContext sslContext = this.sslFactory.sslContext(); + this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0); + break; + case PLAINTEXT: + this.serverSocket = new ServerSocket(0); + this.sslFactory = null; + break; + default: + throw new IllegalArgumentException("Unsupported securityProtocol " + securityProtocol); } this.port = this.serverSocket.getLocalPort(); this.threads = Collections.synchronizedList(new ArrayList()); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 4d3b1c8d967..a98594cc1d0 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -23,6 +23,7 @@ import java.net.ServerSocket; import java.nio.ByteBuffer; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -45,9 +46,9 @@ public class SelectorTest { private Metrics metrics; @Before - public void setup() throws Exception { + public void setUp() throws Exception { Map configs = new HashMap<>(); - this.server = new EchoServer(configs); + this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs); this.server.start(); this.time = new MockTime(); this.channelBuilder = new PlaintextChannelBuilder(); @@ -57,7 +58,7 @@ public class SelectorTest { } @After - public void teardown() throws Exception { + public void tearDown() throws Exception { this.selector.close(); this.server.close(); this.metrics.close(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 2c098ea744a..06ad810fe41 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -26,6 +26,7 @@ import java.net.InetSocketAddress; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestSslUtils; @@ -42,16 +43,15 @@ public class SslSelectorTest extends SelectorTest { private Map sslClientConfigs; @Before - public void setup() throws Exception { + public void setUp() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); Map sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); - this.server = new EchoServer(sslServerConfigs); + this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs); this.server.start(); this.time = new MockTime(); - sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client"); - + sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client"); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.metrics = new Metrics(); @@ -59,7 +59,7 @@ public class SslSelectorTest extends SelectorTest { } @After - public void teardown() throws Exception { + public void tearDown() throws Exception { this.selector.close(); this.server.close(); this.metrics.close(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d4f1464f665..d3302c87a25 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -71,7 +71,6 @@ public class SslTransportLayerTest { clientCertStores = new CertStores(false, "localhost"); sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); @@ -113,8 +112,8 @@ public class SslTransportLayerTest { clientCertStores = new CertStores(false, "localhost"); sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - createEchoServer(sslServerConfigs); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); + createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 5336db7c18a..86dd161b52b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -26,11 +26,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertTrue; - /** * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses. */ - public class SslFactoryTest { @Test diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 2507e59bfc9..71713af5204 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -19,7 +19,6 @@ package org.apache.kafka.test; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.network.Mode; -import org.apache.kafka.clients.CommonClientConfigs; import java.io.File; import java.io.FileOutputStream; @@ -116,11 +115,8 @@ public class TestSslUtils { private static void saveKeyStore(KeyStore ks, String filename, Password password) throws GeneralSecurityException, IOException { - FileOutputStream out = new FileOutputStream(filename); - try { + try (FileOutputStream out = new FileOutputStream(filename)) { ks.store(out, password.value().toCharArray()); - } finally { - out.close(); } } @@ -154,14 +150,6 @@ public class TestSslUtils { saveKeyStore(ks, filename, password); } - public static void createTrustStore(String filename, - Password password, String alias, - Certificate cert) throws GeneralSecurityException, IOException { - KeyStore ks = createEmptyKeyStore(); - ks.setCertificateEntry(alias, cert); - saveKeyStore(ks, filename, password); - } - public static void createTrustStore( String filename, Password password, Map certs) throws GeneralSecurityException, IOException { KeyStore ks = KeyStore.getInstance("JKS"); @@ -178,18 +166,9 @@ public class TestSslUtils { saveKeyStore(ks, filename, password); } - public static Map createX509Certificates(KeyPair keyPair) - throws GeneralSecurityException { - Map certs = new HashMap(); - X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA"); - certs.put("localhost", cert); - return certs; - } - - public static Map createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword, + private static Map createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword, File trustStoreFile, Password trustStorePassword) { Map sslConfigs = new HashMap<>(); - sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) { @@ -219,27 +198,22 @@ public class TestSslUtils { public static Map createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String host) throws IOException, GeneralSecurityException { - Map certs = new HashMap(); + Map certs = new HashMap<>(); File keyStoreFile; - Password password; - - if (mode == Mode.SERVER) - password = new Password("ServerPassword"); - else - password = new Password("ClientPassword"); + Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword"); Password trustStorePassword = new Password("TrustStorePassword"); if (useClientCert) { keyStoreFile = File.createTempFile("clientKS", ".jks"); KeyPair cKP = generateKeyPair("RSA"); - X509Certificate cCert = generateCertificate("CN=" + host + ", O=client", cKP, 30, "SHA1withRSA"); + X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA"); createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert); certs.put(certAlias, cCert); } else { keyStoreFile = File.createTempFile("serverKS", ".jks"); KeyPair sKP = generateKeyPair("RSA"); - X509Certificate sCert = generateCertificate("CN=" + host + ", O=server", sKP, 30, + X509Certificate sCert = generateCertificate("CN=" + host + ", O=A server", sKP, 30, "SHA1withRSA"); createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert); certs.put(certAlias, sCert); @@ -249,9 +223,7 @@ public class TestSslUtils { createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs); } - Map sslConfig = createSslConfig(mode, keyStoreFile, password, - password, trustStoreFile, trustStorePassword); - return sslConfig; + return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword); } } diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 841b278c6b7..bf22e90ca64 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -52,7 +52,7 @@ object AclCommand { listAcl(opts) } catch { case e: Throwable => - println(s"Error while executing topic Acl command ${e.getMessage}") + println(s"Error while executing ACL command: ${e.getMessage}") println(Utils.stackTrace(e)) System.exit(-1) } @@ -79,11 +79,11 @@ object AclCommand { val resourceToAcl = getResourceToAcls(opts) if (resourceToAcl.values.exists(_.isEmpty)) - CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.") + CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.") for ((resource, acls) <- resourceToAcl) { val acls = resourceToAcl(resource) - println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") authorizer.addAcls(acls, resource) } @@ -97,10 +97,10 @@ object AclCommand { for ((resource, acls) <- resourceToAcl) { if (acls.isEmpty) { - if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?")) + if (confirmAction(s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)")) authorizer.removeAcls(resource) } else { - if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?")) + if (confirmAction(s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)")) authorizer.removeAcls(acls, resource) } } @@ -119,14 +119,14 @@ object AclCommand { resources.map(resource => (resource -> authorizer.getAcls(resource))) for ((resource, acls) <- resourceToAcls) - println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") } } private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { var resourceToAcls = Map.empty[Resource, Set[Acl]] - //if none of the --producer or --consumer options are specified , just construct acls from CLI options. + //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options. if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) { resourceToAcls ++= getCliResourceToAcls(opts) } @@ -267,22 +267,22 @@ object AclCommand { .describedAs("authorizer-properties") .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "topic to which acls should be added or removed. " + - "A value of * indicates acl should apply to all topics.") + val topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " + + "A value of * indicates ACL should apply to all topics.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.") - val groupOpt = parser.accepts("group", "Consumer Group to which the acls should be added or removed. " + - "A value of * indicates the acls should apply to all groups.") + val clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.") + val groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " + + "A value of * indicates the ACLs should apply to all groups.") .withRequiredArg .describedAs("group") .ofType(classOf[String]) - val addOpt = parser.accepts("add", "Indicates you are trying to add acls.") - val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.") - val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic or --group or --cluster to specify a resource.") + val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.") + val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.") + val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic or --group or --cluster to specify a resource.") val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline + Operation.values.map("\t" + _).mkString(Newline) + Newline) @@ -296,10 +296,10 @@ object AclCommand { .describedAs("allow-principal") .ofType(classOf[String]) - val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType: name format. " + + val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " + "By default anyone not added through --allow-principal is denied access. " + "You only need to use this option as negation to already allowed set. " + - "For example if you wanted to allow access to all users in the system but not test-user you can define an acl that " + + "For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " + "allows access to User:* and specify --deny-principal=User:test@EXAMPLE.COM. " + "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.") .withRequiredArg @@ -318,11 +318,11 @@ object AclCommand { .describedAs("deny-host") .ofType(classOf[String]) - val producerOpt = parser.accepts("producer", "Convenience option to add/remove acls for producer role. " + - "This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") + val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + + "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") - val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. " + - "This will generate acls that allows READ,DESCRIBE on topic and READ on group.") + val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + + "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") val helpOpt = parser.accepts("help", "Print usage information.") diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 15e85274306..812359eaeed 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -24,6 +24,6 @@ import org.apache.kafka.common.protocol.SecurityProtocol class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { override protected def securityProtocol = SecurityProtocol.SSL this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required") - override val clientPrincipal = "O=client,CN=localhost" - override val kafkaPrincipal = "O=server,CN=localhost" + override val clientPrincipal = "O=A client,CN=localhost" + override val kafkaPrincipal = "O=A server,CN=localhost" }