Browse Source

KAFKA-6532: Reduce impact of delegation tokens on public interfaces (#4524)

Keep delegation token implementation internal without exposing implementation details to pluggable classes:
  1. KafkaPrincipal#tokenAuthenticated must always be set by SaslServerAuthenticator so that custom PrincipalBuilders cannot override.
  2. Replace o.a.k.c.security.scram.DelegationTokenAuthenticationCallback with a more generic ScramExtensionsCallback that can be used to add more extensions in future.
  3. Separate out ScramCredentialCallback (KIP-86 makes this a public interface) from delegation token credential callback (which is internal).

Reviewers: Jun Rao <junrao@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
pull/4543/head
Rajini Sivaram 7 years ago committed by GitHub
parent
commit
65b5ccf641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
  2. 10
      clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
  3. 10
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
  4. 7
      clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
  5. 24
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java
  6. 78
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java
  7. 14
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
  8. 6
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
  9. 31
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
  10. 8
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java
  11. 43
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
  12. 15
      clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
  13. 31
      clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java
  14. 2
      clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
  15. 6
      clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java

12
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java

@ -47,16 +47,11 @@ public class KafkaPrincipal implements Principal { @@ -47,16 +47,11 @@ public class KafkaPrincipal implements Principal {
private final String principalType;
private final String name;
private boolean tokenAuthenticated;
private volatile boolean tokenAuthenticated;
public KafkaPrincipal(String principalType, String name) {
this(principalType, name, false);
}
public KafkaPrincipal(String principalType, String name, boolean tokenauth) {
this.principalType = requireNonNull(principalType, "Principal type cannot be null");
this.name = requireNonNull(name, "Principal name cannot be null");
this.tokenAuthenticated = tokenauth;
}
/**
@ -91,7 +86,6 @@ public class KafkaPrincipal implements Principal { @@ -91,7 +86,6 @@ public class KafkaPrincipal implements Principal {
public int hashCode() {
int result = principalType != null ? principalType.hashCode() : 0;
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (tokenAuthenticated ? 1 : 0);
return result;
}
@ -104,6 +98,10 @@ public class KafkaPrincipal implements Principal { @@ -104,6 +98,10 @@ public class KafkaPrincipal implements Principal {
return principalType;
}
public void tokenAuthenticated(boolean tokenAuthenticated) {
this.tokenAuthenticated = tokenAuthenticated;
}
public boolean tokenAuthenticated() {
return tokenAuthenticated;
}

10
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java

@ -28,7 +28,6 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext; @@ -28,7 +28,6 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
@ -119,13 +118,8 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos @@ -119,13 +118,8 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
SaslServer saslServer = ((SaslAuthenticationContext) context).server();
if (SaslConfigs.GSSAPI_MECHANISM.equals(saslServer.getMechanismName()))
return applyKerberosShortNamer(saslServer.getAuthorizationID());
else {
Boolean isTokenAuthenticated = (Boolean) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG);
if (isTokenAuthenticated != null && isTokenAuthenticated)
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID(), true);
else
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
}
else
return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, saslServer.getAuthorizationID());
} else {
throw new IllegalArgumentException("Unhandled authentication context type: " + context.getClass().getName());
}

10
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java

@ -28,7 +28,7 @@ import javax.security.sasl.RealmCallback; @@ -28,7 +28,7 @@ import javax.security.sasl.RealmCallback;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.scram.DelegationTokenAuthenticationCallback;
import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@ -81,10 +81,10 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler { @@ -81,10 +81,10 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
ac.setAuthorized(authId.equals(authzId));
if (ac.isAuthorized())
ac.setAuthorizedID(authzId);
} else if (callback instanceof DelegationTokenAuthenticationCallback) {
DelegationTokenAuthenticationCallback tc = (DelegationTokenAuthenticationCallback) callback;
if (!isKerberos && subject != null && !subject.getPublicCredentials(Boolean.class).isEmpty()) {
tc.tokenauth(subject.getPublicCredentials(Boolean.class).iterator().next());
} else if (callback instanceof ScramExtensionsCallback) {
ScramExtensionsCallback sc = (ScramExtensionsCallback) callback;
if (!isKerberos && subject != null && !subject.getPublicCredentials(Map.class).isEmpty()) {
sc.extensions((Map<String, String>) subject.getPublicCredentials(Map.class).iterator().next());
}
} else {
throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback");

7
clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java

@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext; @@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.apache.kafka.common.security.scram.ScramServerCallbackHandler;
import org.apache.kafka.common.utils.Utils;
@ -297,7 +298,11 @@ public class SaslServerAuthenticator implements Authenticator { @@ -297,7 +298,11 @@ public class SaslServerAuthenticator implements Authenticator {
@Override
public KafkaPrincipal principal() {
SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress());
return principalBuilder.build(context);
KafkaPrincipal principal = principalBuilder.build(context);
if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
principal.tokenAuthenticated(true);
}
return principal;
}
@Override

24
clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialCallback.java

@ -20,36 +20,12 @@ import javax.security.auth.callback.Callback; @@ -20,36 +20,12 @@ import javax.security.auth.callback.Callback;
public class ScramCredentialCallback implements Callback {
private ScramCredential scramCredential;
private boolean tokenAuthenticated;
private String tokenOwner;
private String mechanism;
public ScramCredentialCallback(boolean tokenAuthenticated, String mechanism) {
this.tokenAuthenticated = tokenAuthenticated;
this.mechanism = mechanism;
}
public ScramCredential scramCredential() {
return scramCredential;
}
public boolean tokenauth() {
return tokenAuthenticated;
}
public void scramCredential(ScramCredential scramCredential) {
this.scramCredential = scramCredential;
}
public void tokenOwner(String tokenOwner) {
this.tokenOwner = tokenOwner;
}
public String tokenOwner() {
return tokenOwner;
}
public String mechanism() {
return mechanism;
}
}

78
clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensions.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.scram;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class ScramExtensions {
private final Map<String, String> extensionMap;
public ScramExtensions() {
this(Collections.<String, String>emptyMap());
}
public ScramExtensions(String extensions) {
this(stringToMap(extensions));
}
public ScramExtensions(Map<String, String> extensionMap) {
this.extensionMap = extensionMap;
}
public String extensionValue(String name) {
return extensionMap.get(name);
}
public Set<String> extensionNames() {
return extensionMap.keySet();
}
public boolean tokenAuthenticated() {
return Boolean.parseBoolean(extensionMap.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
}
@Override
public String toString() {
return mapToString(extensionMap);
}
private static Map<String, String> stringToMap(String extensions) {
Map<String, String> extensionMap = new HashMap<>();
if (!extensions.isEmpty()) {
String[] attrvals = extensions.split(",");
for (String attrval : attrvals) {
String[] array = attrval.split("=", 2);
extensionMap.put(array[0], array[1]);
}
}
return extensionMap;
}
private static String mapToString(Map<String, String> extensionMap) {
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> entry : extensionMap.entrySet()) {
builder.append(entry.getKey());
builder.append('=');
builder.append(entry.getValue());
}
return builder.toString();
}
}

14
clients/src/main/java/org/apache/kafka/common/security/scram/DelegationTokenAuthenticationCallback.java → clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java

@ -18,15 +18,17 @@ @@ -18,15 +18,17 @@
package org.apache.kafka.common.security.scram;
import javax.security.auth.callback.Callback;
import java.util.Collections;
import java.util.Map;
public class DelegationTokenAuthenticationCallback implements Callback {
private boolean tokenauth;
public class ScramExtensionsCallback implements Callback {
private Map<String, String> extensions = Collections.emptyMap();
public String extension() {
return ScramLoginModule.TOKEN_AUTH_CONFIG + "=" + Boolean.toString(tokenauth);
public Map<String, String> extensions() {
return extensions;
}
public void tokenauth(Boolean tokenauth) {
this.tokenauth = tokenauth;
public void extensions(Map<String, String> extensions) {
this.extensions = extensions;
}
}

6
clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.scram;
import java.util.Collections;
import java.util.Map;
import javax.security.auth.Subject;
@ -44,7 +45,10 @@ public class ScramLoginModule implements LoginModule { @@ -44,7 +45,10 @@ public class ScramLoginModule implements LoginModule {
subject.getPrivateCredentials().add(password);
Boolean useTokenAuthentication = "true".equalsIgnoreCase((String) options.get(TOKEN_AUTH_CONFIG));
subject.getPublicCredentials().add(useTokenAuthentication);
if (useTokenAuthentication) {
Map<String, String> scramExtensions = Collections.singletonMap(TOKEN_AUTH_CONFIG, "true");
subject.getPublicCredentials().add(scramExtensions);
}
}
@Override

31
clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java

@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram; @@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram;
import org.apache.kafka.common.utils.Base64;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -77,7 +76,7 @@ public class ScramMessages { @@ -77,7 +76,7 @@ public class ScramMessages {
private final String saslName;
private final String nonce;
private final String authorizationId;
private final String extensions;
private final ScramExtensions extensions;
public ClientFirstMessage(byte[] messageBytes) throws SaslException {
String message = toMessage(messageBytes);
Matcher matcher = PATTERN.matcher(message);
@ -88,12 +87,13 @@ public class ScramMessages { @@ -88,12 +87,13 @@ public class ScramMessages {
this.saslName = matcher.group("saslname");
this.nonce = matcher.group("nonce");
String extString = matcher.group("extensions");
this.extensions = extString.startsWith(",") ? extString.substring(1) : extString;
this.extensions = extString.startsWith(",") ? new ScramExtensions(extString.substring(1)) : new ScramExtensions();
}
public ClientFirstMessage(String saslName, String nonce, String extensions) {
public ClientFirstMessage(String saslName, String nonce, Map<String, String> extensions) {
this.saslName = saslName;
this.nonce = nonce;
this.extensions = extensions;
this.extensions = new ScramExtensions(extensions);
this.authorizationId = ""; // Optional authzid not specified in gs2-header
}
public String saslName() {
@ -108,29 +108,16 @@ public class ScramMessages { @@ -108,29 +108,16 @@ public class ScramMessages {
public String gs2Header() {
return "n," + authorizationId + ",";
}
public String extensions() {
public ScramExtensions extensions() {
return extensions;
}
public Map<String, String> extensionsAsMap() {
Map<String, String> map = new HashMap<>();
if (extensions.isEmpty())
return map;
String[] attrvals = extensions.split(",");
for (String attrval: attrvals) {
String[] array = attrval.split("=", 2);
map.put(array[0], array[1]);
}
return map;
}
public String clientFirstMessageBare() {
if (extensions.isEmpty())
String extensionStr = extensions.toString();
if (extensionStr.isEmpty())
return String.format("n=%s,r=%s", saslName, nonce);
else
return String.format("n=%s,r=%s,%s", saslName, nonce, extensions);
return String.format("n=%s,r=%s,%s", saslName, nonce, extensionStr);
}
String toMessage() {
return gs2Header() + clientFirstMessageBare();

8
clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClient.java

@ -97,18 +97,18 @@ public class ScramSaslClient implements SaslClient { @@ -97,18 +97,18 @@ public class ScramSaslClient implements SaslClient {
throw new SaslException("Expected empty challenge");
clientNonce = formatter.secureRandomString();
NameCallback nameCallback = new NameCallback("Name:");
DelegationTokenAuthenticationCallback tokenAuthCallback = new DelegationTokenAuthenticationCallback();
ScramExtensionsCallback extensionsCallback = new ScramExtensionsCallback();
try {
callbackHandler.handle(new Callback[]{nameCallback, tokenAuthCallback});
callbackHandler.handle(new Callback[]{nameCallback, extensionsCallback});
} catch (IOException | UnsupportedCallbackException e) {
throw new SaslException("User name could not be obtained", e);
}
String username = nameCallback.getName();
String saslName = formatter.saslName(username);
String extension = tokenAuthCallback.extension();
this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extension);
Map<String, String> extensions = extensionsCallback.extensions();
this.clientFirstMessage = new ScramMessages.ClientFirstMessage(saslName, clientNonce, extensions);
setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
return clientFirstMessage.toBytes();

43
clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java

@ -22,6 +22,7 @@ import java.security.NoSuchAlgorithmException; @@ -22,6 +22,7 @@ import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@ -37,6 +38,8 @@ import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage; @@ -37,6 +38,8 @@ import org.apache.kafka.common.security.scram.ScramMessages.ClientFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ClientFirstMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFinalMessage;
import org.apache.kafka.common.security.scram.ScramMessages.ServerFirstMessage;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory; @@ -50,6 +53,7 @@ import org.slf4j.LoggerFactory;
public class ScramSaslServer implements SaslServer {
private static final Logger log = LoggerFactory.getLogger(ScramSaslServer.class);
private static final Set<String> SUPPORTED_EXTENSIONS = Utils.mkSet(ScramLoginModule.TOKEN_AUTH_CONFIG);
enum State {
RECEIVE_CLIENT_FIRST_MESSAGE,
@ -65,9 +69,9 @@ public class ScramSaslServer implements SaslServer { @@ -65,9 +69,9 @@ public class ScramSaslServer implements SaslServer {
private String username;
private ClientFirstMessage clientFirstMessage;
private ServerFirstMessage serverFirstMessage;
private ScramExtensions scramExtensions;
private ScramCredential scramCredential;
private boolean tokenAuthentication;
private String tokenOwner;
private String authorizationId;
public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
this.mechanism = mechanism;
@ -91,18 +95,29 @@ public class ScramSaslServer implements SaslServer { @@ -91,18 +95,29 @@ public class ScramSaslServer implements SaslServer {
switch (state) {
case RECEIVE_CLIENT_FIRST_MESSAGE:
this.clientFirstMessage = new ClientFirstMessage(response);
this.scramExtensions = clientFirstMessage.extensions();
if (!SUPPORTED_EXTENSIONS.containsAll(scramExtensions.extensionNames())) {
log.debug("Unsupported extensions will be ignored, supported {}, provided {}",
SUPPORTED_EXTENSIONS, scramExtensions.extensionNames());
}
String serverNonce = formatter.secureRandomString();
try {
String saslName = clientFirstMessage.saslName();
this.username = formatter.username(saslName);
Map<String, String> extensions = clientFirstMessage.extensionsAsMap();
this.tokenAuthentication = "true".equalsIgnoreCase(extensions.get(ScramLoginModule.TOKEN_AUTH_CONFIG));
NameCallback nameCallback = new NameCallback("username", username);
ScramCredentialCallback credentialCallback = new ScramCredentialCallback(tokenAuthentication, getMechanismName());
callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
this.tokenOwner = credentialCallback.tokenOwner();
if (tokenAuthentication && tokenOwner == null)
throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
ScramCredentialCallback credentialCallback;
if (scramExtensions.tokenAuthenticated()) {
DelegationTokenCredentialCallback tokenCallback = new DelegationTokenCredentialCallback();
credentialCallback = tokenCallback;
callbackHandler.handle(new Callback[]{nameCallback, tokenCallback});
if (tokenCallback.tokenOwner() == null)
throw new SaslException("Token Authentication failed: Invalid tokenId : " + username);
this.authorizationId = tokenCallback.tokenOwner();
} else {
credentialCallback = new ScramCredentialCallback();
callbackHandler.handle(new Callback[]{nameCallback, credentialCallback});
this.authorizationId = username;
}
this.scramCredential = credentialCallback.scramCredential();
if (scramCredential == null)
throw new SaslException("Authentication failed: Invalid user credentials");
@ -150,11 +165,7 @@ public class ScramSaslServer implements SaslServer { @@ -150,11 +165,7 @@ public class ScramSaslServer implements SaslServer {
public String getAuthorizationID() {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
if (tokenAuthentication)
return tokenOwner; // return token owner as principal for this session
return username;
return authorizationId;
}
@Override
@ -167,8 +178,8 @@ public class ScramSaslServer implements SaslServer { @@ -167,8 +178,8 @@ public class ScramSaslServer implements SaslServer {
if (!isComplete())
throw new IllegalStateException("Authentication exchange has not completed");
if (ScramLoginModule.TOKEN_AUTH_CONFIG.equals(propName))
return tokenAuthentication;
if (SUPPORTED_EXTENSIONS.contains(propName))
return scramExtensions.extensionValue(propName);
else
return null;
}

15
clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java

@ -28,11 +28,13 @@ import org.apache.kafka.common.network.Mode; @@ -28,11 +28,13 @@ import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCredentialCallback;
public class ScramServerCallbackHandler implements AuthCallbackHandler {
private final CredentialCache.Cache<ScramCredential> credentialCache;
private final DelegationTokenCache tokenCache;
private String saslMechanism;
public ScramServerCallbackHandler(CredentialCache.Cache<ScramCredential> credentialCache,
DelegationTokenCache tokenCache) {
@ -46,13 +48,13 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { @@ -46,13 +48,13 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
for (Callback callback : callbacks) {
if (callback instanceof NameCallback)
username = ((NameCallback) callback).getDefaultName();
else if (callback instanceof ScramCredentialCallback) {
else if (callback instanceof DelegationTokenCredentialCallback) {
DelegationTokenCredentialCallback tokenCallback = (DelegationTokenCredentialCallback) callback;
tokenCallback.scramCredential(tokenCache.credential(saslMechanism, username));
tokenCallback.tokenOwner(tokenCache.owner(username));
} else if (callback instanceof ScramCredentialCallback) {
ScramCredentialCallback sc = (ScramCredentialCallback) callback;
if (sc.tokenauth()) {
sc.scramCredential(tokenCache.credential(sc.mechanism(), username));
sc.tokenOwner(tokenCache.owner(username));
} else
sc.scramCredential(credentialCache.get(username));
sc.scramCredential(credentialCache.get(username));
} else
throw new UnsupportedCallbackException(callback);
}
@ -60,6 +62,7 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler { @@ -60,6 +62,7 @@ public class ScramServerCallbackHandler implements AuthCallbackHandler {
@Override
public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
this.saslMechanism = saslMechanism;
}
@Override

31
clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationTokenCredentialCallback.java

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.token.delegation;
import org.apache.kafka.common.security.scram.ScramCredentialCallback;
public class DelegationTokenCredentialCallback extends ScramCredentialCallback {
private String tokenOwner;
public void tokenOwner(String tokenOwner) {
this.tokenOwner = tokenOwner;
}
public String tokenOwner() {
return tokenOwner;
}
}

2
clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java

@ -22,7 +22,6 @@ import org.apache.kafka.common.network.TransportLayer; @@ -22,7 +22,6 @@ import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@ -119,7 +118,6 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport { @@ -119,7 +118,6 @@ public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
EasyMock.expect(server.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG)).andReturn(false);
replayAll();

6
clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java

@ -21,12 +21,14 @@ import org.junit.Before; @@ -21,12 +21,14 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import javax.security.sasl.SaslException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.kafka.common.security.scram.ScramMessages.AbstractScramMessage;
@ -70,7 +72,7 @@ public class ScramMessagesTest { @@ -70,7 +72,7 @@ public class ScramMessagesTest {
@Test
public void validClientFirstMessage() throws SaslException {
String nonce = formatter.secureRandomString();
ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, "");
ClientFirstMessage m = new ClientFirstMessage("someuser", nonce, Collections.<String, String>emptyMap());
checkClientFirstMessage(m, "someuser", nonce, "");
// Default format used by Kafka client: only user and nonce are specified
@ -111,7 +113,7 @@ public class ScramMessagesTest { @@ -111,7 +113,7 @@ public class ScramMessagesTest {
//optional tokenauth specified as extensions
str = String.format("n,,n=testuser,r=%s,%s", nonce, "tokenauth=true");
m = createScramMessage(ClientFirstMessage.class, str);
assertEquals("tokenauth=true", m.extensions());
assertTrue("Token authentication not set from extensions", m.extensions().tokenAuthenticated());
}
@Test

Loading…
Cancel
Save