@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig ;
import org.apache.kafka.common.IsolationLevel ;
import org.apache.kafka.common.config.ConfigValue ;
import org.apache.kafka.common.errors.SecurityDisabledException ;
import org.apache.kafka.connect.connector.Task ;
import org.apache.kafka.connect.source.ExactlyOnceSupport ;
import org.apache.kafka.connect.source.SourceConnector ;
@ -53,10 +54,12 @@ import java.util.Map;
@@ -53,10 +54,12 @@ import java.util.Map;
import java.util.List ;
import java.util.ArrayList ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.Set ;
import java.util.HashSet ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.function.Function ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
@ -65,6 +68,8 @@ import java.util.concurrent.ExecutionException;
@@ -65,6 +68,8 @@ import java.util.concurrent.ExecutionException;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED ;
/ * * Replicate data , configuration , and ACLs between clusters .
*
* @see MirrorSourceConfig for supported config properties .
@ -91,6 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
@@ -91,6 +96,7 @@ public class MirrorSourceConnector extends SourceConnector {
private Admin sourceAdminClient ;
private Admin targetAdminClient ;
private Admin offsetSyncsAdminClient ;
private AtomicBoolean noAclAuthorizer = new AtomicBoolean ( false ) ;
public MirrorSourceConnector ( ) {
// nop
@ -111,6 +117,12 @@ public class MirrorSourceConnector extends SourceConnector {
@@ -111,6 +117,12 @@ public class MirrorSourceConnector extends SourceConnector {
this . configPropertyFilter = configPropertyFilter ;
}
// visible for testing
MirrorSourceConnector ( Admin sourceAdminClient , Admin targetAdminClient ) {
this . sourceAdminClient = sourceAdminClient ;
this . targetAdminClient = targetAdminClient ;
}
@Override
public void start ( Map < String , String > props ) {
long start = System . currentTimeMillis ( ) ;
@ -337,16 +349,20 @@ public class MirrorSourceConnector extends SourceConnector {
@@ -337,16 +349,20 @@ public class MirrorSourceConnector extends SourceConnector {
. collect ( Collectors . toSet ( ) ) ;
}
private void syncTopicAcls ( )
// Visible for testing
void syncTopicAcls ( )
throws InterruptedException , ExecutionException {
List < AclBinding > bindings = listTopicAclBindings ( ) . stream ( )
Optional < Collection < AclBinding > > rawBindings = listTopicAclBindings ( ) ;
if ( ! rawBindings . isPresent ( ) )
return ;
List < AclBinding > filteredBindings = rawBindings . get ( ) . stream ( )
. filter ( x - > x . pattern ( ) . resourceType ( ) = = ResourceType . TOPIC )
. filter ( x - > x . pattern ( ) . patternType ( ) = = PatternType . LITERAL )
. filter ( this : : shouldReplicateAcl )
. filter ( x - > shouldReplicateTopic ( x . pattern ( ) . name ( ) ) )
. map ( this : : targetAclBinding )
. collect ( Collectors . toList ( ) ) ;
updateTopicAcls ( b indings) ;
updateTopicAcls ( filteredB indings) ;
}
private void syncTopicConfigs ( )
@ -451,9 +467,27 @@ public class MirrorSourceConnector extends SourceConnector {
@@ -451,9 +467,27 @@ public class MirrorSourceConnector extends SourceConnector {
return adminClient . listTopics ( ) . names ( ) . get ( ) ;
}
private Collection < AclBinding > listTopicAclBindings ( )
private Optional < Collection < AclBinding > > listTopicAclBindings ( )
throws InterruptedException , ExecutionException {
return sourceAdminClient . describeAcls ( ANY_TOPIC_ACL ) . values ( ) . get ( ) ;
Collection < AclBinding > bindings ;
try {
bindings = sourceAdminClient . describeAcls ( ANY_TOPIC_ACL ) . values ( ) . get ( ) ;
} catch ( ExecutionException e ) {
if ( e . getCause ( ) instanceof SecurityDisabledException ) {
if ( noAclAuthorizer . compareAndSet ( false , true ) ) {
log . info (
"No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. "
+ "Consider disabling topic ACL syncing by setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'."
) ;
} else {
log . debug ( "Source-side ACL authorizer still not found; skipping topic ACL sync" ) ;
}
return Optional . empty ( ) ;
} else {
throw e ;
}
}
return Optional . of ( bindings ) ;
}
private static Collection < TopicDescription > describeTopics ( Admin adminClient , Collection < String > topics )