From 65b5ccf6413369aa4f21c72abcdf31ca72a79b00 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 7 Feb 2018 03:29:08 -0800 Subject: [PATCH] KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524) Keep delegation token implementation internal without exposing implementation details to pluggable classes: 1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override. 2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future. 3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal). Reviewers: Jun Rao , Manikumar Reddy --- .../common/security/auth/KafkaPrincipal.java | 12 ++- .../DefaultKafkaPrincipalBuilder.java | 10 +-- .../SaslClientCallbackHandler.java | 10 +-- .../SaslServerAuthenticator.java | 7 +- .../scram/ScramCredentialCallback.java | 24 ------ .../security/scram/ScramExtensions.java | 78 +++++++++++++++++++ ...back.java => ScramExtensionsCallback.java} | 14 ++-- .../security/scram/ScramLoginModule.java | 6 +- .../common/security/scram/ScramMessages.java | 31 +++----- .../security/scram/ScramSaslClient.java | 8 +- .../security/scram/ScramSaslServer.java | 43 ++++++---- .../scram/ScramServerCallbackHandler.java | 15 ++-- .../DelegationTokenCredentialCallback.java | 31 ++++++++ .../DefaultKafkaPrincipalBuilderTest.java | 2 - .../security/scram/ScramMessagesTest.java | 6 +- 15 files changed, 193 insertions(+), 104 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java rename clients/src/main/java/org/apache/kafka/common/security/scram/{DelegationTokenAuthenticationCallback.java => ScramExtensionsCallback.java} (72%) create mode 100644 clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java index 10bf76dd2e0..74bc9c916c0 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java @@ -47,16 +47,11 @@ public class KafkaPrincipal implements Principal { private final String principalType; private final String name; - private boolean tokenAuthenticated; + private volatile boolean tokenAuthenticated; public KafkaPrincipal(String principalType, String name) { - this(principalType, name, false); - } - - public KafkaPrincipal(String principalType, String name, boolean tokenauth) { this.principalType = requireNonNull(principalType, "Principal type cannot be null"); this.name = requireNonNull(name, "Principal name cannot be null"); - this.tokenAuthenticated = tokenauth; } /** @@ -91,7 +86,6 @@ public class KafkaPrincipal implements Principal { public int hashCode() { int result = principalType != null ? principalType.hashCode() : 0; result = 31 * result + (name != null ? name.hashCode() : 0); - result = 31 * result + (tokenAuthenticated ? 1 : 0); return result; } @@ -104,6 +98,10 @@ public class KafkaPrincipal implements Principal { return principalType; } + public void tokenAuthenticated(boolean tokenAuthenticated) { + this.tokenAuthenticated = tokenAuthenticated; + } + public boolean tokenAuthenticated() { return tokenAuthenticated; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java index 740423863af..30b0a3e0980 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.auth.SslAuthenticationContext; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; -import org.apache.kafka.common.security.scram.ScramLoginModule; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; @@ -119,13 +118,8 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos SaslServer saslServer = ((SaslAuthenticationContext) context).server(); if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName())) return applyKerberosShortNamer(saslServer.getAuthorizationID()); - else { - Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG); - if (isTokenAuthenticated != null && isTokenAuthenticated) - return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true); - else - return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID()); - } + else + return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID()); } else { throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName()); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index de96cef1d43..31c51c22ca3 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -28,7 +28,7 @@ import javax.security.sasl.RealmCallback; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback; +import org.apache.kafka.common.security.scram.ScramExtensionsCallback; /** * Callback handler for Sasl clients. The callbacks required for the SASL mechanism @@ -81,10 +81,10 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { ac.setAuthorized(authId.equals(authzId)); if (ac.isAuthorized()) ac.setAuthorizedID(authzId); - } else if (callback instanceof DelegationTokenAuthenticationCallback) { - DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback; - if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) { - tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next()); + } else if (callback instanceof ScramExtensionsCallback) { + ScramExtensionsCallback sc = (ScramExtensionsCallback) callback; + if (!isKerberos && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) { + sc.extensions((Map) subject.getPublicCredentials(Map.class).iterator().next()); } } else { throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback"); diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index ca6e9d2c21a..2a80e5bc0e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.scram.ScramCredential; +import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.security.scram.ScramMechanism; import org.apache.kafka.common.security.scram.ScramServerCallbackHandler; import org.apache.kafka.common.utils.Utils; @@ -297,7 +298,11 @@ public class SaslServerAuthenticator implements Authenticator { @Override public KafkaPrincipal principal() { SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress()); - return principalBuilder.build(context); + KafkaPrincipal principal = principalBuilder.build(context); + if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) { + principal.tokenAuthenticated(true); + } + return principal; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java index 7f3601cf24c..931210a0e07 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java @@ -20,36 +20,12 @@ import javax.security.auth.callback.Callback; public class ScramCredentialCallback implements Callback { private ScramCredential scramCredential; - private boolean tokenAuthenticated; - private String tokenOwner; - private String mechanism; - - public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) { - this.tokenAuthenticated = tokenAuthenticated; - this.mechanism = mechanism; - } public ScramCredential scramCredential() { return scramCredential; } - public boolean tokenauth() { - return tokenAuthenticated; - } - public void scramCredential(ScramCredential scramCredential) { this.scramCredential = scramCredential; } - - public void tokenOwner(String tokenOwner) { - this.tokenOwner = tokenOwner; - } - - public String tokenOwner() { - return tokenOwner; - } - - public String mechanism() { - return mechanism; - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java new file mode 100644 index 00000000000..0f461c0e3c2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.scram; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class ScramExtensions { + private final Map extensionMap; + + public ScramExtensions() { + this(Collections.emptyMap()); + } + + public ScramExtensions(String extensions) { + this(stringToMap(extensions)); + } + + public ScramExtensions(Map extensionMap) { + this.extensionMap = extensionMap; + } + + public String extensionValue(String name) { + return extensionMap.get(name); + } + + public Set extensionNames() { + return extensionMap.keySet(); + } + + public boolean tokenAuthenticated() { + return Boolean.parseBoolean(extensionMap.get(ScramLoginModule.TOKEN_AUTH_CONFIG)); + } + + @Override + public String toString() { + return mapToString(extensionMap); + } + + private static Map stringToMap(String extensions) { + Map extensionMap = new HashMap<>(); + + if (!extensions.isEmpty()) { + String[] attrvals = extensions.split(","); + for (String attrval : attrvals) { + String[] array = attrval.split("=", 2); + extensionMap.put(array[0], array[1]); + } + } + return extensionMap; + } + + private static String mapToString(Map extensionMap) { + StringBuilder builder = new StringBuilder(); + for (Map.Entry entry : extensionMap.entrySet()) { + builder.append(entry.getKey()); + builder.append('='); + builder.append(entry.getValue()); + } + return builder.toString(); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java similarity index 72% rename from clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java rename to clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java index df6e849309a..b40468bd650 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java @@ -18,15 +18,17 @@ package org.apache.kafka.common.security.scram; import javax.security.auth.callback.Callback; +import java.util.Collections; +import java.util.Map; -public class DelegationTokenAuthenticationCallback implements Callback { - private boolean tokenauth; +public class ScramExtensionsCallback implements Callback { + private Map extensions = Collections.emptyMap(); - public String extension() { - return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" + Boolean.toString(tokenauth); + public Map extensions() { + return extensions; } - public void tokenauth(Boolean tokenauth) { - this.tokenauth = tokenauth; + public void extensions(Map extensions) { + this.extensions = extensions; } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java index 8000f4c7f2a..43df515256f 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.security.scram; +import java.util.Collections; import java.util.Map; import javax.security.auth.Subject; @@ -44,7 +45,10 @@ public class ScramLoginModule implements LoginModule { subject.getPrivateCredentials().add(password); Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG)); - subject.getPublicCredentials().add(useTokenAuthentication); + if (useTokenAuthentication) { + Map scramExtensions = Collections.singletonMap(TOKEN_AUTH_CONFIG, "true"); + subject.getPublicCredentials().add(scramExtensions); + } } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java index e697ea5f7ba..05b3d775540 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java @@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram; import org.apache.kafka.common.utils.Base64; import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -77,7 +76,7 @@ public class ScramMessages { private final String saslName; private final String nonce; private final String authorizationId; - private final String extensions; + private final ScramExtensions extensions; public ClientFirstMessage(byte[] messageBytes) throws SaslException { String message = toMessage(messageBytes); Matcher matcher = PATTERN.matcher(message); @@ -88,12 +87,13 @@ public class ScramMessages { this.saslName = matcher.group("saslname"); this.nonce = matcher.group("nonce"); String extString = matcher.group("extensions"); - this.extensions = extString.startsWith(",") ? extString.substring(1) : extString; + + this.extensions = extString.startsWith(",") ? new ScramExtensions(extString.substring(1)) : new ScramExtensions(); } - public ClientFirstMessage(String saslName, String nonce, String extensions) { + public ClientFirstMessage(String saslName, String nonce, Map extensions) { this.saslName = saslName; this.nonce = nonce; - this.extensions = extensions; + this.extensions = new ScramExtensions(extensions); this.authorizationId = ""; // Optional authzid not specified in gs2-header } public String saslName() { @@ -108,29 +108,16 @@ public class ScramMessages { public String gs2Header() { return "n," + authorizationId + ","; } - public String extensions() { + public ScramExtensions extensions() { return extensions; } - public Map extensionsAsMap() { - Map map = new HashMap<>(); - - if (extensions.isEmpty()) - return map; - - String[] attrvals = extensions.split(","); - for (String attrval: attrvals) { - String[] array = attrval.split("=", 2); - map.put(array[0], array[1]); - } - return map; - } - public String clientFirstMessageBare() { - if (extensions.isEmpty()) + String extensionStr = extensions.toString(); + if (extensionStr.isEmpty()) return String.format("n=%s,r=%s", saslName, nonce); else - return String.format("n=%s,r=%s,%s", saslName, nonce, extensions); + return String.format("n=%s,r=%s,%s", saslName, nonce, extensionStr); } String toMessage() { return gs2Header() + clientFirstMessageBare(); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java index 6b66f5ece99..71109df2ddd 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java @@ -97,18 +97,18 @@ public class ScramSaslClient implements SaslClient { throw new SaslException("Expected empty challenge"); clientNonce = formatter.secureRandomString(); NameCallback nameCallback = new NameCallback("Name:"); - DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback(); + ScramExtensionsCallback extensionsCallback = new ScramExtensionsCallback(); try { - callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback}); + callbackHandler.handle(new Callback[]{nameCallback, extensionsCallback}); } catch (IOException | UnsupportedCallbackException e) { throw new SaslException("User name could not be obtained", e); } String username = nameCallback.getName(); String saslName = formatter.saslName(username); - String extension = tokenAuthCallback.extension(); - this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension); + Map extensions = extensionsCallback.extensions(); + this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extensions); setState(State.RECEIVE_SERVER_FIRST_MESSAGE); return clientFirstMessage.toBytes(); diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java index 94b92b6e1ea..314c1d413ba 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java @@ -22,6 +22,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.Set; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; @@ -37,6 +38,8 @@ import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage; import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage; import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage; import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage; +import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory; public class ScramSaslServer implements SaslServer { private static final Logger log = LoggerFactory.getLogger(ScramSaslServer.class); + private static final Set SUPPORTED_EXTENSIONS = Utils.mkSet(ScramLoginModule.TOKEN_AUTH_CONFIG); enum State { RECEIVE_CLIENT_FIRST_MESSAGE, @@ -65,9 +69,9 @@ public class ScramSaslServer implements SaslServer { private String username; private ClientFirstMessage clientFirstMessage; private ServerFirstMessage serverFirstMessage; + private ScramExtensions scramExtensions; private ScramCredential scramCredential; - private boolean tokenAuthentication; - private String tokenOwner; + private String authorizationId; public ScramSaslServer(ScramMechanism mechanism, Map props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException { this.mechanism = mechanism; @@ -91,18 +95,29 @@ public class ScramSaslServer implements SaslServer { switch (state) { case RECEIVE_CLIENT_FIRST_MESSAGE: this.clientFirstMessage = new ClientFirstMessage(response); + this.scramExtensions = clientFirstMessage.extensions(); + if (!SUPPORTED_EXTENSIONS.containsAll(scramExtensions.extensionNames())) { + log.debug("Unsupported extensions will be ignored, supported {}, provided {}", + SUPPORTED_EXTENSIONS, scramExtensions.extensionNames()); + } String serverNonce = formatter.secureRandomString(); try { String saslName = clientFirstMessage.saslName(); this.username = formatter.username(saslName); - Map extensions = clientFirstMessage.extensionsAsMap(); - this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG)); NameCallback nameCallback = new NameCallback("username", username); - ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName()); - callbackHandler.handle(new Callback[]{nameCallback, credentialCallback}); - this.tokenOwner = credentialCallback.tokenOwner(); - if (tokenAuthentication && tokenOwner == null) - throw new SaslException("Token Authentication failed: Invalid tokenId : " + username); + ScramCredentialCallback credentialCallback; + if (scramExtensions.tokenAuthenticated()) { + DelegationTokenCredentialCallback tokenCallback = new DelegationTokenCredentialCallback(); + credentialCallback = tokenCallback; + callbackHandler.handle(new Callback[]{nameCallback, tokenCallback}); + if (tokenCallback.tokenOwner() == null) + throw new SaslException("Token Authentication failed: Invalid tokenId : " + username); + this.authorizationId = tokenCallback.tokenOwner(); + } else { + credentialCallback = new ScramCredentialCallback(); + callbackHandler.handle(new Callback[]{nameCallback, credentialCallback}); + this.authorizationId = username; + } this.scramCredential = credentialCallback.scramCredential(); if (scramCredential == null) throw new SaslException("Authentication failed: Invalid user credentials"); @@ -150,11 +165,7 @@ public class ScramSaslServer implements SaslServer { public String getAuthorizationID() { if (!isComplete()) throw new IllegalStateException("Authentication exchange has not completed"); - - if (tokenAuthentication) - return tokenOwner; // return token owner as principal for this session - - return username; + return authorizationId; } @Override @@ -167,8 +178,8 @@ public class ScramSaslServer implements SaslServer { if (!isComplete()) throw new IllegalStateException("Authentication exchange has not completed"); - if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName)) - return tokenAuthentication; + if (SUPPORTED_EXTENSIONS.contains(propName)) + return scramExtensions.extensionValue(propName); else return null; } diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java index a064e8a3423..5e37eae9d4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java @@ -28,11 +28,13 @@ import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.security.authenticator.AuthCallbackHandler; import org.apache.kafka.common.security.authenticator.CredentialCache; import org.apache.kafka.common.security.token.delegation.DelegationTokenCache; +import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback; public class ScramServerCallbackHandler implements AuthCallbackHandler { private final CredentialCache.Cache credentialCache; private final DelegationTokenCache tokenCache; + private String saslMechanism; public ScramServerCallbackHandler(CredentialCache.Cache credentialCache, DelegationTokenCache tokenCache) { @@ -46,13 +48,13 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { for (Callback callback : callbacks) { if (callback instanceof NameCallback) username = ((NameCallback) callback).getDefaultName(); - else if (callback instanceof ScramCredentialCallback) { + else if (callback instanceof DelegationTokenCredentialCallback) { + DelegationTokenCredentialCallback tokenCallback = (DelegationTokenCredentialCallback) callback; + tokenCallback.scramCredential(tokenCache.credential(saslMechanism, username)); + tokenCallback.tokenOwner(tokenCache.owner(username)); + } else if (callback instanceof ScramCredentialCallback) { ScramCredentialCallback sc = (ScramCredentialCallback) callback; - if (sc.tokenauth()) { - sc.scramCredential(tokenCache.credential(sc.mechanism(), username)); - sc.tokenOwner(tokenCache.owner(username)); - } else - sc.scramCredential(credentialCache.get(username)); + sc.scramCredential(credentialCache.get(username)); } else throw new UnsupportedCallbackException(callback); } @@ -60,6 +62,7 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { @Override public void configure(Map configs, Mode mode, Subject subject, String saslMechanism) { + this.saslMechanism = saslMechanism; } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java new file mode 100644 index 00000000000..7490a3e91b1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.token.delegation; + +import org.apache.kafka.common.security.scram.ScramCredentialCallback; + +public class DelegationTokenCredentialCallback extends ScramCredentialCallback { + private String tokenOwner; + + public void tokenOwner(String tokenOwner) { + this.tokenOwner = tokenOwner; + } + + public String tokenOwner() { + return tokenOwner; + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java index 787f5a72cd7..a30c09ff316 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; -import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.security.scram.ScramMechanism; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -119,7 +118,6 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport { EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); EasyMock.expect(server.getAuthorizationID()).andReturn("foo"); - EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false); replayAll(); diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java index 58be6e19bf6..7b04ede6e78 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java @@ -21,12 +21,14 @@ import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; +import java.util.Collections; import javax.security.sasl.SaslException; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.kafka.common.security.scram.ScramMessages.AbstractScramMessage; @@ -70,7 +72,7 @@ public class ScramMessagesTest { @Test public void validClientFirstMessage() throws SaslException { String nonce = formatter.secureRandomString(); - ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, ""); + ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.emptyMap()); checkClientFirstMessage(m, "someuser", nonce, ""); // Default format used by Kafka client: only user and nonce are specified @@ -111,7 +113,7 @@ public class ScramMessagesTest { //optional tokenauth specified as extensions str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true"); m = createScramMessage(ClientFirstMessage.class, str); - assertEquals("tokenauth=true", m.extensions()); + assertTrue("Token authentication not set from extensions", m.extensions().tokenAuthenticated()); } @Test