@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module ;
import org.apache.kafka.common.config.ConfigResource ;
import org.apache.kafka.common.metadata.ClientQuotaRecord ;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData ;
import org.apache.kafka.common.metadata.ConfigRecord ;
import org.apache.kafka.common.metadata.FenceBrokerRecord ;
import org.apache.kafka.common.metadata.MetadataRecordType ;
@ -50,6 +51,10 @@ import org.apache.kafka.snapshot.SnapshotReader;
@@ -50,6 +51,10 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.TreeMap ;
import java.util.concurrent.CompletableFuture ;
import java.util.function.Consumer ;
@ -302,21 +307,32 @@ public final class MetadataNodeManager implements AutoCloseable {
@@ -302,21 +307,32 @@ public final class MetadataNodeManager implements AutoCloseable {
}
case CLIENT_QUOTA_RECORD : {
ClientQuotaRecord record = ( ClientQuotaRecord ) message ;
DirectoryNode configsDirectory =
data . root . mkdirs ( "configs" ) ;
for ( ClientQuotaRecord . EntityData entityData : record . entity ( ) ) {
String entityType = entityData . entityType ( ) ;
String entityName = entityData . entityName ( ) ;
DirectoryNode entityDirectory = configsDirectory . mkdirs ( entityType ) . mkdirs ( entityName ) ;
if ( record . remove ( ) )
entityDirectory . rmrf ( record . key ( ) ) ;
else
entityDirectory . create ( record . key ( ) ) . setContents ( record . value ( ) + "" ) ;
List < String > directories = clientQuotaRecordDirectories ( record . entity ( ) ) ;
DirectoryNode node = data . root ;
for ( String directory : directories ) {
node = node . mkdirs ( directory ) ;
}
if ( record . remove ( ) )
node . rmrf ( record . key ( ) ) ;
else
node . create ( record . key ( ) ) . setContents ( record . value ( ) + "" ) ;
break ;
}
default :
throw new RuntimeException ( "Unhandled metadata record type" ) ;
}
}
static List < String > clientQuotaRecordDirectories ( List < EntityData > entityData ) {
List < String > result = new ArrayList < > ( ) ;
result . add ( "client-quotas" ) ;
TreeMap < String , EntityData > entries = new TreeMap < > ( ) ;
entityData . forEach ( e - > entries . put ( e . entityType ( ) , e ) ) ;
for ( Map . Entry < String , EntityData > entry : entries . entrySet ( ) ) {
result . add ( entry . getKey ( ) ) ;
result . add ( entry . getValue ( ) . entityName ( ) = = null ?
"<default>" : entry . getValue ( ) . entityName ( ) ) ;
}
return result ;
}
}