Browse Source

KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965)

pull/4525/head
Vahid Hashemian 7 years ago committed by Jason Gustafson
parent
commit
596d3d0ec4
  1. 13
      clients/src/test/java/org/apache/kafka/clients/MockClient.java
  2. 72
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  3. 83
      clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  4. 25
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
  5. 23
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  6. 19
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java

13
clients/src/test/java/org/apache/kafka/clients/MockClient.java

@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster; @@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.Time;
@ -77,6 +78,7 @@ public class MockClient implements KafkaClient { @@ -77,6 +78,7 @@ public class MockClient implements KafkaClient {
private Node node = null;
private final Set<String> ready = new HashSet<>();
private final Map<Node, Long> blackedOut = new HashMap<>();
private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from a different thread
private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
// Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@ -102,7 +104,7 @@ public class MockClient implements KafkaClient { @@ -102,7 +104,7 @@ public class MockClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
if (isBlackedOut(node))
if (isBlackedOut(node) || authenticationException(node) != null)
return false;
ready.add(node.idString());
return true;
@ -117,6 +119,12 @@ public class MockClient implements KafkaClient { @@ -117,6 +119,12 @@ public class MockClient implements KafkaClient {
blackedOut.put(node, time.milliseconds() + duration);
}
public void authenticationFailed(Node node, long duration) {
authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
disconnect(node.idString());
blackout(node, duration);
}
private boolean isBlackedOut(Node node) {
if (blackedOut.containsKey(node)) {
long expiration = blackedOut.get(node);
@ -137,7 +145,7 @@ public class MockClient implements KafkaClient { @@ -137,7 +145,7 @@ public class MockClient implements KafkaClient {
@Override
public AuthenticationException authenticationException(Node node) {
return null;
return authenticationException.get(node);
}
@Override
@ -347,6 +355,7 @@ public class MockClient implements KafkaClient { @@ -347,6 +355,7 @@ public class MockClient implements KafkaClient {
responses.clear();
futureResponses.clear();
metadataUpdates.clear();
authenticationException.clear();
}
public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {

72
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; @@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@ -248,6 +249,75 @@ public class KafkaAdminClientTest { @@ -248,6 +249,75 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception {
AdminClientUnitTestEnv env = mockClientEnv();
Node node = env.cluster().controller();
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setNode(node);
env.kafkaClient().authenticationFailed(node, 300);
callAdminClientApisAndExpectAnAuthenticationError(env);
// wait less than the blackout period, the connection should fail and the authentication error should remain
env.time().sleep(30);
assertTrue(env.kafkaClient().connectionFailed(node));
callAdminClientApisAndExpectAnAuthenticationError(env);
env.close();
}
private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException {
env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
try {
env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
new CreateTopicsOptions().timeoutMs(10000)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
try {
Map<String, NewPartitions> counts = new HashMap<>();
counts.put("my_topic", NewPartitions.increaseTo(3));
counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3))));
env.adminClient().createPartitions(counts).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().createAcls(asList(ACL1, ACL2)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().describeAcls(FILTER1).values().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
try {
env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get();
fail("Expected an authentication error.");
} catch (ExecutionException e) {
assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException);
}
}
private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
@ -579,7 +649,7 @@ public class KafkaAdminClientTest { @@ -579,7 +649,7 @@ public class KafkaAdminClientTest {
private int numTries = 0;
private int failuresInjected = 0;
@Override
public KafkaAdminClient.TimeoutProcessor create(long now) {
return new FailureInjectingTimeoutProcessor(now);

83
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster; @@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
@ -1431,6 +1432,88 @@ public class KafkaConsumerTest { @@ -1431,6 +1432,88 @@ public class KafkaConsumerTest {
}
}
@Test
public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() {
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 3000;
int autoCommitIntervalMs = 1000;
Time time = new MockTime();
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
client.setNode(node);
client.authenticationFailed(node, 300);
PartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
consumer.subscribe(Collections.singleton(topic));
callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
time.sleep(30); // wait less than the blackout period
assertTrue(client.connectionFailed(node));
callConsumerApisAndExpectAnAuthenticationError(consumer, tp0);
client.requests().clear();
consumer.close(0, TimeUnit.MILLISECONDS);
}
private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
try {
consumer.partitionsFor("some other topic");
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
try {
consumer.beginningOffsets(Collections.singleton(partition));
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
try {
consumer.endOffsets(Collections.singleton(partition));
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
try {
consumer.poll(10);
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
offset.put(partition, new OffsetAndMetadata(10L));
try {
consumer.commitSync(offset);
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
try {
consumer.committed(partition);
fail("Expected an authentication error!");
} catch (AuthenticationException e) {
// OK
}
}
private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
return new ConsumerRebalanceListener() {
@Override

25
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@ -20,6 +20,7 @@ import org.apache.kafka.clients.Metadata; @@ -20,6 +20,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -519,6 +520,30 @@ public class AbstractCoordinatorTest { @@ -519,6 +520,30 @@ public class AbstractCoordinatorTest {
awaitFirstHeartbeat(heartbeatReceived);
}
@Test
public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() {
setupCoordinator(RETRY_BACKOFF_MS);
mockClient.authenticationFailed(node, 300);
try {
coordinator.ensureCoordinatorReady();
fail("Expected an authentication error.");
} catch (AuthenticationException e) {
// OK
}
mockTime.sleep(30); // wait less than the blackout period
assertTrue(mockClient.connectionFailed(node));
try {
coordinator.ensureCoordinatorReady();
fail("Expected an authentication error.");
} catch (AuthenticationException e) {
// OK
}
}
private AtomicBoolean prepareFirstHeartbeat() {
final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
mockClient.prepareResponse(new MockClient.RequestMatcher() {

23
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException; @@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@ -1503,6 +1504,28 @@ public class ConsumerCoordinatorTest { @@ -1503,6 +1504,28 @@ public class ConsumerCoordinatorTest {
assertEquals(null, subscriptions.committed(t1p));
}
@Test
public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() {
client.authenticationFailed(node, 300);
try {
coordinator.ensureActiveGroup();
fail("Expected an authentication error.");
} catch (AuthenticationException e) {
// OK
}
time.sleep(30); // wait less than the blackout period
assertTrue(client.connectionFailed(node));
try {
coordinator.ensureActiveGroup();
fail("Expected an authentication error.");
} catch (AuthenticationException e) {
// OK
}
}
@Test
public void testProtocolMetadataOrder() {
RoundRobinAssignor roundRobin = new RoundRobinAssignor();

19
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java

@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient; @@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.Errors;
@ -69,6 +70,24 @@ public class ConsumerNetworkClientTest { @@ -69,6 +70,24 @@ public class ConsumerNetworkClientTest {
assertEquals(Errors.NONE, response.error());
}
@Test
public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws InterruptedException {
client.authenticationFailed(node, 300);
client.prepareResponse(heartbeatResponse(Errors.NONE));
final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
consumerClient.poll(future);
assertTrue(future.failed());
assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException);
time.sleep(30); // wait less than the blackout period
assertTrue(client.connectionFailed(node));
final RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
consumerClient.poll(future2);
assertTrue(future2.failed());
assertTrue("Expected only an authentication error.", future2.exception() instanceof AuthenticationException);
}
@Test
public void multiSend() {
client.prepareResponse(heartbeatResponse(Errors.NONE));

Loading…
Cancel
Save