Browse Source

KAFKA-5291; AdminClient should not trigger auto creation of topics

- Added a boolean `allow_auto_topic_creation` to MetadataRequest and
bumped the protocol version to V4.

- When connecting to brokers older than 0.11.0.0, the `allow_auto_topic_creation`
field won't be considered, so we send a metadata request for all topics
to keep the behavior consistent.

- Set `allow_auto_topic_creation` to false in the new AdminClient and
StreamsKafkaClient (which exists for the purpose of creating topics
manually); set it to true everywhere else for now. Other clients will eventually
rely on client-side auto topic creation, but that’s not there yet.

- Add `allowAutoTopicCreation` field to `Metadata`, which is used by
`DefaultMetadataUpdater`. This is not strictly needed for the new
`AdminClient`, but it avoids surprises if it ever adds a topic to `Metadata`
via `setTopics` or `addTopic`.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #3098 from ijuma/kafka-5291-admin-client-no-auto-topic-creation
pull/2259/merge
Ismael Juma 8 years ago
parent
commit
7311dcbc53
  1. 22
      clients/src/main/java/org/apache/kafka/clients/Metadata.java
  2. 3
      clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
  3. 6
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
  4. 22
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  5. 5
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  6. 3
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  7. 32
      clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
  8. 39
      clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
  9. 12
      clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
  10. 8
      clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
  11. 2
      clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
  12. 36
      clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
  13. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
  14. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
  15. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  16. 14
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  17. 3
      clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  18. 2
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  19. 2
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  20. 6
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  21. 11
      clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
  22. 2
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
  23. 2
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
  24. 4
      core/src/main/scala/kafka/admin/AdminClient.scala
  25. 10
      core/src/main/scala/kafka/server/KafkaApis.scala
  26. 10
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  27. 31
      core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
  28. 4
      core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
  29. 2
      core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
  30. 42
      core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
  31. 2
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
  32. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java

22
clients/src/main/java/org/apache/kafka/clients/Metadata.java

