@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest {
@@ -121,6 +121,60 @@ class ZkMigrationIntegrationTest {
}
}
@ClusterTest (
brokers = 3 , clusterType = Type . ZK , autoStart = AutoStart . YES ,
metadataVersion = MetadataVersion . IBP_3_4_IV0 ,
serverProperties = Array (
new ClusterConfigProperty ( key = "authorizer.class.name" , value = "kafka.security.authorizer.AclAuthorizer" ) ,
new ClusterConfigProperty ( key = "super.users" , value = "User:ANONYMOUS" ) ,
new ClusterConfigProperty ( key = "inter.broker.listener.name" , value = "EXTERNAL" ) ,
new ClusterConfigProperty ( key = "listeners" , value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0" ) ,
new ClusterConfigProperty ( key = "advertised.listeners" , value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0" ) ,
new ClusterConfigProperty ( key = "listener.security.protocol.map" , value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT" )
)
)
def testStartZkBrokerWithAuthorizer ( zkCluster : ClusterInstance ) : Unit = {
// Bootstrap the ZK cluster ID into KRaft
val clusterId = zkCluster . clusterId ( )
val kraftCluster = new KafkaClusterTestKit . Builder (
new TestKitNodes . Builder ( ) .
setBootstrapMetadataVersion ( MetadataVersion . IBP_3_4_IV0 ) .
setClusterId ( Uuid . fromString ( clusterId ) ) .
setNumBrokerNodes ( 0 ) .
setNumControllerNodes ( 1 ) . build ( ) )
. setConfigProp ( KafkaConfig . MigrationEnabledProp , "true" )
. setConfigProp ( KafkaConfig . ZkConnectProp , zkCluster . asInstanceOf [ ZkClusterInstance ] . getUnderlying . zkConnect )
. build ( )
try {
kraftCluster . format ( )
kraftCluster . startup ( )
val readyFuture = kraftCluster . controllers ( ) . values ( ) . asScala . head . controller . waitForReadyBrokers ( 3 )
// Enable migration configs and restart brokers
log . info ( "Restart brokers in migration mode" )
val clientProps = kraftCluster . controllerClientProperties ( )
val voters = clientProps . get ( RaftConfig . QUORUM_VOTERS_CONFIG )
zkCluster . config ( ) . serverProperties ( ) . put ( KafkaConfig . MigrationEnabledProp , "true" )
zkCluster . config ( ) . serverProperties ( ) . put ( RaftConfig . QUORUM_VOTERS_CONFIG , voters )
zkCluster . config ( ) . serverProperties ( ) . put ( KafkaConfig . ControllerListenerNamesProp , "CONTROLLER" )
zkCluster . config ( ) . serverProperties ( ) . put ( KafkaConfig . ListenerSecurityProtocolMapProp , "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT" )
zkCluster . rollingBrokerRestart ( ) // This would throw if authorizers weren 't allowed
zkCluster . waitForReadyBrokers ( )
readyFuture . get ( 30 , TimeUnit . SECONDS )
val zkClient = zkCluster . asInstanceOf [ ZkClusterInstance ] . getUnderlying ( ) . zkClient
TestUtils . waitUntilTrue ( ( ) => zkClient . getControllerId . contains ( 3000 ) , "Timed out waiting for KRaft controller to take over" )
def inDualWrite ( ) : Boolean = {
val migrationState = kraftCluster . controllers ( ) . get ( 3000 ) . migrationSupport . get . migrationDriver . migrationState ( ) . get ( 10 , TimeUnit . SECONDS )
migrationState . allowDualWrite ( )
}
TestUtils . waitUntilTrue ( ( ) => inDualWrite ( ) , "Timed out waiting for dual-write mode" )
} finally {
shutdownInSequence ( zkCluster , kraftCluster )
}
}
/* *
* Test ZkMigrationClient against a real ZooKeeper - backed Kafka cluster . This test creates a ZK cluster
* and modifies data using AdminClient . The ZkMigrationClient is then used to read the metadata from ZK