Browse Source

MINOR: Use `Topic::isInternalTopic` instead of directly checking (#7047)

We don't allow changing number of partitions for internal topics. To do
so we check if the topic name belongs to the set of internal topics
directly instead of using the "isInternalTopic" method. This breaks the
encapsulation by making client aware of the fact that internal topics
have special names.

This is a simple change to use the method `Topic::isInternalTopic`
method instead of checking it directly in "alterTopic" command. We
also reduce visibility to `Topic::INTERNAL_TOPICS` to avoid 
unnecessary reliance on it in the future.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/7065/head
Vikas Singh 5 years ago committed by Jason Gustafson
parent
commit
38f86d139c
  1. 2
      clients/src/main/java/org/apache/kafka/common/internals/Topic.java
  2. 2
      clients/src/test/java/org/apache/kafka/test/TestUtils.java
  3. 4
      core/src/main/scala/kafka/admin/TopicCommand.scala

2
clients/src/main/java/org/apache/kafka/common/internals/Topic.java

@ -28,7 +28,7 @@ public class Topic {
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state"; public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]"; public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet( private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME)); Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
private static final int MAX_NAME_LENGTH = 249; private static final int MAX_NAME_LENGTH = 249;

2
clients/src/test/java/org/apache/kafka/test/TestUtils.java

@ -107,7 +107,7 @@ public class TestUtils {
for (int i = 0; i < partitions; i++) for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
} }
return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Topic.INTERNAL_TOPICS); return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Collections.emptySet());
} }
public static MetadataResponse metadataUpdateWith(final int numNodes, public static MetadataResponse metadataUpdateWith(final int numNodes,

4
core/src/main/scala/kafka/admin/TopicCommand.scala

@ -333,8 +333,8 @@ object TopicCommand extends Logging {
} }
if(tp.hasPartitions) { if(tp.hasPartitions) {
if (Topic.INTERNAL_TOPICS.contains(topic)) { if (Topic.isInternal(topic)) {
throw new IllegalArgumentException(s"The number of partitions for the internal topics${Topic.INTERNAL_TOPICS} cannot be changed.") throw new IllegalArgumentException(s"The number of partitions for the internal topic $topic cannot be changed.")
} }
println("WARNING: If partitions are increased for a topic that has a key, the partition " + println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected") "logic or ordering of the messages will be affected")

Loading…
Cancel
Save