From 39d41e5aac4e8cf303040b0bb12c122832be5ea1 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 8 Mar 2023 10:25:55 -0500 Subject: [PATCH] KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker (#13351) Reviewers: Mickael Maison --- build.gradle | 1 + .../connect/mirror/MirrorSourceConnector.java | 44 +++++++++++++-- .../mirror/MirrorSourceConnectorTest.java | 54 +++++++++++++++++++ 3 files changed, 94 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index ac37efb6cc2..8cb49a6750a 100644 --- a/build.gradle +++ b/build.gradle @@ -2881,6 +2881,7 @@ project(':connect:mirror') { implementation libs.swaggerAnnotations testImplementation libs.junitJupiter + testImplementation libs.log4j testImplementation libs.mockitoCore testImplementation project(':clients').sourceSets.test.output testImplementation project(':connect:runtime').sourceSets.test.output diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 0ecc69c0b37..6c9d779fe11 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; @@ -53,10 +54,12 @@ import java.util.Map; import java.util.List; import java.util.ArrayList; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.HashSet; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -65,6 +68,8 @@ import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED; + /** Replicate data, configuration, and ACLs between clusters. * * @see MirrorSourceConfig for supported config properties. @@ -91,6 +96,7 @@ public class MirrorSourceConnector extends SourceConnector { private Admin sourceAdminClient; private Admin targetAdminClient; private Admin offsetSyncsAdminClient; + private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false); public MirrorSourceConnector() { // nop @@ -111,6 +117,12 @@ public class MirrorSourceConnector extends SourceConnector { this.configPropertyFilter = configPropertyFilter; } + // visible for testing + MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) { + this.sourceAdminClient = sourceAdminClient; + this.targetAdminClient = targetAdminClient; + } + @Override public void start(Map props) { long start = System.currentTimeMillis(); @@ -337,16 +349,20 @@ public class MirrorSourceConnector extends SourceConnector { .collect(Collectors.toSet()); } - private void syncTopicAcls() + // Visible for testing + void syncTopicAcls() throws InterruptedException, ExecutionException { - List bindings = listTopicAclBindings().stream() + Optional> rawBindings = listTopicAclBindings(); + if (!rawBindings.isPresent()) + return; + List filteredBindings = rawBindings.get().stream() .filter(x -> x.pattern().resourceType() == ResourceType.TOPIC) .filter(x -> x.pattern().patternType() == PatternType.LITERAL) .filter(this::shouldReplicateAcl) .filter(x -> shouldReplicateTopic(x.pattern().name())) .map(this::targetAclBinding) .collect(Collectors.toList()); - updateTopicAcls(bindings); + updateTopicAcls(filteredBindings); } private void syncTopicConfigs() @@ -451,9 +467,27 @@ public class MirrorSourceConnector extends SourceConnector { return adminClient.listTopics().names().get(); } - private Collection listTopicAclBindings() + private Optional> listTopicAclBindings() throws InterruptedException, ExecutionException { - return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get(); + Collection bindings; + try { + bindings = sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof SecurityDisabledException) { + if (noAclAuthorizer.compareAndSet(false, true)) { + log.info( + "No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. " + + "Consider disabling topic ACL syncing by setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'." + ); + } else { + log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync"); + } + return Optional.empty(); + } else { + throw e; + } + } + return Optional.of(bindings); } private static Collection describeTopics(Admin adminClient, Collection topics) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java index 92e37e5fd15..dfe1f524f22 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java @@ -16,16 +16,21 @@ */ package org.apache.kafka.connect.mirror; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeAclsResult; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.NewTopic; @@ -54,14 +59,18 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; public class MirrorSourceConnectorTest { @@ -149,6 +158,51 @@ public class MirrorSourceConnectorTest { assertEquals(processedDenyAllAclBinding.entry().permissionType(), AclPermissionType.DENY, "should not change DENY"); } + @Test + public void testNoBrokerAclAuthorizer() throws Exception { + Admin sourceAdmin = mock(Admin.class); + Admin targetAdmin = mock(Admin.class); + MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin); + + ExecutionException describeAclsFailure = new ExecutionException( + "Failed to describe ACLs", + new SecurityDisabledException("No ACL authorizer configured on this broker") + ); + @SuppressWarnings("unchecked") + KafkaFuture> describeAclsFuture = mock(KafkaFuture.class); + when(describeAclsFuture.get()).thenThrow(describeAclsFailure); + DescribeAclsResult describeAclsResult = mock(DescribeAclsResult.class); + when(describeAclsResult.values()).thenReturn(describeAclsFuture); + when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult); + + try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) { + LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class); + connector.syncTopicAcls(); + long aclSyncDisableMessages = connectorLogs.getMessages().stream() + .filter(m -> m.contains("Consider disabling topic ACL syncing")) + .count(); + assertEquals(1, aclSyncDisableMessages, "Should have recommended that user disable ACL syncing"); + long aclSyncSkippingMessages = connectorLogs.getMessages().stream() + .filter(m -> m.contains("skipping topic ACL sync")) + .count(); + assertEquals(0, aclSyncSkippingMessages, "Should not have logged ACL sync skip at same time as suggesting ACL sync be disabled"); + + connector.syncTopicAcls(); + connector.syncTopicAcls(); + aclSyncDisableMessages = connectorLogs.getMessages().stream() + .filter(m -> m.contains("Consider disabling topic ACL syncing")) + .count(); + assertEquals(1, aclSyncDisableMessages, "Should not have recommended that user disable ACL syncing more than once"); + aclSyncSkippingMessages = connectorLogs.getMessages().stream() + .filter(m -> m.contains("skipping topic ACL sync")) + .count(); + assertEquals(2, aclSyncSkippingMessages, "Should have logged ACL sync skip instead of suggesting disabling ACL syncing"); + } + + // We should never have tried to perform an ACL sync on the target cluster + verifyNoInteractions(targetAdmin); + } + @Test public void testConfigPropertyFiltering() { MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),