diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 9ff629d7364..0963bad2e03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -66,17 +66,11 @@ public final class Metadata { private final List listeners; private final ClusterResourceListeners clusterResourceListeners; private boolean needMetadataForAllTopics; + private final boolean allowAutoTopicCreation; private final boolean topicExpiryEnabled; - /** - * Create a metadata instance with reasonable defaults - */ - public Metadata() { - this(100L, 60 * 60 * 1000L); - } - - public Metadata(long refreshBackoffMs, long metadataExpireMs) { - this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) { + this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners()); } /** @@ -84,12 +78,16 @@ public final class Metadata { * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh + * @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that + * don't exist will be created by the broker when a metadata request is sent * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. */ - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation, + boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; + this.allowAutoTopicCreation = allowAutoTopicCreation; this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; @@ -275,6 +273,10 @@ public final class Metadata { return this.lastSuccessfulRefreshMs; } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + /** * Set state to indicate if metadata for all topics in Kafka cluster is required or not. * @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster. diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index bfd0eb56d11..1d4fe58949a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -829,7 +829,8 @@ public class NetworkClient implements KafkaClient { if (metadata.needMetadataForAllTopics()) metadataRequest = MetadataRequest.Builder.allTopics(); else - metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics())); + metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()), + metadata.allowAutoTopicCreation()); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 8ae32496fe7..73ea75452ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -167,12 +167,6 @@ public abstract class AdminClient implements AutoCloseable { /** * Describe some topics in the cluster. * - * Note that if auto.create.topics.enable is true on the brokers, - * describeTopics(topicName, ...) may create a topic named topicName. - * There are two workarounds: either use AdminClient#listTopics and ensure - * that the topic is present before describing, or disable - * auto.create.topics.enable. - * * @param topicNames The names of the topics to describe. * @param options The options to use when describing the topic. * diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 9fa0cadb6fd..199b07a755d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -282,7 +282,7 @@ public class KafkaAdminClient extends AdminClient { try { metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), - config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); + config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false); List reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); Map metricTags = Collections.singletonMap("client-id", clientId); @@ -1144,7 +1144,7 @@ public class KafkaAdminClient extends AdminClient { final Map> topicFutures = new HashMap<>(topicNames.size()); final ArrayList topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { - if (topicFutures.get(topicName) == null) { + if (!topicFutures.containsKey(topicName)) { topicFutures.put(topicName, new KafkaFutureImpl()); topicNamesList.add(topicName); } @@ -1153,9 +1153,14 @@ public class KafkaAdminClient extends AdminClient { runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { + private boolean supportsDisablingTopicCreation = true; + @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(topicNamesList); + if (supportsDisablingTopicCreation) + return new MetadataRequest.Builder(topicNamesList, false); + else + return MetadataRequest.Builder.allTopics(); } @Override @@ -1189,6 +1194,15 @@ public class KafkaAdminClient extends AdminClient { } } + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + if (supportsDisablingTopicCreation) { + supportsDisablingTopicCreation = false; + return true; + } + return false; + } + @Override void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); @@ -1208,7 +1222,7 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new MetadataRequest.Builder(Collections.emptyList()); + return new MetadataRequest.Builder(Collections.emptyList(), false); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 52d94563804..055712eac06 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -659,7 +659,8 @@ public class KafkaConsumer implements Consumer { this.valueDeserializer = valueDeserializer; } ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), + true, false, clusterResourceListeners); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = "consumer"; @@ -1352,7 +1353,7 @@ public class KafkaConsumer implements Consumer { return parts; Map> topicMetadata = fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs); + new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs); return topicMetadata.get(topic); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index dc6b911d09d..89a18e3802f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -265,7 +265,8 @@ public class KafkaProducer implements Producer { ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), + true, true, clusterResourceListeners); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 91391e99c4d..383332b93f0 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -68,6 +68,16 @@ public class Protocol { /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */ public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2; + /* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */ + public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics", + ArrayOf.nullable(STRING), + "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."), + new Field("allow_auto_topic_creation", + BOOLEAN, + "If this and the broker config 'auto.create.topics.enable' are true, " + + "topics that don't exist will be created by the broker. " + + "Otherwise, no topics will be created by the broker.")); + public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), new Field("port", INT32, @@ -142,8 +152,10 @@ public class Protocol { "The broker id of the controller broker."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); - public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3}; - public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3}; + public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3; + + public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4}; + public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4}; /* Produce api */ @@ -1181,8 +1193,8 @@ public class Protocol { new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."), newThrottleTimeField()); - public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; - public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; + public static final Schema[] API_VERSIONS_REQUEST = {API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; + public static final Schema[] API_VERSIONS_RESPONSE = {API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; /* Admin requests common */ public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"), @@ -1636,8 +1648,8 @@ public class Protocol { new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs.")); - public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0}; - public static final Schema[] DESCRIBE_ACLS_RESPONSE = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0}; + public static final Schema[] DESCRIBE_ACLS_REQUEST = {DESCRIBE_ACLS_REQUEST_V0}; + public static final Schema[] DESCRIBE_ACLS_RESPONSE = {DESCRIBE_ACLS_RESPONSE_V0}; public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema( new Field("creations", @@ -1658,8 +1670,8 @@ public class Protocol { new Field("error_message", NULLABLE_STRING, "The error message.") )))); - public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0}; - public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0}; + public static final Schema[] CREATE_ACLS_REQUEST = {CREATE_ACLS_REQUEST_V0}; + public static final Schema[] CREATE_ACLS_RESPONSE = {CREATE_ACLS_RESPONSE_V0}; public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema( new Field("filters", @@ -1691,8 +1703,8 @@ public class Protocol { new Field("error_message", NULLABLE_STRING, "The error message."), new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs"))))); - public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] {DELETE_ACLS_REQUEST_V0}; - public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0}; + public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0}; + public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0}; /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 3c201391b9d..0493f3d5cf5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -30,19 +30,26 @@ import java.util.List; public class MetadataRequest extends AbstractRequest { + private static final String TOPICS_KEY_NAME = "topics"; + private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation"; + public static class Builder extends AbstractRequest.Builder { private static final List ALL_TOPICS = null; // The list of topics, or null if we want to request metadata about all topics. private final List topics; + private final boolean allowAutoTopicCreation; public static Builder allTopics() { - return new Builder(ALL_TOPICS); + // This never causes auto-creation, but we set the boolean to true because that is the default value when + // deserializing V2 and older. This way, the value is consistent after serialization and deserialization. + return new Builder(ALL_TOPICS, true); } - public Builder(List topics) { + public Builder(List topics, boolean allowAutoTopicCreation) { super(ApiKeys.METADATA); this.topics = topics; + this.allowAutoTopicCreation = allowAutoTopicCreation; } public List topics() { @@ -55,11 +62,12 @@ public class MetadataRequest extends AbstractRequest { @Override public MetadataRequest build(short version) { - if (version < 1) { - throw new UnsupportedVersionException("MetadataRequest " + - "versions older than 1 are not supported."); - } - return new MetadataRequest(this.topics, version); + if (version < 1) + throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported."); + if (!allowAutoTopicCreation && version < 4) + throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + + "allowAutoTopicCreation field"); + return new MetadataRequest(this.topics, allowAutoTopicCreation, version); } @Override @@ -77,18 +85,18 @@ public class MetadataRequest extends AbstractRequest { } } - private static final String TOPICS_KEY_NAME = "topics"; - private final List topics; + private final boolean allowAutoTopicCreation; /** * In v0 null is not allowed and an empty list indicates requesting all topics. * Note: modern clients do not support sending v0 requests. * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics. */ - public MetadataRequest(List topics, short version) { + public MetadataRequest(List topics, boolean allowAutoTopicCreation, short version) { super(version); this.topics = topics; + this.allowAutoTopicCreation = allowAutoTopicCreation; } public MetadataRequest(Struct struct, short version) { @@ -102,6 +110,10 @@ public class MetadataRequest extends AbstractRequest { } else { topics = null; } + if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME)) + allowAutoTopicCreation = struct.getBoolean(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME); + else + allowAutoTopicCreation = true; } @Override @@ -122,6 +134,7 @@ public class MetadataRequest extends AbstractRequest { case 2: return new MetadataResponse(Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); case 3: + case 4: return new MetadataResponse(throttleTimeMs, Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", @@ -137,6 +150,10 @@ public class MetadataRequest extends AbstractRequest { return topics; } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + public static MetadataRequest parse(ByteBuffer buffer, short version) { return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version); } @@ -148,6 +165,8 @@ public class MetadataRequest extends AbstractRequest { struct.set(TOPICS_KEY_NAME, null); else struct.set(TOPICS_KEY_NAME, topics.toArray()); + if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME)) + struct.set(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, allowAutoTopicCreation); return struct; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 0c87fc75d8d..407eb9f19a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -45,8 +45,8 @@ public class MetadataTest { private long refreshBackoffMs = 100; private long metadataExpireMs = 1000; - private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); - private AtomicReference backgroundError = new AtomicReference(); + private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); + private AtomicReference backgroundError = new AtomicReference<>(); @After public void tearDown() { @@ -96,7 +96,7 @@ public class MetadataTest { } long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs); - Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); + Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); assertEquals(0, metadata.timeToNextUpdate(now)); @@ -255,7 +255,7 @@ public class MetadataTest { MockClusterResourceListener mockClusterListener = new MockClusterResourceListener(); ClusterResourceListeners listeners = new ClusterResourceListeners(); listeners.maybeAdd(mockClusterListener); - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners); String hostName = "www.example.com"; Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002))); @@ -348,7 +348,7 @@ public class MetadataTest { @Test public void testTopicExpiry() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners()); // Test that topic is expired if not used within the expiry interval long time = 0; @@ -380,7 +380,7 @@ public class MetadataTest { @Test public void testNonExpiringMetadata() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, new ClusterResourceListeners()); // Test that topic is not expired if not used within the expiry interval long time = 0; diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index c87acd79a21..0de76a1a6a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -49,7 +49,7 @@ public class NetworkClientTest { protected final int requestTimeoutMs = 1000; protected final MockTime time = new MockTime(); protected final MockSelector selector = new MockSelector(time); - protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE); + protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); protected final int nodeId = 1; protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId); protected final Node node = cluster.nodes().get(0); @@ -86,8 +86,7 @@ public class NetworkClientTest { @Test(expected = IllegalStateException.class) public void testSendToUnreadyNode() { - MetadataRequest.Builder builder = - new MetadataRequest.Builder(Arrays.asList("test")); + MetadataRequest.Builder builder = new MetadataRequest.Builder(Arrays.asList("test"), true); long now = time.milliseconds(); ClientRequest request = client.newClientRequest("5", builder, now, false); client.send(request, now); @@ -251,8 +250,7 @@ public class NetworkClientTest { // metadata request when the remote node disconnects with the request in-flight. awaitReady(client, node); - MetadataRequest.Builder builder = - new MetadataRequest.Builder(Collections.emptyList()); + MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true); long now = time.milliseconds(); ClientRequest request = client.newClientRequest(node.idString(), builder, now, true); client.send(request, now); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java index ba7b528926f..6c1fd17e5b8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java @@ -50,7 +50,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable { this.adminClientConfig = new AdminClientConfig(config); this.cluster = cluster; this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), - adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); + adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false); this.mockClient = new MockClient(Time.SYSTEM, this.metadata); this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 1249896d4d8..f918a34ec10 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -366,7 +366,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -407,7 +407,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -448,7 +448,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -484,7 +484,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -534,7 +534,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -581,7 +581,7 @@ public class KafkaConsumerTest { topicMetadata.put(unmatchedTopic, 1); Cluster cluster = TestUtils.clusterWith(1, topicMetadata); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); Node node = cluster.nodes().get(0); MockClient client = new MockClient(time, metadata); @@ -621,7 +621,7 @@ public class KafkaConsumerTest { topicMetadata.put(otherTopic, 1); Cluster cluster = TestUtils.clusterWith(1, topicMetadata); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); Node node = cluster.nodes().get(0); MockClient client = new MockClient(time, metadata); @@ -664,7 +664,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -720,7 +720,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); final Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); final MockClient client = new MockClient(time, metadata); @@ -760,7 +760,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1)); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -808,7 +808,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -928,7 +928,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -996,7 +996,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1061,7 +1061,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(tpCounts); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1122,7 +1122,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 2); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1268,7 +1268,7 @@ public class KafkaConsumerTest { Cluster cluster = TestUtils.singletonCluster(topic, 1); Node node = cluster.nodes().get(0); - Metadata metadata = new Metadata(0, Long.MAX_VALUE); + Metadata metadata = createMetadata(); metadata.update(cluster, Collections.emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); @@ -1372,6 +1372,10 @@ public class KafkaConsumerTest { }; } + private Metadata createMetadata() { + return new Metadata(0, Long.MAX_VALUE, true); + } + private Node prepareRebalance(MockClient client, Node node, final Set subscribedTopics, PartitionAssignor assignor, List partitions, Node coordinator) { if (coordinator == null) { // lookup coordinator diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 4779f43cf79..8a934398928 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -73,7 +73,7 @@ public class AbstractCoordinatorTest { this.mockTime = new MockTime(); this.mockClient = new MockClient(mockTime); - Metadata metadata = new Metadata(); + Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true); this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime, RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS); Metrics metrics = new Metrics(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 770d4f7f652..7d22351369b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -119,7 +119,7 @@ public class ConsumerCoordinatorTest { public void setup() { this.time = new MockTime(); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); - this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 2a6b2286c9d..b46b65746b2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -46,7 +46,7 @@ public class ConsumerNetworkClientTest { private MockClient client = new MockClient(time); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); private Node node = cluster.nodes().get(0); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 97a6259e053..24eeb6f8973 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -118,7 +118,7 @@ public class FetcherTest { private int fetchSize = 1000; private long retryBackoffMs = 100; private MockTime time = new MockTime(1); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); private MockClient client = new MockClient(time, metadata); private Cluster cluster = TestUtils.singletonCluster(topicName, 2); private Node node = cluster.nodes().get(0); @@ -1068,16 +1068,15 @@ public class FetcherTest { public void testGetTopicMetadataInvalidTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION)); fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); } @Test public void testGetTopicMetadataUnknownTopic() { client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)); - Map> topicMetadata = - fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + Map> topicMetadata = fetcher.getTopicMetadata( + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); assertNull(topicMetadata.get(topicName)); } @@ -1086,9 +1085,8 @@ public class FetcherTest { client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE)); client.prepareResponse(newMetadataResponse(topicName, Errors.NONE)); - Map> topicMetadata = - fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L); + Map> topicMetadata = fetcher.getTopicMetadata( + new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L); assertTrue(topicMetadata.containsKey(topicName)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 3a6426a13af..e2fe614b81d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -325,7 +325,8 @@ public class KafkaProducerTest { KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); long refreshBackoffMs = 500L; long metadataExpireMs = 60000L; - final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners()); + final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, + true, new ClusterResourceListeners()); final Time time = new MockTime(); MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); MemberModifier.field(KafkaProducer.class, "time").set(producer, time); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index faa6ea5229d..c1c5a2e0ff2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -87,7 +87,7 @@ public class SenderTest { private MockTime time = new MockTime(); private MockClient client = new MockClient(time); private int batchSize = 16 * 1024; - private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners()); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners()); private ApiVersions apiVersions = new ApiVersions(); private Cluster cluster = TestUtils.singletonCluster("test", 2); private Metrics metrics = null; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index ed7ec84d61d..3e3f785c6ed 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -92,7 +92,7 @@ public class TransactionManagerTest { private MockTime time = new MockTime(); private MockClient client = new MockClient(time); - private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners()); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners()); private ApiVersions apiVersions = new ApiVersions(); private Cluster cluster = TestUtils.singletonCluster("test", 2); private RecordAccumulator accumulator = null; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 56f02156683..a05b680f180 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -117,6 +117,10 @@ public class RequestResponseTest { checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException()); checkResponse(createMetadataResponse(), 2); checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException()); + checkResponse(createMetadataResponse(), 3); + checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException()); + checkResponse(createMetadataResponse(), 4); + checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException()); checkRequest(createOffsetCommitRequest(2)); checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException()); checkResponse(createOffsetCommitResponse(), 0); @@ -703,7 +707,7 @@ public class RequestResponseTest { } private MetadataRequest createMetadataRequest(int version, List topics) { - return new MetadataRequest.Builder(topics).build((short) version); + return new MetadataRequest.Builder(topics, true).build((short) version); } private MetadataResponse createMetadataResponse() { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 28402f0ca15..631ae08b31b 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -548,10 +548,10 @@ public class SaslAuthenticatorTest { // Send metadata request before Kafka SASL handshake request String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - MetadataRequest metadataRequest1 = - new MetadataRequest.Builder(Collections.singletonList("sometopic")).build(); - RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, - metadataRequest1.version(), "someclient", 1); + MetadataRequest metadataRequest1 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), + true).build(); + RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(), + "someclient", 1); selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1)); NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY); selector.close(); @@ -563,8 +563,7 @@ public class SaslAuthenticatorTest { String node2 = "invalid2"; createClientConnection(SecurityProtocol.PLAINTEXT, node2); sendHandshakeRequestReceiveResponse(node2); - MetadataRequest metadataRequest2 = - new MetadataRequest.Builder(Collections.singletonList("sometopic")).build(); + MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build(); RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest2.version(), "someclient", 2); selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 24d321eec7b..62e2fc1b243 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -87,7 +87,7 @@ public class WorkerGroupMember { reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); String metricGrpPrefix = "connect"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index ab042dea47d..edef7dcb3f3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -90,7 +90,7 @@ public class WorkerCoordinatorTest { public void setup() { this.time = new MockTime(); this.client = new MockClient(time); - this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata = new Metadata(0, Long.MAX_VALUE, true); this.metadata.update(cluster, Collections.emptySet(), time.milliseconds()); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 50198a763ed..4410e94e033 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -213,7 +213,7 @@ class AdminClient(val time: Time, */ def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = { - val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava) + val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true) val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse] val errors = response.errors if (!errors.isEmpty) @@ -425,7 +425,7 @@ object AdminClient { def create(config: AdminConfig): AdminClient = { val time = Time.SYSTEM val metrics = new Metrics(time) - val metadata = new Metadata + val metadata = new Metadata(100L, 60 * 60 * 1000L, true) val channelBuilder = ClientUtils.createChannelBuilder(config) val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG) val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b780823c62a..eb0bf3b999c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -886,7 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel, topicMetadata.headOption.getOrElse(createInternalTopic(topic)) } - private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { + private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName, + errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = { val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints) if (topics.isEmpty || topicResponses.size == topics.size) { topicResponses @@ -899,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel, new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, java.util.Collections.emptyList()) else topicMetadata - } else if (config.autoCreateTopicsEnable) { + } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) @@ -937,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) - if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { + if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics unauthorizedForCreateTopics ++= nonExistingTopics @@ -965,7 +966,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedTopics.isEmpty) Seq.empty[MetadataResponse.TopicMetadata] else - getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints) + getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName, + errorUnavailableEndpoints) val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 017c57f75cd..dce5da272a8 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -222,8 +222,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { super.tearDown() } - private def createMetadataRequest = { - new requests.MetadataRequest.Builder(List(topic).asJava).build() + private def createMetadataRequest(allowAutoTopicCreation: Boolean) = { + new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() } private def createProduceRequest = { @@ -328,7 +328,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.METADATA -> createMetadataRequest, + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, @@ -381,6 +381,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), + ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), ApiKeys.PRODUCE -> createProduceRequest, ApiKeys.FETCH -> createFetchRequest, ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, @@ -397,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls + val isAuthorized = describeAcls == acls addAndVerifyAcls(describeAcls, topicResource) sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false) removeAllAcls() diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index 065759ffe60..0e21da7fd01 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -30,8 +30,7 @@ import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException} -import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException} +import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse @@ -80,7 +79,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = { TestUtils.waitUntilTrue(() => { - val topics = client.listTopics().names().get() + val topics = client.listTopics.names.get() expectedPresent.forall(topicName => topics.contains(topicName)) && expectedMissing.forall(topicName => !topics.contains(topicName)) }, "timed out waiting for topics") @@ -123,21 +122,39 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin val topics = Seq("mytopic", "mytopic2") val newTopics = topics.map(new NewTopic(_, 1, 1)) client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() - waitForTopics(client, List(), List("mytopic", "mytopic2")) + waitForTopics(client, List(), topics) client.createTopics(newTopics.asJava).all.get() - waitForTopics(client, List("mytopic", "mytopic2"), List()) + waitForTopics(client, topics, List()) val results = client.createTopics(newTopics.asJava).results() assertTrue(results.containsKey("mytopic")) assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) assertTrue(results.containsKey("mytopic2")) assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException]) - val topicsFromDescribe = client.describeTopics(Seq("mytopic", "mytopic2").asJava).all.get().asScala.keys + val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys assertEquals(topics.toSet, topicsFromDescribe) client.deleteTopics(topics.asJava).all.get() - waitForTopics(client, List(), List("mytopic", "mytopic2")) + waitForTopics(client, List(), topics) + } + + /** + * describe should not auto create topics + */ + @Test + def testDescribeNonExistingTopic(): Unit = { + client = AdminClient.create(createConfig()) + + val existingTopic = "existing-topic" + client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get() + waitForTopics(client, Seq(existingTopic), List()) + + val nonExistingTopic = "non-existing" + val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results + assertEquals(existingTopic, results.get(existingTopic).get.name) + intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] + assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic)) } @Test diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 8e6b11d78b6..0ef3405287d 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -43,7 +43,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { def verifyMetadata(socketServer: SocketServer) = { val metadata = sendMetadataRequest( - new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala + new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala val metadataForTopic = metadata.filter(_.topic == topic).head val partitions = if (!details.replicasAssignments.isEmpty) @@ -127,7 +127,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { protected def validateTopicExists(topic: String): Unit = { TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) val metadata = sendMetadataRequest( - new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala + new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE)) } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 9cd53d8e158..881bf8ee790 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { private def validateTopicIsDeleted(topic: String): Unit = { val metadata = sendMetadataRequest(new MetadataRequest. - Builder(List(topic).asJava).build).topicMetadata.asScala + Builder(List(topic).asJava, true).build).topicMetadata.asScala TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE), s"The topic $topic should not exist") } diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index fdc9a95e570..177a9ee4fd4 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -108,11 +108,41 @@ class MetadataRequestTest extends BaseRequestTest { // v0, Doesn't support a "no topics" request // v1, Empty list represents "no topics" - val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 1.toShort)) + val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 1.toShort)) assertTrue("Response should have no errors", metadataResponse.errors.isEmpty) assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty) } + @Test + def testAutoTopicCreation(): Unit = { + def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, response: MetadataResponse): Unit = { + assertNull(response.errors.get(existingTopic)) + assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic)) + assertEquals(Some(servers.head.config.numPartitions), zkUtils.getTopicPartitionCount(autoCreatedTopic)) + for (i <- 0 until servers.head.config.numPartitions) + TestUtils.waitUntilMetadataIsPropagated(servers, autoCreatedTopic, i) + } + + val topic1 = "t1" + val topic2 = "t2" + val topic3 = "t3" + val topic4 = "t4" + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers) + + val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion)) + checkAutoCreatedTopic(topic1, topic2, response1) + + // V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect + val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 3)) + checkAutoCreatedTopic(topic2, topic3, response2) + + // V4 and higher support a configurable allowAutoTopicCreation + val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 4)) + assertNull(response3.errors.get(topic3)) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4)) + assertEquals(None, zkUtils.getTopicPartitionCount(topic4)) + } + @Test def testAllTopicsRequest() { // create some topics @@ -120,7 +150,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.createTopic(zkUtils, "t2", 3, 2, servers) // v0, Empty list represents all topics - val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 0.toShort)) + val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort)) assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty) assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size()) @@ -139,7 +169,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers) // Kill a replica node that is not the leader - val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head val downNode = servers.find { server => val serverId = server.apis.brokerId @@ -150,14 +180,14 @@ class MetadataRequestTest extends BaseRequestTest { downNode.shutdown() TestUtils.waitUntilTrue(() => { - val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get replica.host == "" & replica.port == -1 }, "Replica was not found down", 5000) // Validate version 0 still filters unavailable replicas and contains error - val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 0.toShort)) + val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 0.toShort)) val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty) assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode)) @@ -167,7 +197,7 @@ class MetadataRequestTest extends BaseRequestTest { assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1) // Validate version 1 returns unavailable replicas with no error - val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort)) + val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty) assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode)) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index b261cb2ee42..3b0e93c025e 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -173,7 +173,7 @@ class RequestQuotaTest extends BaseRequestTest { FetchRequest.Builder.forConsumer(0, 0, partitionMap) case ApiKeys.METADATA => - new MetadataRequest.Builder(List(topic).asJava) + new MetadataRequest.Builder(List(topic).asJava, true) case ApiKeys.LIST_OFFSETS => ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 44f7900edff..b1c3f2b4c94 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -101,7 +101,8 @@ public class StreamsKafkaClient { final Metadata metadata = new Metadata(streamsConfig.getLong( StreamsConfig.RETRY_BACKOFF_MS_CONFIG), - streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG) + streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), + false ); final List addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds()); @@ -241,7 +242,8 @@ public class StreamsKafkaClient { private String getAnyReadyBrokerId() { final Metadata metadata = new Metadata( streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), - streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)); + streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), + false); final List addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), Time.SYSTEM.milliseconds()); @@ -289,7 +291,7 @@ public class StreamsKafkaClient { final ClientRequest clientRequest = kafkaClient.newClientRequest( getAnyReadyBrokerId(), - new MetadataRequest.Builder(null), + MetadataRequest.Builder.allTopics(), Time.SYSTEM.milliseconds(), true); final ClientResponse clientResponse = sendRequest(clientRequest);