From 38f86d139cbd8a3b92ce555f185e52252f626cd4 Mon Sep 17 00:00:00 2001 From: Vikas Singh <50422828+soondenana@users.noreply.github.com> Date: Tue, 9 Jul 2019 17:51:54 -0700 Subject: [PATCH] MINOR: Use `Topic::isInternalTopic` instead of directly checking (#7047) We don't allow changing number of partitions for internal topics. To do so we check if the topic name belongs to the set of internal topics directly instead of using the "isInternalTopic" method. This breaks the encapsulation by making client aware of the fact that internal topics have special names. This is a simple change to use the method `Topic::isInternalTopic` method instead of checking it directly in "alterTopic" command. We also reduce visibility to `Topic::INTERNAL_TOPICS` to avoid unnecessary reliance on it in the future. Reviewers: Jason Gustafson --- .../main/java/org/apache/kafka/common/internals/Topic.java | 2 +- clients/src/test/java/org/apache/kafka/test/TestUtils.java | 2 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java index 619477d3260..a5ef3357dd1 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java @@ -28,7 +28,7 @@ public class Topic { public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state"; public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]"; - public static final Set INTERNAL_TOPICS = Collections.unmodifiableSet( + private static final Set INTERNAL_TOPICS = Collections.unmodifiableSet( Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME)); private static final int MAX_NAME_LENGTH = 249; diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 4d35f06d44a..5966cbe4a2f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -107,7 +107,7 @@ public class TestUtils { for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); } - return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Topic.INTERNAL_TOPICS); + return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Collections.emptySet()); } public static MetadataResponse metadataUpdateWith(final int numNodes, diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 5042fa27fca..4f529969645 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -333,8 +333,8 @@ object TopicCommand extends Logging { } if(tp.hasPartitions) { - if (Topic.INTERNAL_TOPICS.contains(topic)) { - throw new IllegalArgumentException(s"The number of partitions for the internal topics${Topic.INTERNAL_TOPICS} cannot be changed.") + if (Topic.isInternal(topic)) { + throw new IllegalArgumentException(s"The number of partitions for the internal topic $topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + "logic or ordering of the messages will be affected")