Browse Source

KAFKA-3152; kafka-acl doesn't allow space in principal name

* Add quotes to `$` in shell scripts
This is necessary for correct processing of quotes in the
user command.

* Minor improvements to AclCommand messages

* Use a principal with a space in `SslEndToEndAuthorizationTest`
This passed without any other changes, but good avoid regressions.

* Clean-up `TestSslUtils`:
Remove unused methods, fix unnecessary verbosity and don't set security.protocol (it should be done at a higher-level).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Grant Henke <granthenke@gmail.com>, Jun Rao <junrao@gmail.com

Closes #818 from ijuma/kafka-3152-kafka-acl-space-in-principal
pull/818/merge
Ismael Juma 9 years ago committed by Jun Rao
parent
commit
523562c109
  1. 2
      bin/kafka-acls.sh
  2. 2
      bin/kafka-configs.sh
  3. 2
      bin/kafka-console-consumer.sh
  4. 2
      bin/kafka-console-producer.sh
  5. 2
      bin/kafka-consumer-groups.sh
  6. 2
      bin/kafka-consumer-offset-checker.sh
  7. 2
      bin/kafka-consumer-perf-test.sh
  8. 2
      bin/kafka-mirror-maker.sh
  9. 2
      bin/kafka-preferred-replica-election.sh
  10. 2
      bin/kafka-producer-perf-test.sh
  11. 2
      bin/kafka-reassign-partitions.sh
  12. 2
      bin/kafka-replay-log-producer.sh
  13. 2
      bin/kafka-replica-verification.sh
  14. 2
      bin/kafka-server-start.sh
  15. 2
      bin/kafka-simple-consumer-shell.sh
  16. 2
      bin/kafka-topics.sh
  17. 2
      bin/kafka-verifiable-consumer.sh
  18. 2
      bin/kafka-verifiable-producer.sh
  19. 2
      bin/zookeeper-security-migration.sh
  20. 3
      bin/zookeeper-server-start.sh
  21. 27
      clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
  22. 7
      clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
  23. 10
      clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
  24. 3
      clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
  25. 2
      clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
  26. 42
      clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
  27. 42
      core/src/main/scala/kafka/admin/AclCommand.scala
  28. 4
      core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala

2
bin/kafka-acls.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"

2
bin/kafka-configs.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConfigCommand "$@"

2
bin/kafka-console-consumer.sh

@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

2
bin/kafka-console-producer.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

2
bin/kafka-consumer-groups.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

2
bin/kafka-consumer-offset-checker.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker "$@"

2
bin/kafka-consumer-perf-test.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@"

2
bin/kafka-mirror-maker.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.MirrorMaker "$@"

2
bin/kafka-preferred-replica-election.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand "$@"

2
bin/kafka-producer-perf-test.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance $@ exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance "$@"

2
bin/kafka-reassign-partitions.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

2
bin/kafka-replay-log-producer.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplayLogProducer "$@"

2
bin/kafka-replica-verification.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool "$@"

2
bin/kafka-server-start.sh

@ -41,4 +41,4 @@ case $COMMAND in
;; ;;
esac esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $@ exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

2
bin/kafka-simple-consumer-shell.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerShell $@ exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerShell "$@"

2
bin/kafka-topics.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

2
bin/kafka-verifiable-consumer.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer $@ exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer "$@"

2
bin/kafka-verifiable-producer.sh

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer $@ exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer "$@"

2
bin/zookeeper-security-migration.sh

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@ exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator "$@"

3
bin/zookeeper-server-start.sh

@ -41,5 +41,4 @@ case $COMMAND in
;; ;;
esac esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain $@ exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

27
clients/src/test/java/org/apache/kafka/common/network/EchoServer.java