@ -66,17 +66,11 @@ public final class Metadata { @@ -66,17 +66,11 @@ public final class Metadata {
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean allowAutoTopicCreation;
private final boolean topicExpiryEnabled;
/**
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
this(100L, 60 * 60 * 1000L);
}
public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
}
/**
@ -84,12 +78,16 @@ public final class Metadata { @@ -84,12 +78,16 @@ public final class Metadata {
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
* don't exist will be created by the broker when a metadata request is sent
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
@ -275,6 +273,10 @@ public final class Metadata { @@ -275,6 +273,10 @@ public final class Metadata {
return this.lastSuccessfulRefreshMs;
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.

3
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@ -829,7 +829,8 @@ public class NetworkClient implements KafkaClient { @@ -829,7 +829,8 @@ public class NetworkClient implements KafkaClient {
if (metadata.needMetadataForAllTopics())
metadataRequest = MetadataRequest.Builder.allTopics();
else
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
metadata.allowAutoTopicCreation());
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());

6
clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java

@ -167,12 +167,6 @@ public abstract class AdminClient implements AutoCloseable { @@ -167,12 +167,6 @@ public abstract class AdminClient implements AutoCloseable {
/**
* Describe some topics in the cluster.
*
* Note that if auto.create.topics.enable is true on the brokers,
* describeTopics(topicName, ...) may create a topic named topicName.
* There are two workarounds: either use AdminClient#listTopics and ensure
* that the topic is present before describing, or disable
* auto.create.topics.enable.
*
* @param topicNames The names of the topics to describe.
* @param options The options to use when describing the topic.
*

22
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -282,7 +282,7 @@ public class KafkaAdminClient extends AdminClient { @@ -282,7 +282,7 @@ public class KafkaAdminClient extends AdminClient {
try {
metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@ -1144,7 +1144,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1144,7 +1144,7 @@ public class KafkaAdminClient extends AdminClient {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
if (topicFutures.get(topicName) == null) {
if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
topicNamesList.add(topicName);
}
@ -1153,9 +1153,14 @@ public class KafkaAdminClient extends AdminClient { @@ -1153,9 +1153,14 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(topicNamesList);
if (supportsDisablingTopicCreation)
return new MetadataRequest.Builder(topicNamesList, false);
else
return MetadataRequest.Builder.allTopics();
}
@Override
@ -1189,6 +1194,15 @@ public class KafkaAdminClient extends AdminClient { @@ -1189,6 +1194,15 @@ public class KafkaAdminClient extends AdminClient {
}
}
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsDisablingTopicCreation) {
supportsDisablingTopicCreation = false;
return true;
}
return false;
}
@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
@ -1208,7 +1222,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1208,7 +1222,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.<String>emptyList());
return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
}
@Override

5
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -659,7 +659,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -659,7 +659,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.valueDeserializer = valueDeserializer;
}
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
@ -1352,7 +1353,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1352,7 +1353,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return parts;
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs);
new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
return topicMetadata.get(topic);
} finally {
release();

3
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -265,7 +265,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -265,7 +265,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

32
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java

@ -68,6 +68,16 @@ public class Protocol { @@ -68,6 +68,16 @@ public class Protocol {
/* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
/* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics",
ArrayOf.nullable(STRING),
"An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."),
new Field("allow_auto_topic_creation",
BOOLEAN,
"If this and the broker config 'auto.create.topics.enable' are true, " +
"topics that don't exist will be created by the broker. " +
"Otherwise, no topics will be created by the broker."));
public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
new Field("port", INT32,
@ -142,8 +152,10 @@ public class Protocol { @@ -142,8 +152,10 @@ public class Protocol {
"The broker id of the controller broker."),
new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4};
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4};
/* Produce api */
@ -1181,8 +1193,8 @@ public class Protocol { @@ -1181,8 +1193,8 @@ public class Protocol {
new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
newThrottleTimeField());
public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
public static final Schema[] API_VERSIONS_REQUEST = {API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
public static final Schema[] API_VERSIONS_RESPONSE = {API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
/* Admin requests common */
public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"),
@ -1636,8 +1648,8 @@ public class Protocol { @@ -1636,8 +1648,8 @@ public class Protocol {
new ArrayOf(DESCRIBE_ACLS_RESOURCE),
"The resources and their associated ACLs."));
public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0};
public static final Schema[] DESCRIBE_ACLS_RESPONSE = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0};
public static final Schema[] DESCRIBE_ACLS_REQUEST = {DESCRIBE_ACLS_REQUEST_V0};
public static final Schema[] DESCRIBE_ACLS_RESPONSE = {DESCRIBE_ACLS_RESPONSE_V0};
public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
new Field("creations",
@ -1658,8 +1670,8 @@ public class Protocol { @@ -1658,8 +1670,8 @@ public class Protocol {
new Field("error_message", NULLABLE_STRING, "The error message.")
))));
public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0};
public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0};
public static final Schema[] CREATE_ACLS_REQUEST = {CREATE_ACLS_REQUEST_V0};
public static final Schema[] CREATE_ACLS_RESPONSE = {CREATE_ACLS_RESPONSE_V0};
public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
new Field("filters",
@ -1691,8 +1703,8 @@ public class Protocol { @@ -1691,8 +1703,8 @@ public class Protocol {
new Field("error_message", NULLABLE_STRING, "The error message."),
new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] {DELETE_ACLS_REQUEST_V0};
public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0};
public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0};
public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0};
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */

39
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java

