From d36832e6fc531b56bbab49e60d96efdf17fb6191 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Tue, 26 Oct 2021 15:27:30 -0700 Subject: [PATCH] MINOR: fix the path metadata shell uses for client quotas (#11437) Client quotas should appear under /client-quotas rather than /configs, since client quotas are not configs. Additionally we should correctly handle the case where the entity name is null (aka "default" quotas.) Reviewers: Jason Gustafson --- .../kafka/shell/MetadataNodeManager.java | 36 +++++++++++++------ .../kafka/shell/MetadataNodeManagerTest.java | 28 +++++++++++---- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java index d4e9cd30dac..fa1b411289c 100644 --- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java +++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.metadata.ClientQuotaRecord; +import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData; import org.apache.kafka.common.metadata.ConfigRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.MetadataRecordType; @@ -50,6 +51,10 @@ import org.apache.kafka.snapshot.SnapshotReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -302,21 +307,32 @@ public final class MetadataNodeManager implements AutoCloseable { } case CLIENT_QUOTA_RECORD: { ClientQuotaRecord record = (ClientQuotaRecord) message; - DirectoryNode configsDirectory = - data.root.mkdirs("configs"); - for (ClientQuotaRecord.EntityData entityData : record.entity()) { - String entityType = entityData.entityType(); - String entityName = entityData.entityName(); - DirectoryNode entityDirectory = configsDirectory.mkdirs(entityType).mkdirs(entityName); - if (record.remove()) - entityDirectory.rmrf(record.key()); - else - entityDirectory.create(record.key()).setContents(record.value() + ""); + List directories = clientQuotaRecordDirectories(record.entity()); + DirectoryNode node = data.root; + for (String directory : directories) { + node = node.mkdirs(directory); } + if (record.remove()) + node.rmrf(record.key()); + else + node.create(record.key()).setContents(record.value() + ""); break; } default: throw new RuntimeException("Unhandled metadata record type"); } } + + static List clientQuotaRecordDirectories(List entityData) { + List result = new ArrayList<>(); + result.add("client-quotas"); + TreeMap entries = new TreeMap<>(); + entityData.forEach(e -> entries.put(e.entityType(), e)); + for (Map.Entry entry : entries.entrySet()) { + result.add(entry.getKey()); + result.add(entry.getValue().entityName() == null ? + "" : entry.getValue().entityName()); + } + return result; + } } diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java index 605e801d17e..81483f5290c 100644 --- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java +++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.shell; - import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.metadata.ClientQuotaRecord; @@ -41,6 +40,7 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; + public class MetadataNodeManagerTest { private MetadataNodeManager metadataNodeManager; @@ -264,16 +264,30 @@ public class MetadataNodeManagerTest { metadataNodeManager.handleMessage(record); assertEquals("1000.0", - metadataNodeManager.getData().root().directory("configs", "user", "kraft").file("producer_byte_rate").contents()); - assertEquals("1000.0", - metadataNodeManager.getData().root().directory("configs", "client", "kstream").file("producer_byte_rate").contents()); + metadataNodeManager.getData().root().directory("client-quotas", + "client", "kstream", + "user", "kraft").file("producer_byte_rate").contents()); metadataNodeManager.handleMessage(record.setRemove(true)); assertFalse( - metadataNodeManager.getData().root().directory("configs", "user", "kraft").children().containsKey("producer_byte_rate")); - assertFalse( - metadataNodeManager.getData().root().directory("configs", "client", "kstream").children().containsKey("producer_byte_rate")); + metadataNodeManager.getData().root().directory("client-quotas", + "client", "kstream", + "user", "kraft").children().containsKey("producer_byte_rate")); + + record = new ClientQuotaRecord() + .setEntity(Arrays.asList( + new ClientQuotaRecord.EntityData() + .setEntityType("user") + .setEntityName(null) + )) + .setKey("producer_byte_rate") + .setValue(2000.0); + + metadataNodeManager.handleMessage(record); + assertEquals("2000.0", + metadataNodeManager.getData().root().directory("client-quotas", + "user", "").file("producer_byte_rate").contents()); } }