@ -37,20 +37,23 @@ class EchoServer extends Thread {
private final ServerSocket serverSocket; private final ServerSocket serverSocket;
private final List<Thread> threads; private final List<Thread> threads;
private final List<Socket> sockets; private final List<Socket> sockets;
private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT; private final SslFactory sslFactory;
private SslFactory sslFactory;
private final AtomicBoolean renegotiate = new AtomicBoolean(); private final AtomicBoolean renegotiate = new AtomicBoolean();
public EchoServer(Map<String, ?> configs) throws Exception { public EchoServer(SecurityProtocol securityProtocol, Map<String, ?> configs) throws Exception {
this.protocol = configs.containsKey("security.protocol") ? switch (securityProtocol) {
SecurityProtocol.forName((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; case SSL:
if (protocol == SecurityProtocol.SSL) { this.sslFactory = new SslFactory(Mode.SERVER);
this.sslFactory = new SslFactory(Mode.SERVER); this.sslFactory.configure(configs);
this.sslFactory.configure(configs); SSLContext sslContext = this.sslFactory.sslContext();
SSLContext sslContext = this.sslFactory.sslContext(); this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0); break;
} else { case PLAINTEXT:
this.serverSocket = new ServerSocket(0); this.serverSocket = new ServerSocket(0);
this.sslFactory = null;
break;
default:
throw new IllegalArgumentException("Unsupported securityProtocol " + securityProtocol);
} }
this.port = this.serverSocket.getLocalPort(); this.port = this.serverSocket.getLocalPort();
this.threads = Collections.synchronizedList(new ArrayList<Thread>()); this.threads = Collections.synchronizedList(new ArrayList<Thread>());

7
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java

@ -23,6 +23,7 @@ import java.net.ServerSocket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -45,9 +46,9 @@ public class SelectorTest {
private Metrics metrics; private Metrics metrics;
@Before @Before
public void setup() throws Exception { public void setUp() throws Exception {
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();
this.server = new EchoServer(configs); this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs);
this.server.start(); this.server.start();
this.time = new MockTime(); this.time = new MockTime();
this.channelBuilder = new PlaintextChannelBuilder(); this.channelBuilder = new PlaintextChannelBuilder();
@ -57,7 +58,7 @@ public class SelectorTest {
} }
@After @After
public void teardown() throws Exception { public void tearDown() throws Exception {
this.selector.close(); this.selector.close();
this.server.close(); this.server.close();
this.metrics.close(); this.metrics.close();

10
clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java

@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestSslUtils;
@ -42,16 +43,15 @@ public class SslSelectorTest extends SelectorTest {
private Map<String, Object> sslClientConfigs; private Map<String, Object> sslClientConfigs;
@Before @Before
public void setup() throws Exception { public void setUp() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks"); File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
this.server = new EchoServer(sslServerConfigs); this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
this.server.start(); this.server.start();
this.time = new MockTime(); this.time = new MockTime();
sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client"); sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs); this.channelBuilder.configure(sslClientConfigs);
this.metrics = new Metrics(); this.metrics = new Metrics();
@ -59,7 +59,7 @@ public class SslSelectorTest extends SelectorTest {
} }
@After @After
public void teardown() throws Exception { public void tearDown() throws Exception {
this.selector.close(); this.selector.close();
this.server.close(); this.server.close();
this.metrics.close(); this.metrics.close();

3
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java

@ -71,7 +71,6 @@ public class SslTransportLayerTest {
clientCertStores = new CertStores(false, "localhost"); clientCertStores = new CertStores(false, "localhost");
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs); this.channelBuilder.configure(sslClientConfigs);
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
@ -113,8 +112,8 @@ public class SslTransportLayerTest {
clientCertStores = new CertStores(false, "localhost"); clientCertStores = new CertStores(false, "localhost");
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
createEchoServer(sslServerConfigs);
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
createEchoServer(sslServerConfigs);
createSelector(sslClientConfigs); createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port); InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

2
clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java

@ -26,11 +26,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
/** /**
* A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses. * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
*/ */
public class SslFactoryTest { public class SslFactoryTest {
@Test @Test

42
clients/src/test/java/org/apache/kafka/test/TestSslUtils.java

@ -19,7 +19,6 @@ package org.apache.kafka.test;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.Mode;
import org.apache.kafka.clients.CommonClientConfigs;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -116,11 +115,8 @@ public class TestSslUtils {
private static void saveKeyStore(KeyStore ks, String filename, private static void saveKeyStore(KeyStore ks, String filename,
Password password) throws GeneralSecurityException, IOException { Password password) throws GeneralSecurityException, IOException {
FileOutputStream out = new FileOutputStream(filename); try (FileOutputStream out = new FileOutputStream(filename)) {
try {
ks.store(out, password.value().toCharArray()); ks.store(out, password.value().toCharArray());
} finally {
out.close();
} }
} }
@ -154,14 +150,6 @@ public class TestSslUtils {
saveKeyStore(ks, filename, password); saveKeyStore(ks, filename, password);
} }
public static void createTrustStore(String filename,
Password password, String alias,
Certificate cert) throws GeneralSecurityException, IOException {
KeyStore ks = createEmptyKeyStore();
ks.setCertificateEntry(alias, cert);
saveKeyStore(ks, filename, password);
}
public static <T extends Certificate> void createTrustStore( public static <T extends Certificate> void createTrustStore(
String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException { String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
KeyStore ks = KeyStore.getInstance("JKS"); KeyStore ks = KeyStore.getInstance("JKS");
@ -178,18 +166,9 @@ public class TestSslUtils {
saveKeyStore(ks, filename, password); saveKeyStore(ks, filename, password);
} }
public static Map<String, X509Certificate> createX509Certificates(KeyPair keyPair) private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
throws GeneralSecurityException {
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
X509Certificate cert = generateCertificate("CN=localhost, O=localhost", keyPair, 30, "SHA1withRSA");
certs.put("localhost", cert);
return certs;
}
public static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
File trustStoreFile, Password trustStorePassword) { File trustStoreFile, Password trustStorePassword) {
Map<String, Object> sslConfigs = new HashMap<>(); Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) { if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
@ -219,27 +198,22 @@ public class TestSslUtils {
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String host) public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String host)
throws IOException, GeneralSecurityException { throws IOException, GeneralSecurityException {
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>(); Map<String, X509Certificate> certs = new HashMap<>();
File keyStoreFile; File keyStoreFile;
Password password; Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
if (mode == Mode.SERVER)
password = new Password("ServerPassword");
else
password = new Password("ClientPassword");
Password trustStorePassword = new Password("TrustStorePassword"); Password trustStorePassword = new Password("TrustStorePassword");
if (useClientCert) { if (useClientCert) {
keyStoreFile = File.createTempFile("clientKS", ".jks"); keyStoreFile = File.createTempFile("clientKS", ".jks");
KeyPair cKP = generateKeyPair("RSA"); KeyPair cKP = generateKeyPair("RSA");
X509Certificate cCert = generateCertificate("CN=" + host + ", O=client", cKP, 30, "SHA1withRSA"); X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert); createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
certs.put(certAlias, cCert); certs.put(certAlias, cCert);
} else { } else {
keyStoreFile = File.createTempFile("serverKS", ".jks"); keyStoreFile = File.createTempFile("serverKS", ".jks");
KeyPair sKP = generateKeyPair("RSA"); KeyPair sKP = generateKeyPair("RSA");
X509Certificate sCert = generateCertificate("CN=" + host + ", O=server", sKP, 30, X509Certificate sCert = generateCertificate("CN=" + host + ", O=A server", sKP, 30,
"SHA1withRSA"); "SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert); createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
certs.put(certAlias, sCert); certs.put(certAlias, sCert);
@ -249,9 +223,7 @@ public class TestSslUtils {
createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs); createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
} }
Map<String, Object> sslConfig = createSslConfig(mode, keyStoreFile, password, return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword);
password, trustStoreFile, trustStorePassword);
return sslConfig;
} }
} }

42
core/src/main/scala/kafka/admin/AclCommand.scala

@ -52,7 +52,7 @@ object AclCommand {
listAcl(opts) listAcl(opts)
} catch { } catch {
case e: Throwable => case e: Throwable =>
println(s"Error while executing topic Acl command ${e.getMessage}") println(s"Error while executing ACL command: ${e.getMessage}")
println(Utils.stackTrace(e)) println(Utils.stackTrace(e))
System.exit(-1) System.exit(-1)
} }
@ -79,11 +79,11 @@ object AclCommand {
val resourceToAcl = getResourceToAcls(opts) val resourceToAcl = getResourceToAcls(opts)
if (resourceToAcl.values.exists(_.isEmpty)) if (resourceToAcl.values.exists(_.isEmpty))
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add acls.") CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
for ((resource, acls) <- resourceToAcl) { for ((resource, acls) <- resourceToAcl) {
val acls = resourceToAcl(resource) val acls = resourceToAcl(resource)
println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
authorizer.addAcls(acls, resource) authorizer.addAcls(acls, resource)
} }
@ -97,10 +97,10 @@ object AclCommand {
for ((resource, acls) <- resourceToAcl) { for ((resource, acls) <- resourceToAcl) {
if (acls.isEmpty) { if (acls.isEmpty) {
if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?")) if (confirmAction(s"Are you sure you want to delete all ACLs for resource `${resource}`? (y/n)"))
authorizer.removeAcls(resource) authorizer.removeAcls(resource)
} else { } else {
if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?")) if (confirmAction(s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource `${resource}`? (y/n)"))
authorizer.removeAcls(acls, resource) authorizer.removeAcls(acls, resource)
} }
} }
@ -119,14 +119,14 @@ object AclCommand {
resources.map(resource => (resource -> authorizer.getAcls(resource))) resources.map(resource => (resource -> authorizer.getAcls(resource)))
for ((resource, acls) <- resourceToAcls) for ((resource, acls) <- resourceToAcls)
println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") println(s"Current ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
} }
} }
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
var resourceToAcls = Map.empty[Resource, Set[Acl]] var resourceToAcls = Map.empty[Resource, Set[Acl]]
//if none of the --producer or --consumer options are specified , just construct acls from CLI options. //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) { if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
resourceToAcls ++= getCliResourceToAcls(opts) resourceToAcls ++= getCliResourceToAcls(opts)
} }
@ -267,22 +267,22 @@ object AclCommand {
.describedAs("authorizer-properties") .describedAs("authorizer-properties")
.ofType(classOf[String]) .ofType(classOf[String])
val topicOpt = parser.accepts("topic", "topic to which acls should be added or removed. " + val topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of * indicates acl should apply to all topics.") "A value of * indicates ACL should apply to all topics.")
.withRequiredArg .withRequiredArg
.describedAs("topic") .describedAs("topic")
.ofType(classOf[String]) .ofType(classOf[String])
val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.") val clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt = parser.accepts("group", "Consumer Group to which the acls should be added or removed. " + val groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
"A value of * indicates the acls should apply to all groups.") "A value of * indicates the ACLs should apply to all groups.")
.withRequiredArg .withRequiredArg
.describedAs("group") .describedAs("group")
.ofType(classOf[String]) .ofType(classOf[String])
val addOpt = parser.accepts("add", "Indicates you are trying to add acls.") val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.") val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.") val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline + val operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
Operation.values.map("\t" + _).mkString(Newline) + Newline) Operation.values.map("\t" + _).mkString(Newline) + Newline)
@ -296,10 +296,10 @@ object AclCommand {
.describedAs("allow-principal") .describedAs("allow-principal")
.ofType(classOf[String]) .ofType(classOf[String])
val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType: name format. " + val denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " +
"By default anyone not added through --allow-principal is denied access. " + "By default anyone not added through --allow-principal is denied access. " +
"You only need to use this option as negation to already allowed set. " + "You only need to use this option as negation to already allowed set. " +
"For example if you wanted to allow access to all users in the system but not test-user you can define an acl that " + "For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " +
"allows access to User:* and specify --deny-principal=User:test@EXAMPLE.COM. " + "allows access to User:* and specify --deny-principal=User:test@EXAMPLE.COM. " +
"AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.") "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
.withRequiredArg .withRequiredArg
@ -318,11 +318,11 @@ object AclCommand {
.describedAs("deny-host") .describedAs("deny-host")
.ofType(classOf[String]) .ofType(classOf[String])
val producerOpt = parser.accepts("producer", "Convenience option to add/remove acls for producer role. " + val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ")
val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. " + val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate acls that allows READ,DESCRIBE on topic and READ on group.") "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")
val helpOpt = parser.accepts("help", "Print usage information.") val helpOpt = parser.accepts("help", "Print usage information.")

4
core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala

@ -24,6 +24,6 @@ import org.apache.kafka.common.protocol.SecurityProtocol
class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
override protected def securityProtocol = SecurityProtocol.SSL override protected def securityProtocol = SecurityProtocol.SSL
this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required") this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
override val clientPrincipal = "O=client,CN=localhost" override val clientPrincipal = "O=A client,CN=localhost"
override val kafkaPrincipal = "O=server,CN=localhost" override val kafkaPrincipal = "O=A server,CN=localhost"
} }

Loading…
Cancel
Save