@ -30,19 +30,26 @@ import java.util.List; @@ -30,19 +30,26 @@ import java.util.List;
public class MetadataRequest extends AbstractRequest {
private static final String TOPICS_KEY_NAME = "topics";
private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation";
public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
private static final List<String> ALL_TOPICS = null;
// The list of topics, or null if we want to request metadata about all topics.
private final List<String> topics;
private final boolean allowAutoTopicCreation;
public static Builder allTopics() {
return new Builder(ALL_TOPICS);
// This never causes auto-creation, but we set the boolean to true because that is the default value when
// deserializing V2 and older. This way, the value is consistent after serialization and deserialization.
return new Builder(ALL_TOPICS, true);
}
public Builder(List<String> topics) {
public Builder(List<String> topics, boolean allowAutoTopicCreation) {
super(ApiKeys.METADATA);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
}
public List<String> topics() {
@ -55,11 +62,12 @@ public class MetadataRequest extends AbstractRequest { @@ -55,11 +62,12 @@ public class MetadataRequest extends AbstractRequest {
@Override
public MetadataRequest build(short version) {
if (version < 1) {
throw new UnsupportedVersionException("MetadataRequest " +
"versions older than 1 are not supported.");
}
return new MetadataRequest(this.topics, version);
if (version < 1)
throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
if (!allowAutoTopicCreation && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
return new MetadataRequest(this.topics, allowAutoTopicCreation, version);
}
@Override
@ -77,18 +85,18 @@ public class MetadataRequest extends AbstractRequest { @@ -77,18 +85,18 @@ public class MetadataRequest extends AbstractRequest {
}
}
private static final String TOPICS_KEY_NAME = "topics";
private final List<String> topics;
private final boolean allowAutoTopicCreation;
/**
* In v0 null is not allowed and an empty list indicates requesting all topics.
* Note: modern clients do not support sending v0 requests.
* In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
*/
public MetadataRequest(List<String> topics, short version) {
public MetadataRequest(List<String> topics, boolean allowAutoTopicCreation, short version) {
super(version);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
}
public MetadataRequest(Struct struct, short version) {
@ -102,6 +110,10 @@ public class MetadataRequest extends AbstractRequest { @@ -102,6 +110,10 @@ public class MetadataRequest extends AbstractRequest {
} else {
topics = null;
}
if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
allowAutoTopicCreation = struct.getBoolean(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME);
else
allowAutoTopicCreation = true;
}
@Override
@ -122,6 +134,7 @@ public class MetadataRequest extends AbstractRequest { @@ -122,6 +134,7 @@ public class MetadataRequest extends AbstractRequest {
case 2:
return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
case 3:
case 4:
return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@ -137,6 +150,10 @@ public class MetadataRequest extends AbstractRequest { @@ -137,6 +150,10 @@ public class MetadataRequest extends AbstractRequest {
return topics;
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}
public static MetadataRequest parse(ByteBuffer buffer, short version) {
return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version);
}
@ -148,6 +165,8 @@ public class MetadataRequest extends AbstractRequest { @@ -148,6 +165,8 @@ public class MetadataRequest extends AbstractRequest {
struct.set(TOPICS_KEY_NAME, null);
else
struct.set(TOPICS_KEY_NAME, topics.toArray());
if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
struct.set(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, allowAutoTopicCreation);
return struct;
}
}

12
clients/src/test/java/org/apache/kafka/clients/MetadataTest.java

@ -45,8 +45,8 @@ public class MetadataTest { @@ -45,8 +45,8 @@ public class MetadataTest {
private long refreshBackoffMs = 100;
private long metadataExpireMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
private AtomicReference<String> backgroundError = new AtomicReference<String>();
private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
private AtomicReference<String> backgroundError = new AtomicReference<>();
@After
public void tearDown() {
@ -96,7 +96,7 @@ public class MetadataTest { @@ -96,7 +96,7 @@ public class MetadataTest {
}
long largerOfBackoffAndExpire = Math.max(refreshBackoffMs, metadataExpireMs);
Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
assertEquals(0, metadata.timeToNextUpdate(now));
@ -255,7 +255,7 @@ public class MetadataTest { @@ -255,7 +255,7 @@ public class MetadataTest {
MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
ClusterResourceListeners listeners = new ClusterResourceListeners();
listeners.maybeAdd(mockClusterListener);
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners);
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners);
String hostName = "www.example.com";
Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
@ -348,7 +348,7 @@ public class MetadataTest { @@ -348,7 +348,7 @@ public class MetadataTest {
@Test
public void testTopicExpiry() throws Exception {
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners());
// Test that topic is expired if not used within the expiry interval
long time = 0;
@ -380,7 +380,7 @@ public class MetadataTest { @@ -380,7 +380,7 @@ public class MetadataTest {
@Test
public void testNonExpiringMetadata() throws Exception {
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, new ClusterResourceListeners());
// Test that topic is not expired if not used within the expiry interval
long time = 0;

8
clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

@ -49,7 +49,7 @@ public class NetworkClientTest { @@ -49,7 +49,7 @@ public class NetworkClientTest {
protected final int requestTimeoutMs = 1000;
protected final MockTime time = new MockTime();
protected final MockSelector selector = new MockSelector(time);
protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE);
protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
protected final int nodeId = 1;
protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
protected final Node node = cluster.nodes().get(0);
@ -86,8 +86,7 @@ public class NetworkClientTest { @@ -86,8 +86,7 @@ public class NetworkClientTest {
@Test(expected = IllegalStateException.class)
public void testSendToUnreadyNode() {
MetadataRequest.Builder builder =
new MetadataRequest.Builder(Arrays.asList("test"));
MetadataRequest.Builder builder = new MetadataRequest.Builder(Arrays.asList("test"), true);
long now = time.milliseconds();
ClientRequest request = client.newClientRequest("5", builder, now, false);
client.send(request, now);
@ -251,8 +250,7 @@ public class NetworkClientTest { @@ -251,8 +250,7 @@ public class NetworkClientTest {
// metadata request when the remote node disconnects with the request in-flight.
awaitReady(client, node);
MetadataRequest.Builder builder =
new MetadataRequest.Builder(Collections.<String>emptyList());
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
long now = time.milliseconds();
ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
client.send(request, now);

2
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java

@ -50,7 +50,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable { @@ -50,7 +50,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
this.adminClientConfig = new AdminClientConfig(config);
this.cluster = cluster;
this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
}

36
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@ -366,7 +366,7 @@ public class KafkaConsumerTest { @@ -366,7 +366,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -407,7 +407,7 @@ public class KafkaConsumerTest { @@ -407,7 +407,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -448,7 +448,7 @@ public class KafkaConsumerTest { @@ -448,7 +448,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -484,7 +484,7 @@ public class KafkaConsumerTest { @@ -484,7 +484,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -534,7 +534,7 @@ public class KafkaConsumerTest { @@ -534,7 +534,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -581,7 +581,7 @@ public class KafkaConsumerTest { @@ -581,7 +581,7 @@ public class KafkaConsumerTest {
topicMetadata.put(unmatchedTopic, 1);
Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
Node node = cluster.nodes().get(0);
MockClient client = new MockClient(time, metadata);
@ -621,7 +621,7 @@ public class KafkaConsumerTest { @@ -621,7 +621,7 @@ public class KafkaConsumerTest {
topicMetadata.put(otherTopic, 1);
Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
Node node = cluster.nodes().get(0);
MockClient client = new MockClient(time, metadata);
@ -664,7 +664,7 @@ public class KafkaConsumerTest { @@ -664,7 +664,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -720,7 +720,7 @@ public class KafkaConsumerTest { @@ -720,7 +720,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
final Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
final MockClient client = new MockClient(time, metadata);
@ -760,7 +760,7 @@ public class KafkaConsumerTest { @@ -760,7 +760,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -808,7 +808,7 @@ public class KafkaConsumerTest { @@ -808,7 +808,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -928,7 +928,7 @@ public class KafkaConsumerTest { @@ -928,7 +928,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -996,7 +996,7 @@ public class KafkaConsumerTest { @@ -996,7 +996,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -1061,7 +1061,7 @@ public class KafkaConsumerTest { @@ -1061,7 +1061,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(tpCounts);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -1122,7 +1122,7 @@ public class KafkaConsumerTest { @@ -1122,7 +1122,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 2);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -1268,7 +1268,7 @@ public class KafkaConsumerTest { @@ -1268,7 +1268,7 @@ public class KafkaConsumerTest {
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
Metadata metadata = createMetadata();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
MockClient client = new MockClient(time, metadata);
@ -1372,6 +1372,10 @@ public class KafkaConsumerTest { @@ -1372,6 +1372,10 @@ public class KafkaConsumerTest {
};
}
private Metadata createMetadata() {
return new Metadata(0, Long.MAX_VALUE, true);
}
private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
if (coordinator == null) {
// lookup coordinator

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java

@ -73,7 +73,7 @@ public class AbstractCoordinatorTest { @@ -73,7 +73,7 @@ public class AbstractCoordinatorTest {
this.mockTime = new MockTime();
this.mockClient = new MockClient(mockTime);
Metadata metadata = new Metadata();
Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime,
RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
Metrics metrics = new Metrics();

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -119,7 +119,7 @@ public class ConsumerCoordinatorTest { @@ -119,7 +119,7 @@ public class ConsumerCoordinatorTest {
public void setup() {
this.time = new MockTime();
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata = new Metadata(0, Long.MAX_VALUE, true);
this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
this.client = new MockClient(time, metadata);
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java

@ -46,7 +46,7 @@ public class ConsumerNetworkClientTest { @@ -46,7 +46,7 @@ public class ConsumerNetworkClientTest {
private MockClient client = new MockClient(time);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
@Test

14
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -118,7 +118,7 @@ public class FetcherTest { @@ -118,7 +118,7 @@ public class FetcherTest {
private int fetchSize = 1000;
private long retryBackoffMs = 100;
private MockTime time = new MockTime(1);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
private MockClient client = new MockClient(time, metadata);
private Cluster cluster = TestUtils.singletonCluster(topicName, 2);
private Node node = cluster.nodes().get(0);
@ -1068,16 +1068,15 @@ public class FetcherTest { @@ -1068,16 +1068,15 @@ public class FetcherTest {
public void testGetTopicMetadataInvalidTopic() {
client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));
fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
}
@Test
public void testGetTopicMetadataUnknownTopic() {
client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
Map<String, List<PartitionInfo>> topicMetadata =
fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
assertNull(topicMetadata.get(topicName));
}
@ -1086,9 +1085,8 @@ public class FetcherTest { @@ -1086,9 +1085,8 @@ public class FetcherTest {
client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
Map<String, List<PartitionInfo>> topicMetadata =
fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topicName)), 5000L);
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topicName), true), 5000L);
assertTrue(topicMetadata.containsKey(topicName));
}

3
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@ -325,7 +325,8 @@ public class KafkaProducerTest { @@ -325,7 +325,8 @@ public class KafkaProducerTest {
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
long refreshBackoffMs = 500L;
long metadataExpireMs = 60000L;
final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true,
true, new ClusterResourceListeners());
final Time time = new MockTime();
MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
MemberModifier.field(KafkaProducer.class, "time").set(producer, time);

2
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -87,7 +87,7 @@ public class SenderTest { @@ -87,7 +87,7 @@ public class SenderTest {
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private int batchSize = 16 * 1024;
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
private ApiVersions apiVersions = new ApiVersions();
private Cluster cluster = TestUtils.singletonCluster("test", 2);
private Metrics metrics = null;

2
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -92,7 +92,7 @@ public class TransactionManagerTest { @@ -92,7 +92,7 @@ public class TransactionManagerTest {
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
private ApiVersions apiVersions = new ApiVersions();
private Cluster cluster = TestUtils.singletonCluster("test", 2);
private RecordAccumulator accumulator = null;

6
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -117,6 +117,10 @@ public class RequestResponseTest { @@ -117,6 +117,10 @@ public class RequestResponseTest {
checkErrorResponse(createMetadataRequest(1, singletonList("topic1")), new UnknownServerException());
checkResponse(createMetadataResponse(), 2);
checkErrorResponse(createMetadataRequest(2, singletonList("topic1")), new UnknownServerException());
checkResponse(createMetadataResponse(), 3);
checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException());
checkResponse(createMetadataResponse(), 4);
checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException());
checkRequest(createOffsetCommitRequest(2));
checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
checkResponse(createOffsetCommitResponse(), 0);
@ -703,7 +707,7 @@ public class RequestResponseTest { @@ -703,7 +707,7 @@ public class RequestResponseTest {
}
private MetadataRequest createMetadataRequest(int version, List<String> topics) {
return new MetadataRequest.Builder(topics).build((short) version);
return new MetadataRequest.Builder(topics, true).build((short) version);
}
private MetadataResponse createMetadataResponse() {

11
clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java

@ -548,10 +548,10 @@ public class SaslAuthenticatorTest { @@ -548,10 +548,10 @@ public class SaslAuthenticatorTest {
// Send metadata request before Kafka SASL handshake request
String node1 = "invalid1";
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
MetadataRequest metadataRequest1 =
new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest1.version(), "someclient", 1);
MetadataRequest metadataRequest1 = new MetadataRequest.Builder(Collections.singletonList("sometopic"),
true).build();
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(),
"someclient", 1);
selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
@ -563,8 +563,7 @@ public class SaslAuthenticatorTest { @@ -563,8 +563,7 @@ public class SaslAuthenticatorTest {
String node2 = "invalid2";
createClientConnection(SecurityProtocol.PLAINTEXT, node2);
sendHandshakeRequestReceiveResponse(node2);
MetadataRequest metadataRequest2 =
new MetadataRequest.Builder(Collections.singletonList("sometopic")).build();
MetadataRequest metadataRequest2 = new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest2.version(), "someclient", 2);
selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java

@ -87,7 +87,7 @@ public class WorkerGroupMember { @@ -87,7 +87,7 @@ public class WorkerGroupMember {
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "connect";

2
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java

@ -90,7 +90,7 @@ public class WorkerCoordinatorTest { @@ -90,7 +90,7 @@ public class WorkerCoordinatorTest {
public void setup() {
this.time = new MockTime();
this.client = new MockClient(time);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata = new Metadata(0, Long.MAX_VALUE, true);
this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);

4
core/src/main/scala/kafka/admin/AdminClient.scala

@ -213,7 +213,7 @@ class AdminClient(val time: Time, @@ -213,7 +213,7 @@ class AdminClient(val time: Time,
*/
def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava)
val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic).toSet.toList.asJava, true)
val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
val errors = response.errors
if (!errors.isEmpty)
@ -425,7 +425,7 @@ object AdminClient { @@ -425,7 +425,7 @@ object AdminClient {
def create(config: AdminConfig): AdminClient = {
val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata
val metadata = new Metadata(100L, 60 * 60 * 1000L, true)
val channelBuilder = ClientUtils.createChannelBuilder(config)
val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)

10
core/src/main/scala/kafka/server/KafkaApis.scala

@ -886,7 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -886,7 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel,
topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}
private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
private def getTopicMetadata(allowAutoTopicCreation: Boolean, topics: Set[String], listenerName: ListenerName,
errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
if (topics.isEmpty || topicResponses.size == topics.size) {
topicResponses
@ -899,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -899,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, true, java.util.Collections.emptyList())
else
topicMetadata
} else if (config.autoCreateTopicsEnable) {
} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())
@ -937,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -937,7 +938,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authorize(request.session, Create, Resource.ClusterResource)) {
authorizedTopics --= nonExistingTopics
unauthorizedForCreateTopics ++= nonExistingTopics
@ -965,7 +966,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -965,7 +966,8 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopics.isEmpty)
Seq.empty[MetadataResponse.TopicMetadata]
else
getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints)
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.listenerName,
errorUnavailableEndpoints)
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata

10
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -222,8 +222,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -222,8 +222,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
super.tearDown()
}
private def createMetadataRequest = {
new requests.MetadataRequest.Builder(List(topic).asJava).build()
private def createMetadataRequest(allowAutoTopicCreation: Boolean) = {
new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build()
}
private def createProduceRequest = {
@ -328,7 +328,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -328,7 +328,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testAuthorizationWithTopicExisting() {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest,
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
ApiKeys.PRODUCE -> createProduceRequest,
ApiKeys.FETCH -> createFetchRequest,
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
@ -381,6 +381,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -381,6 +381,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
ApiKeys.PRODUCE -> createProduceRequest,
ApiKeys.FETCH -> createFetchRequest,
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
@ -397,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -397,7 +399,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
removeAllAcls()

31
core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala

@ -30,8 +30,7 @@ import org.apache.kafka.clients.admin.NewTopic @@ -30,8 +30,7 @@ import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException}
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException}
import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.MetadataResponse
@ -80,7 +79,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin @@ -80,7 +79,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
def waitForTopics(client: AdminClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
TestUtils.waitUntilTrue(() => {
val topics = client.listTopics().names().get()
val topics = client.listTopics.names.get()
expectedPresent.forall(topicName => topics.contains(topicName)) &&
expectedMissing.forall(topicName => !topics.contains(topicName))
}, "timed out waiting for topics")
@ -123,21 +122,39 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin @@ -123,21 +122,39 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
val topics = Seq("mytopic", "mytopic2")
val newTopics = topics.map(new NewTopic(_, 1, 1))
client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
waitForTopics(client, List(), List("mytopic", "mytopic2"))
waitForTopics(client, List(), topics)
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, List("mytopic", "mytopic2"), List())
waitForTopics(client, topics, List())
val results = client.createTopics(newTopics.asJava).results()
assertTrue(results.containsKey("mytopic"))
assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException])
assertTrue(results.containsKey("mytopic2"))
assertFutureExceptionTypeEquals(results.get("mytopic2"), classOf[TopicExistsException])
val topicsFromDescribe = client.describeTopics(Seq("mytopic", "mytopic2").asJava).all.get().asScala.keys
val topicsFromDescribe = client.describeTopics(topics.asJava).all.get().asScala.keys
assertEquals(topics.toSet, topicsFromDescribe)
client.deleteTopics(topics.asJava).all.get()
waitForTopics(client, List(), List("mytopic", "mytopic2"))
waitForTopics(client, List(), topics)
}
/**
* describe should not auto create topics
*/
@Test
def testDescribeNonExistingTopic(): Unit = {
client = AdminClient.create(createConfig())
val existingTopic = "existing-topic"
client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1)).asJava).all.get()
waitForTopics(client, Seq(existingTopic), List())
val nonExistingTopic = "non-existing"
val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results
assertEquals(existingTopic, results.get(existingTopic).get.name)
intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic))
}
@Test

4
core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala

@ -43,7 +43,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -43,7 +43,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
def verifyMetadata(socketServer: SocketServer) = {
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
val metadataForTopic = metadata.filter(_.topic == topic).head
val partitions = if (!details.replicasAssignments.isEmpty)
@ -127,7 +127,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -127,7 +127,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
protected def validateTopicExists(topic: String): Unit = {
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE))
}

2
core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala

@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { @@ -106,7 +106,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
private def validateTopicIsDeleted(topic: String): Unit = {
val metadata = sendMetadataRequest(new MetadataRequest.
Builder(List(topic).asJava).build).topicMetadata.asScala
Builder(List(topic).asJava, true).build).topicMetadata.asScala
TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE),
s"The topic $topic should not exist")
}

42
core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala

@ -108,11 +108,41 @@ class MetadataRequestTest extends BaseRequestTest { @@ -108,11 +108,41 @@ class MetadataRequestTest extends BaseRequestTest {
// v0, Doesn't support a "no topics" request
// v1, Empty list represents "no topics"
val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 1.toShort))
val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 1.toShort))
assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
}
@Test
def testAutoTopicCreation(): Unit = {
def checkAutoCreatedTopic(existingTopic: String, autoCreatedTopic: String, response: MetadataResponse): Unit = {
assertNull(response.errors.get(existingTopic))
assertEquals(Errors.LEADER_NOT_AVAILABLE, response.errors.get(autoCreatedTopic))
assertEquals(Some(servers.head.config.numPartitions), zkUtils.getTopicPartitionCount(autoCreatedTopic))
for (i <- 0 until servers.head.config.numPartitions)
TestUtils.waitUntilMetadataIsPropagated(servers, autoCreatedTopic, i)
}
val topic1 = "t1"
val topic2 = "t2"
val topic3 = "t3"
val topic4 = "t4"
TestUtils.createTopic(zkUtils, topic1, 1, 1, servers)
val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion))
checkAutoCreatedTopic(topic1, topic2, response1)
// V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect
val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 3))
checkAutoCreatedTopic(topic2, topic3, response2)
// V4 and higher support a configurable allowAutoTopicCreation
val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 4))
assertNull(response3.errors.get(topic3))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
assertEquals(None, zkUtils.getTopicPartitionCount(topic4))
}
@Test
def testAllTopicsRequest() {
// create some topics
@ -120,7 +150,7 @@ class MetadataRequestTest extends BaseRequestTest { @@ -120,7 +150,7 @@ class MetadataRequestTest extends BaseRequestTest {
TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)
// v0, Empty list represents all topics
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, 0.toShort))
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort))
assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())
@ -139,7 +169,7 @@ class MetadataRequestTest extends BaseRequestTest { @@ -139,7 +169,7 @@ class MetadataRequestTest extends BaseRequestTest {
TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)
// Kill a replica node that is not the leader
val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
val downNode = servers.find { server =>
val serverId = server.apis.brokerId
@ -150,14 +180,14 @@ class MetadataRequestTest extends BaseRequestTest { @@ -150,14 +180,14 @@ class MetadataRequestTest extends BaseRequestTest {
downNode.shutdown()
TestUtils.waitUntilTrue(() => {
val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
replica.host == "" & replica.port == -1
}, "Replica was not found down", 5000)
// Validate version 0 still filters unavailable replicas and contains error
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 0.toShort))
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 0.toShort))
val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode))
@ -167,7 +197,7 @@ class MetadataRequestTest extends BaseRequestTest { @@ -167,7 +197,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
// Validate version 1 returns unavailable replicas with no error
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, 1.toShort))
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))

2
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -173,7 +173,7 @@ class RequestQuotaTest extends BaseRequestTest { @@ -173,7 +173,7 @@ class RequestQuotaTest extends BaseRequestTest {
FetchRequest.Builder.forConsumer(0, 0, partitionMap)
case ApiKeys.METADATA =>
new MetadataRequest.Builder(List(topic).asJava)
new MetadataRequest.Builder(List(topic).asJava, true)
case ApiKeys.LIST_OFFSETS =>
ListOffsetRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java

@ -101,7 +101,8 @@ public class StreamsKafkaClient { @@ -101,7 +101,8 @@ public class StreamsKafkaClient {
final Metadata metadata = new Metadata(streamsConfig.getLong(
StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
false
);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
@ -241,7 +242,8 @@ public class StreamsKafkaClient { @@ -241,7 +242,8 @@ public class StreamsKafkaClient {
private String getAnyReadyBrokerId() {
final Metadata metadata = new Metadata(
streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
false);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds());
@ -289,7 +291,7 @@ public class StreamsKafkaClient { @@ -289,7 +291,7 @@ public class StreamsKafkaClient {
final ClientRequest clientRequest = kafkaClient.newClientRequest(
getAnyReadyBrokerId(),
new MetadataRequest.Builder(null),
MetadataRequest.Builder.allTopics(),
Time.SYSTEM.milliseconds(),
true);
final ClientResponse clientResponse = sendRequest(clientRequest);

Loading…
Cancel
Save