From 596d3d0ec4ee1ba719ce87e464a095bd63ab72be Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Mon, 12 Feb 2018 14:29:44 -0800 Subject: [PATCH] KAFKA-5944: Unit tests for handling SASL authentication failures in clients (#3965) --- .../org/apache/kafka/clients/MockClient.java | 13 ++- .../clients/admin/KafkaAdminClientTest.java | 72 +++++++++++++++- .../clients/consumer/KafkaConsumerTest.java | 83 +++++++++++++++++++ .../internals/AbstractCoordinatorTest.java | 25 ++++++ .../internals/ConsumerCoordinatorTest.java | 23 +++++ .../internals/ConsumerNetworkClientTest.java | 19 +++++ 6 files changed, 232 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index d843414fd7a..65255fe9648 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.utils.Time; @@ -77,6 +78,7 @@ public class MockClient implements KafkaClient { private Node node = null; private final Set ready = new HashSet<>(); private final Map blackedOut = new HashMap<>(); + private final Map authenticationException = new HashMap<>(); // Use concurrent queue for requests so that requests may be queried from a different thread private final Queue requests = new ConcurrentLinkedDeque<>(); // Use concurrent queue for responses so that responses may be updated during poll() from a different thread. @@ -102,7 +104,7 @@ public class MockClient implements KafkaClient { @Override public boolean ready(Node node, long now) { - if (isBlackedOut(node)) + if (isBlackedOut(node) || authenticationException(node) != null) return false; ready.add(node.idString()); return true; @@ -117,6 +119,12 @@ public class MockClient implements KafkaClient { blackedOut.put(node, time.milliseconds() + duration); } + public void authenticationFailed(Node node, long duration) { + authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception()); + disconnect(node.idString()); + blackout(node, duration); + } + private boolean isBlackedOut(Node node) { if (blackedOut.containsKey(node)) { long expiration = blackedOut.get(node); @@ -137,7 +145,7 @@ public class MockClient implements KafkaClient { @Override public AuthenticationException authenticationException(Node node) { - return null; + return authenticationException.get(node); } @Override @@ -347,6 +355,7 @@ public class MockClient implements KafkaClient { responses.clear(); futureResponses.clear(); metadataUpdates.clear(); + authenticationException.clear(); } public void prepareMetadataUpdate(Cluster cluster, Set unavailableTopics) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 186ccf06cb5..f08a99b6ddc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NotLeaderForPartitionException; @@ -248,6 +249,75 @@ public class KafkaAdminClientTest { } } + @Test + public void testAdminClientApisWithinBlackoutPeriodAfterAuthenticationFailure() throws Exception { + AdminClientUnitTestEnv env = mockClientEnv(); + Node node = env.cluster().controller(); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().setNode(node); + env.kafkaClient().authenticationFailed(node, 300); + + callAdminClientApisAndExpectAnAuthenticationError(env); + + // wait less than the blackout period, the connection should fail and the authentication error should remain + env.time().sleep(30); + assertTrue(env.kafkaClient().connectionFailed(node)); + callAdminClientApisAndExpectAnAuthenticationError(env); + + env.close(); + } + + private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTestEnv env) throws InterruptedException { + env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet()); + + try { + env.adminClient().createTopics( + Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))), + new CreateTopicsOptions().timeoutMs(10000)).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + Map counts = new HashMap<>(); + counts.put("my_topic", NewPartitions.increaseTo(3)); + counts.put("other_topic", NewPartitions.increaseTo(3, asList(asList(2), asList(3)))); + env.adminClient().createPartitions(counts).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().createAcls(asList(ACL1, ACL2)).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().describeAcls(FILTER1).values().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().deleteAcls(asList(FILTER1, FILTER2)).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + + try { + env.adminClient().describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, "0"))).all().get(); + fail("Expected an authentication error."); + } catch (ExecutionException e) { + assertTrue("Expected only an authentication error.", e.getCause() instanceof AuthenticationException); + } + } + private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"), @@ -579,7 +649,7 @@ public class KafkaAdminClientTest { private int numTries = 0; private int failuresInjected = 0; - + @Override public KafkaAdminClient.TimeoutProcessor create(long now) { return new FailureInjectingTimeoutProcessor(now); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index d47124f8f3e..be8db2bf55c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; @@ -1431,6 +1432,88 @@ public class KafkaConsumerTest { } } + @Test + public void testConsumerWithinBlackoutPeriodAfterAuthenticationFailure() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + Map tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + Cluster cluster = TestUtils.singletonCluster(tpCounts); + Node node = cluster.nodes().get(0); + + Metadata metadata = createMetadata(); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + client.authenticationFailed(node, 300); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + consumer.subscribe(Collections.singleton(topic)); + callConsumerApisAndExpectAnAuthenticationError(consumer, tp0); + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + callConsumerApisAndExpectAnAuthenticationError(consumer, tp0); + + client.requests().clear(); + consumer.close(0, TimeUnit.MILLISECONDS); + } + + private void callConsumerApisAndExpectAnAuthenticationError(KafkaConsumer consumer, TopicPartition partition) { + try { + consumer.partitionsFor("some other topic"); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.beginningOffsets(Collections.singleton(partition)); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.endOffsets(Collections.singleton(partition)); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.poll(10); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + Map offset = new HashMap<>(); + offset.put(partition, new OffsetAndMetadata(10L)); + + try { + consumer.commitSync(offset); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + + try { + consumer.committed(partition); + fail("Expected an authentication error!"); + } catch (AuthenticationException e) { + // OK + } + } + private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer consumer) { return new ConsumerRebalanceListener() { @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 7eaca9826d9..1c88803e26c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -519,6 +520,30 @@ public class AbstractCoordinatorTest { awaitFirstHeartbeat(heartbeatReceived); } + @Test + public void testEnsureCoordinatorReadyWithinBlackoutPeriodAfterAuthenticationFailure() { + setupCoordinator(RETRY_BACKOFF_MS); + + mockClient.authenticationFailed(node, 300); + + try { + coordinator.ensureCoordinatorReady(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + + mockTime.sleep(30); // wait less than the blackout period + assertTrue(mockClient.connectionFailed(node)); + + try { + coordinator.ensureCoordinatorReady(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + } + private AtomicBoolean prepareFirstHeartbeat() { final AtomicBoolean heartbeatReceived = new AtomicBoolean(false); mockClient.prepareResponse(new MockClient.RequestMatcher() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c49339b6525..fdaa6b3f522 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.OffsetMetadataTooLarge; @@ -1503,6 +1504,28 @@ public class ConsumerCoordinatorTest { assertEquals(null, subscriptions.committed(t1p)); } + @Test + public void testEnsureActiveGroupWithinBlackoutPeriodAfterAuthenticationFailure() { + client.authenticationFailed(node, 300); + + try { + coordinator.ensureActiveGroup(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + + try { + coordinator.ensureActiveGroup(); + fail("Expected an authentication error."); + } catch (AuthenticationException e) { + // OK + } + } + @Test public void testProtocolMetadataOrder() { RoundRobinAssignor roundRobin = new RoundRobinAssignor(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 93c6acd9c40..904270ec489 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; @@ -69,6 +70,24 @@ public class ConsumerNetworkClientTest { assertEquals(Errors.NONE, response.error()); } + @Test + public void sendWithinBlackoutPeriodAfterAuthenticationFailure() throws InterruptedException { + client.authenticationFailed(node, 300); + client.prepareResponse(heartbeatResponse(Errors.NONE)); + final RequestFuture future = consumerClient.send(node, heartbeat()); + consumerClient.poll(future); + assertTrue(future.failed()); + assertTrue("Expected only an authentication error.", future.exception() instanceof AuthenticationException); + + time.sleep(30); // wait less than the blackout period + assertTrue(client.connectionFailed(node)); + + final RequestFuture future2 = consumerClient.send(node, heartbeat()); + consumerClient.poll(future2); + assertTrue(future2.failed()); + assertTrue("Expected only an authentication error.", future2.exception() instanceof AuthenticationException); + } + @Test public void multiSend() { client.prepareResponse(heartbeatResponse(Errors.NONE));