Browse Source

KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972)

Reviewers: Jun Rao <junrao@gmail.com>, Tom Bentley <tbentley@redhat.com>, Boyang Chen <bchen11@outlook.com>
pull/6235/head
Colin Patrick McCabe 6 years ago committed by GitHub
parent
commit
e2e8bdbd8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  2. 33
      clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java
  3. 6
      clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  4. 2
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  5. 315
      clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
  6. 100
      clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
  7. 4
      clients/src/main/resources/common/message/CreateTopicsRequest.json
  8. 29
      clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
  9. 3
      clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
  10. 2
      clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
  11. 54
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  12. 12
      connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
  13. 97
      core/src/main/scala/kafka/server/AdminManager.scala
  14. 89
      core/src/main/scala/kafka/server/KafkaApis.scala
  15. 10
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  16. 116
      core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
  17. 142
      core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
  18. 78
      core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
  19. 14
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

29
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -59,6 +59,9 @@ import org.apache.kafka.common.errors.UnknownServerException; @@ -59,6 +59,9 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -1252,7 +1255,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1252,7 +1255,7 @@ public class KafkaAdminClient extends AdminClient {
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
final CreatableTopicSet topics = new CreatableTopicSet();
for (NewTopic newTopic : newTopics) {
if (topicNameIsUnrepresentable(newTopic.name())) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@ -1261,7 +1264,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1261,7 +1264,7 @@ public class KafkaAdminClient extends AdminClient {
topicFutures.put(newTopic.name(), future);
} else if (!topicFutures.containsKey(newTopic.name())) {
topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
topics.add(newTopic.convertToCreatableTopic());
}
}
final long now = time.milliseconds();
@ -1270,27 +1273,33 @@ public class KafkaAdminClient extends AdminClient { @@ -1270,27 +1273,33 @@ public class KafkaAdminClient extends AdminClient {
@Override
public AbstractRequest.Builder createRequest(int timeoutMs) {
return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly());
return new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().
setTopics(topics).
setTimeoutMs(timeoutMs).
setValidateOnly(options.shouldValidateOnly()));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse;
// Check for controller change
for (ApiError error : response.errors().values()) {
if (error.error() == Errors.NOT_CONTROLLER) {
for (Errors error : response.errorCounts().keySet()) {
if (error == Errors.NOT_CONTROLLER) {
metadataManager.clearController();
metadataManager.requestUpdate();
throw error.exception();
}
}
// Handle server responses for particular topics.
for (Map.Entry<String, ApiError> entry : response.errors().entrySet()) {
KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey());
for (CreatableTopicResult result : response.data().topics()) {
KafkaFutureImpl<Void> future = topicFutures.get(result.name());
if (future == null) {
log.warn("Server response mentioned unknown topic {}", entry.getKey());
log.warn("Server response mentioned unknown topic {}", result.name());
} else {
ApiException exception = entry.getValue().exception();
ApiError error = new ApiError(
Errors.forCode(result.errorCode()), result.errorMessage());
ApiException exception = error.exception();
if (exception != null) {
future.completeExceptionally(exception);
} else {
@ -1313,7 +1322,7 @@ public class KafkaAdminClient extends AdminClient { @@ -1313,7 +1322,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
if (!topicsMap.isEmpty()) {
if (!topics.isEmpty()) {
runnable.call(call, now);
}
return new CreateTopicsResult(new HashMap<>(topicFutures));

33
clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java

@ -17,12 +17,15 @@ @@ -17,12 +17,15 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* A new topic to be created via {@link AdminClient#createTopics(Collection)}.
@ -105,20 +108,28 @@ public class NewTopic { @@ -105,20 +108,28 @@ public class NewTopic {
return configs;
}
TopicDetails convertToTopicDetails() {
CreatableTopic convertToCreatableTopic() {
CreatableTopic creatableTopic = new CreatableTopic().
setName(name).
setNumPartitions(numPartitions).
setReplicationFactor(replicationFactor);
if (replicasAssignments != null) {
if (configs != null) {
return new TopicDetails(replicasAssignments, configs);
} else {
return new TopicDetails(replicasAssignments);
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
creatableTopic.assignments().add(
new CreatableReplicaAssignment().
setPartitionIndex(entry.getKey()).
setBrokerIds(entry.getValue()));
}
} else {
if (configs != null) {
return new TopicDetails(numPartitions, replicationFactor, configs);
} else {
return new TopicDetails(numPartitions, replicationFactor);
}
if (configs != null) {
for (Entry<String, String> entry : configs.entrySet()) {
creatableTopic.configs().add(
new CreateableTopicConfig().
setName(entry.getKey()).
setValue(entry.getValue()));
}
}
return creatableTopic;
}
@Override

6
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@ -43,8 +45,6 @@ import org.apache.kafka.common.requests.CreateDelegationTokenRequest; @@ -43,8 +45,6 @@ import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
@ -152,7 +152,7 @@ public enum ApiKeys { @@ -152,7 +152,7 @@ public enum ApiKeys {
return parseResponse(version, buffer, (short) 0);
}
},
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequest.schemaVersions(), CreateTopicsResponse.schemaVersions()),
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),

2
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -109,7 +109,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { @@ -109,7 +109,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case API_VERSIONS:
return new ApiVersionsResponse(struct);
case CREATE_TOPICS:
return new CreateTopicsResponse(struct);
return new CreateTopicsResponse(struct, version);
case DELETE_TOPICS:
return new DeleteTopicsResponse(struct);
case DELETE_RECORDS:

315
clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java

@ -18,286 +18,73 @@ package org.apache.kafka.common.requests; @@ -18,286 +18,73 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
import static org.apache.kafka.common.protocol.types.Type.STRING;
public class CreateTopicsRequest extends AbstractRequest {
private static final String REQUESTS_KEY_NAME = "create_topic_requests";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions";
private static final String REPLICATION_FACTOR_KEY_NAME = "replication_factor";
private static final String REPLICA_ASSIGNMENT_KEY_NAME = "replica_assignment";
private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas";
private static final String CONFIG_NAME_KEY_NAME = "config_name";
private static final String CONFIG_VALUE_KEY_NAME = "config_value";
private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
private static final Schema CONFIG_ENTRY = new Schema(
new Field(CONFIG_NAME_KEY_NAME, STRING, "Configuration name"),
new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Configuration value"));
private static final Schema PARTITION_REPLICA_ASSIGNMENT_ENTRY = new Schema(
PARTITION_ID,
new Field(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, new ArrayOf(INT32), "The set of all nodes that should " +
"host this partition. The first replica in the list is the preferred leader."));
private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema(
TOPIC_NAME,
new Field(NUM_PARTITIONS_KEY_NAME, INT32, "Number of partitions to be created. -1 indicates unset."),
new Field(REPLICATION_FACTOR_KEY_NAME, INT16, "Replication factor for the topic. -1 indicates unset."),
new Field(REPLICA_ASSIGNMENT_KEY_NAME, new ArrayOf(PARTITION_REPLICA_ASSIGNMENT_ENTRY),
"Replica assignment among kafka brokers for this topic partitions. If this is set num_partitions " +
"and replication_factor must be unset."),
new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), "Topic level configuration for topic to be set."));
private static final Schema SINGLE_CREATE_TOPIC_REQUEST_V1 = SINGLE_CREATE_TOPIC_REQUEST_V0;
private static final Schema CREATE_TOPICS_REQUEST_V0 = new Schema(
new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V0),
"An array of single topic creation requests. Can not have multiple entries for the same topic."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " +
"controller node. Values <= 0 will trigger topic creation and return immediately"));
private static final Schema CREATE_TOPICS_REQUEST_V1 = new Schema(
new Field(REQUESTS_KEY_NAME, new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V1), "An array of single " +
"topic creation requests. Can not have multiple entries for the same topic."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for a topic to be completely created on the " +
"controller node. Values <= 0 will trigger topic creation and return immediately"),
new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If this is true, the request will be validated, but the " +
"topic won't be created."));
/* v2 request is the same as v1. Throttle time has been added to the response */
private static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1;
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema CREATE_TOPICS_REQUEST_V3 = CREATE_TOPICS_REQUEST_V2;
public static Schema[] schemaVersions() {
return new Schema[]{CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2,
CREATE_TOPICS_REQUEST_V3};
}
public static final class TopicDetails {
public final int numPartitions;
public final short replicationFactor;
public final Map<Integer, List<Integer>> replicasAssignments;
public final Map<String, String> configs;
private TopicDetails(int numPartitions,
short replicationFactor,
Map<Integer, List<Integer>> replicasAssignments,
Map<String, String> configs) {
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.replicasAssignments = replicasAssignments;
this.configs = configs;
}
public TopicDetails(int partitions,
short replicationFactor,
Map<String, String> configs) {
this(partitions, replicationFactor, Collections.emptyMap(), configs);
}
public TopicDetails(int partitions,
short replicationFactor) {
this(partitions, replicationFactor, Collections.emptyMap());
}
public TopicDetails(Map<Integer, List<Integer>> replicasAssignments,
Map<String, String> configs) {
this(NO_NUM_PARTITIONS, NO_REPLICATION_FACTOR, replicasAssignments, configs);
}
public TopicDetails(Map<Integer, List<Integer>> replicasAssignments) {
this(replicasAssignments, Collections.emptyMap());
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(numPartitions=").append(numPartitions).
append(", replicationFactor=").append(replicationFactor).
append(", replicasAssignments=").append(replicasAssignments).
append(", configs=").append(configs).
append(")");
return bld.toString();
}
}
public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
private final Map<String, TopicDetails> topics;
private final int timeout;
private final boolean validateOnly; // introduced in V1
public Builder(Map<String, TopicDetails> topics, int timeout) {
this(topics, timeout, false);
}
private final CreateTopicsRequestData data;
public Builder(Map<String, TopicDetails> topics, int timeout, boolean validateOnly) {
public Builder(CreateTopicsRequestData data) {
super(ApiKeys.CREATE_TOPICS);
this.topics = topics;
this.timeout = timeout;
this.validateOnly = validateOnly;
this.data = data;
}
@Override
public CreateTopicsRequest build(short version) {
if (validateOnly && version == 0)
if (data.validateOnly() && version == 0)
throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
"CreateTopicsRequest");
return new CreateTopicsRequest(topics, timeout, validateOnly, version);
return new CreateTopicsRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=CreateTopicsRequest").
append(", topics=").append(topics).
append(", timeout=").append(timeout).
append(", validateOnly=").append(validateOnly).
append(")");
return bld.toString();
return data.toString();
}
}
private final Map<String, TopicDetails> topics;
private final Integer timeout;
private final boolean validateOnly; // introduced in V1
// Set to handle special case where 2 requests for the same topic exist on the wire.
// This allows the broker to return an error code for these topics.
private final Set<String> duplicateTopics;
private final CreateTopicsRequestData data;
private final short version;
public static final int NO_NUM_PARTITIONS = -1;
public static final short NO_REPLICATION_FACTOR = -1;
private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, boolean validateOnly, short version) {
private CreateTopicsRequest(CreateTopicsRequestData data, short version) {
super(ApiKeys.CREATE_TOPICS, version);
this.topics = topics;
this.timeout = timeout;
this.validateOnly = validateOnly;
this.duplicateTopics = Collections.emptySet();
this.data = data;
this.version = version;
}
public CreateTopicsRequest(Struct struct, short version) {
super(ApiKeys.CREATE_TOPICS, version);
this.data = new CreateTopicsRequestData(struct, version);
this.version = version;
}
Object[] requestStructs = struct.getArray(REQUESTS_KEY_NAME);
Map<String, TopicDetails> topics = new HashMap<>();
Set<String> duplicateTopics = new HashSet<>();
for (Object requestStructObj : requestStructs) {
Struct singleRequestStruct = (Struct) requestStructObj;
String topic = singleRequestStruct.get(TOPIC_NAME);
if (topics.containsKey(topic))
duplicateTopics.add(topic);
int numPartitions = singleRequestStruct.getInt(NUM_PARTITIONS_KEY_NAME);
short replicationFactor = singleRequestStruct.getShort(REPLICATION_FACTOR_KEY_NAME);
//replica assignment
Object[] assignmentsArray = singleRequestStruct.getArray(REPLICA_ASSIGNMENT_KEY_NAME);
Map<Integer, List<Integer>> partitionReplicaAssignments = new HashMap<>(assignmentsArray.length);
for (Object assignmentStructObj : assignmentsArray) {
Struct assignmentStruct = (Struct) assignmentStructObj;
Integer partitionId = assignmentStruct.get(PARTITION_ID);
Object[] replicasArray = assignmentStruct.getArray(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME);
List<Integer> replicas = new ArrayList<>(replicasArray.length);
for (Object replica : replicasArray) {
replicas.add((Integer) replica);
}
partitionReplicaAssignments.put(partitionId, replicas);
}
Object[] configArray = singleRequestStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
Map<String, String> configs = new HashMap<>(configArray.length);
for (Object configStructObj : configArray) {
Struct configStruct = (Struct) configStructObj;
String key = configStruct.getString(CONFIG_NAME_KEY_NAME);
String value = configStruct.getString(CONFIG_VALUE_KEY_NAME);
configs.put(key, value);
}
TopicDetails args = new TopicDetails(numPartitions, replicationFactor, partitionReplicaAssignments, configs);
topics.put(topic, args);
}
this.topics = topics;
this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
if (struct.hasField(VALIDATE_ONLY_KEY_NAME))
this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
else
this.validateOnly = false;
this.duplicateTopics = duplicateTopics;
public CreateTopicsRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<String, ApiError> topicErrors = new HashMap<>();
for (String topic : topics.keySet()) {
topicErrors.put(topic, ApiError.fromThrowable(e));
CreateTopicsResponseData response = new CreateTopicsResponseData();
if (version() >= 2) {
response.setThrottleTimeMs(throttleTimeMs);
}
short versionId = version();
switch (versionId) {
case 0:
case 1:
return new CreateTopicsResponse(topicErrors);
case 2:
case 3:
return new CreateTopicsResponse(throttleTimeMs, topicErrors);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion()));
ApiError apiError = ApiError.fromThrowable(e);
for (CreatableTopic topic : data.topics()) {
response.topics().add(new CreatableTopicResult().
setName(topic.name()).
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.message()));
}
}
public Map<String, TopicDetails> topics() {
return this.topics;
}
public int timeout() {
return this.timeout;
}
public boolean validateOnly() {
return validateOnly;
}
public Set<String> duplicateTopics() {
return this.duplicateTopics;
return new CreateTopicsResponse(response);
}
public static CreateTopicsRequest parse(ByteBuffer buffer, short version) {
@ -309,46 +96,6 @@ public class CreateTopicsRequest extends AbstractRequest { @@ -309,46 +96,6 @@ public class CreateTopicsRequest extends AbstractRequest {
*/
@Override
public Struct toStruct() {
short version = version();
Struct struct = new Struct(ApiKeys.CREATE_TOPICS.requestSchema(version));
List<Struct> createTopicRequestStructs = new ArrayList<>(topics.size());
for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
Struct singleRequestStruct = struct.instance(REQUESTS_KEY_NAME);
String topic = entry.getKey();
TopicDetails args = entry.getValue();
singleRequestStruct.set(TOPIC_NAME, topic);
singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, args.numPartitions);
singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, args.replicationFactor);
// replica assignment
List<Struct> replicaAssignmentsStructs = new ArrayList<>(args.replicasAssignments.size());
for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment : args.replicasAssignments.entrySet()) {
Struct replicaAssignmentStruct = singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
replicaAssignmentStruct.set(PARTITION_ID, partitionReplicaAssignment.getKey());
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, partitionReplicaAssignment.getValue().toArray());
replicaAssignmentsStructs.add(replicaAssignmentStruct);
}
singleRequestStruct.set(REPLICA_ASSIGNMENT_KEY_NAME, replicaAssignmentsStructs.toArray());
// configs
List<Struct> configsStructs = new ArrayList<>(args.configs.size());
for (Map.Entry<String, String> configEntry : args.configs.entrySet()) {
Struct configStruct = singleRequestStruct.instance(CONFIG_ENTRIES_KEY_NAME);
configStruct.set(CONFIG_NAME_KEY_NAME, configEntry.getKey());
configStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.getValue());
configsStructs.add(configStruct);
}
singleRequestStruct.set(CONFIG_ENTRIES_KEY_NAME, configsStructs.toArray());
createTopicRequestStructs.add(singleRequestStruct);
}
struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
struct.set(TIMEOUT_KEY_NAME, timeout);
if (version >= 1)
struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
return struct;
return data.toStruct(version);
}
}

100
clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java

@ -14,60 +14,20 @@ @@ -14,60 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
public class CreateTopicsResponse extends AbstractResponse {
private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
private static final Schema TOPIC_ERROR_CODE = new Schema(
TOPIC_NAME,
ERROR_CODE);
// Improves on TOPIC_ERROR_CODE by adding an error_message to complement the error_code
private static final Schema TOPIC_ERROR = new Schema(
TOPIC_NAME,
ERROR_CODE,
ERROR_MESSAGE);
private static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema(
new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes."));
private static final Schema CREATE_TOPICS_RESPONSE_V1 = new Schema(
new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors."));
private static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema(
THROTTLE_TIME_MS,
new Field(TOPIC_ERRORS_KEY_NAME, new ArrayOf(TOPIC_ERROR), "An array of per topic errors."));
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema CREATE_TOPICS_RESPONSE_V3 = CREATE_TOPICS_RESPONSE_V2;
public static Schema[] schemaVersions() {
return new Schema[]{CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2,
CREATE_TOPICS_RESPONSE_V3};
}
/**
* Possible error codes:
*
@ -84,63 +44,43 @@ public class CreateTopicsResponse extends AbstractResponse { @@ -84,63 +44,43 @@ public class CreateTopicsResponse extends AbstractResponse {
* POLICY_VIOLATION(44)
*/
private final Map<String, ApiError> errors;
private final int throttleTimeMs;
private final CreateTopicsResponseData data;
public CreateTopicsResponse(Map<String, ApiError> errors) {
this(DEFAULT_THROTTLE_TIME, errors);
public CreateTopicsResponse(CreateTopicsResponseData data) {
this.data = data;
}
public CreateTopicsResponse(int throttleTimeMs, Map<String, ApiError> errors) {
this.throttleTimeMs = throttleTimeMs;
this.errors = errors;
public CreateTopicsResponse(Struct struct, short version) {
this.data = new CreateTopicsResponseData(struct, version);
}
public CreateTopicsResponse(Struct struct) {
Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
Map<String, ApiError> errors = new HashMap<>();
for (Object topicErrorStructObj : topicErrorStructs) {
Struct topicErrorStruct = (Struct) topicErrorStructObj;
String topic = topicErrorStruct.get(TOPIC_NAME);
errors.put(topic, new ApiError(topicErrorStruct));
}
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
this.errors = errors;
public CreateTopicsResponseData data() {
return data;
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
for (Map.Entry<String, ApiError> topicError : errors.entrySet()) {
Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
topicErrorsStruct.set(TOPIC_NAME, topicError.getKey());
topicError.getValue().write(topicErrorsStruct);
topicErrorsStructs.add(topicErrorsStruct);
}
struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
return struct;
return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
return throttleTimeMs;
}
public Map<String, ApiError> errors() {
return errors;
return data.throttleTimeMs();
}
@Override
public Map<Errors, Integer> errorCounts() {
return apiErrorCounts(errors);
HashMap<Errors, Integer> counts = new HashMap<>();
for (CreatableTopicResult result : data.topics()) {
Errors error = Errors.forCode(result.errorCode());
counts.put(error, counts.getOrDefault(error, 0) + 1);
}
return counts;
}
public static CreateTopicsResponse parse(ByteBuffer buffer, short version) {
return new CreateTopicsResponse(ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer));
return new CreateTopicsResponse(
ApiKeys.CREATE_TOPICS.responseSchema(version).read(buffer), version);
}
@Override

4
clients/src/main/resources/common/message/CreateTopicsRequest.json

@ -22,7 +22,7 @@ @@ -22,7 +22,7 @@
"fields": [
{ "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",
"about": "The topics to create.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"about": "The topic name." },
{ "name": "NumPartitions", "type": "int32", "versions": "0+",
"about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." },
@ -43,7 +43,7 @@ @@ -43,7 +43,7 @@
"about": "The configuration value." }
]}
]},
{ "name": "timeoutMs", "type": "int32", "versions": "0+",
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "How long to wait in milliseconds before timing out the request." },
{ "name": "validateOnly", "type": "bool", "versions": "1+", "default": "false", "ignorable": false,
"about": "If true, check that the topics can be created as specified, but don't create anything." }

29
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@ -51,6 +51,8 @@ import org.apache.kafka.common.errors.TimeoutException; @@ -51,6 +51,8 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
@ -213,6 +215,13 @@ public class KafkaAdminClientTest { @@ -213,6 +215,13 @@ public class KafkaAdminClientTest {
}
}
private static CreateTopicsResponse prepareCreateTopicsResponse(String topicName, Errors error) {
CreateTopicsResponseData data = new CreateTopicsResponseData();
data.topics().add(new CreatableTopicResult().
setName(topicName).setErrorCode(error.code()));
return new CreateTopicsResponse(data);
}
/**
* Test that the client properly times out when we don't receive any metadata.
*/
@ -221,7 +230,7 @@ public class KafkaAdminClientTest { @@ -221,7 +230,7 @@ public class KafkaAdminClientTest {
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
@ -243,7 +252,7 @@ public class KafkaAdminClientTest { @@ -243,7 +252,7 @@ public class KafkaAdminClientTest {
new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@ -267,7 +276,7 @@ public class KafkaAdminClientTest { @@ -267,7 +276,7 @@ public class KafkaAdminClientTest {
new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@ -289,7 +298,7 @@ public class KafkaAdminClientTest { @@ -289,7 +298,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
TimeUnit.DAYS.toMillis(1));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
env.kafkaClient().prepareResponse(prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(1000)).all();
@ -302,7 +311,7 @@ public class KafkaAdminClientTest { @@ -302,7 +311,7 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
new CreateTopicsOptions().timeoutMs(10000)).all();
@ -333,7 +342,7 @@ public class KafkaAdminClientTest { @@ -333,7 +342,7 @@ public class KafkaAdminClientTest {
mockClient.prepareResponse(body -> {
secondAttemptTime.set(time.milliseconds());
return body instanceof CreateTopicsRequest;
}, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
}, prepareCreateTopicsResponse("myTopic", Errors.NONE));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@ -356,15 +365,15 @@ public class KafkaAdminClientTest { @@ -356,15 +365,15 @@ public class KafkaAdminClientTest {
public void testCreateTopicsHandleNotControllerException() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
env.kafkaClient().prepareResponseFrom(
prepareCreateTopicsResponse("myTopic", Errors.NOT_CONTROLLER),
env.cluster().nodeById(0));
env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
1,
Collections.<MetadataResponse.TopicMetadata>emptyList()));
env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))),
env.kafkaClient().prepareResponseFrom(
prepareCreateTopicsResponse("myTopic", Errors.NONE),
env.cluster().nodeById(1));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),

3
clients/src/test/java/org/apache/kafka/common/message/MessageTest.java

@ -18,6 +18,7 @@ @@ -18,6 +18,7 @@
package org.apache.kafka.common.message;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Message;
@ -75,7 +76,7 @@ public final class MessageTest { @@ -75,7 +76,7 @@ public final class MessageTest {
setName("Topic").
setPartitions(Collections.singletonList(1))).iterator())));
testMessageRoundTrips(new CreateTopicsRequestData().
setTimeoutMs(1000).setTopics(Collections.emptyList()));
setTimeoutMs(1000).setTopics(new CreatableTopicSet()));
testMessageRoundTrips(new DescribeAclsRequestData().
setResourceType((byte) 42).
setResourceNameFilter(null).

2
clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java

@ -69,7 +69,7 @@ public class RequestContextTest { @@ -69,7 +69,7 @@ public class RequestContextTest {
Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer);
ApiVersionsResponse response = (ApiVersionsResponse)
AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0);
AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0);
assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
assertTrue(response.apiVersions().isEmpty());
}

54
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -32,6 +32,12 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; @@ -32,6 +32,12 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
@ -1034,28 +1040,38 @@ public class RequestResponseTest { @@ -1034,28 +1040,38 @@ public class RequestResponseTest {
}
private CreateTopicsRequest createCreateTopicRequest(int version, boolean validateOnly) {
CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
replicaAssignments.put(1, asList(1, 2, 3));
replicaAssignments.put(2, asList(2, 3, 4));
Map<String, String> configs = new HashMap<>();
configs.put("config1", "value1");
CreateTopicsRequest.TopicDetails request2 = new CreateTopicsRequest.TopicDetails(replicaAssignments, configs);
Map<String, CreateTopicsRequest.TopicDetails> request = new HashMap<>();
request.put("my_t1", request1);
request.put("my_t2", request2);
return new CreateTopicsRequest.Builder(request, 0, validateOnly).build((short) version);
CreateTopicsRequestData data = new CreateTopicsRequestData().
setTimeoutMs(123).
setValidateOnly(validateOnly);
data.topics().add(new CreatableTopic().
setNumPartitions(3).
setReplicationFactor((short) 5));
CreatableTopic topic2 = new CreatableTopic();
data.topics().add(topic2);
topic2.assignments().add(new CreatableReplicaAssignment().
setPartitionIndex(0).
setBrokerIds(Arrays.asList(1, 2, 3)));
topic2.assignments().add(new CreatableReplicaAssignment().
setPartitionIndex(1).
setBrokerIds(Arrays.asList(2, 3, 4)));
topic2.configs().add(new CreateableTopicConfig().
setName("config1").setValue("value1"));
return new CreateTopicsRequest.Builder(data).build((short) version);
}
private CreateTopicsResponse createCreateTopicResponse() {
Map<String, ApiError> errors = new HashMap<>();
errors.put("t1", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, null));
errors.put("t2", new ApiError(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available."));
return new CreateTopicsResponse(errors);
CreateTopicsResponseData data = new CreateTopicsResponseData();
data.topics().add(new CreatableTopicResult().
setName("t1").
setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code()).
setErrorMessage(null));
data.topics().add(new CreatableTopicResult().
setName("t2").
setErrorCode(Errors.LEADER_NOT_AVAILABLE.code()).
setErrorMessage("Leader with id 5 is not available."));
return new CreateTopicsResponse(data);
}
private DeleteTopicsRequest createDeleteTopicsRequest() {

12
connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java

@ -20,6 +20,8 @@ import org.apache.kafka.clients.NodeApiVersions; @@ -20,6 +20,8 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@ -32,7 +34,6 @@ import org.junit.Test; @@ -32,7 +34,6 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@ -137,10 +138,13 @@ public class TopicAdminTest { @@ -137,10 +138,13 @@ public class TopicAdminTest {
private CreateTopicsResponse createTopicResponse(ApiError error, NewTopic... topics) {
if (error == null) error = new ApiError(Errors.NONE, "");
Map<String, ApiError> topicResults = new HashMap<>();
CreateTopicsResponseData response = new CreateTopicsResponseData();
for (NewTopic topic : topics) {
topicResults.put(topic.name(), error);
response.topics().add(new CreatableTopicResult().
setName(topic.name()).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
return new CreateTopicsResponse(topicResults);
return new CreateTopicsResponse(response);
}
}

97
core/src/main/scala/kafka/server/AdminManager.scala

@ -25,8 +25,9 @@ import kafka.metrics.KafkaMetricsGroup @@ -25,8 +25,9 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
@ -37,7 +38,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe @@ -37,7 +38,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, Describe
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection._
import scala.collection.{mutable, _}
import scala.collection.JavaConverters._
class AdminManager(val config: KafkaConfig,
@ -73,72 +74,89 @@ class AdminManager(val config: KafkaConfig, @@ -73,72 +74,89 @@ class AdminManager(val config: KafkaConfig,
*/
def createTopics(timeout: Int,
validateOnly: Boolean,
createInfo: Map[String, TopicDetails],
toCreate: Map[String, CreatableTopic],
responseCallback: Map[String, ApiError] => Unit) {
// 1. map over topics creating assignment and calling zookeeper
val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
val metadata = createInfo.map { case (topic, arguments) =>
val metadata = toCreate.values.map(topic =>
try {
val configs = new Properties()
arguments.configs.asScala.foreach { case (key, value) =>
configs.setProperty(key, value)
topic.configs().asScala.foreach { case entry =>
configs.setProperty(entry.name(), entry.value())
}
LogConfig.validate(configs)
val assignments = {
if ((arguments.numPartitions != NO_NUM_PARTITIONS || arguments.replicationFactor != NO_REPLICATION_FACTOR)
&& !arguments.replicasAssignments.isEmpty)
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
else if (!arguments.replicasAssignments.isEmpty) {
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
// this follows the existing logic in TopicCommand
arguments.replicasAssignments.asScala.map { case (partitionId, replicas) =>
(partitionId.intValue, replicas.asScala.map(_.intValue))
}
} else
AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor)
if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
&& !topic.assignments().isEmpty) {
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
}
val assignments = if (topic.assignments().isEmpty) {
AdminUtils.assignReplicasToBrokers(brokers, topic.numPartitions, topic.replicationFactor)
} else {
val assignments = new mutable.HashMap[Int, Seq[Int]]
// Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
// this follows the existing logic in TopicCommand
topic.assignments.asScala.foreach {
case assignment => assignments(assignment.partitionIndex()) =
assignment.brokerIds().asScala.map(a => a: Int)
}
assignments
}
trace(s"Assignments for topic $topic are $assignments ")
createTopicPolicy match {
case Some(policy) =>
adminZkClient.validateTopicCreate(topic, assignments, configs)
adminZkClient.validateTopicCreate(topic.name(), assignments, configs)
// Use `null` for unset fields in the public API
val numPartitions: java.lang.Integer =
if (arguments.numPartitions == NO_NUM_PARTITIONS) null else arguments.numPartitions
if (topic.numPartitions == NO_NUM_PARTITIONS) null else topic.numPartitions
val replicationFactor: java.lang.Short =
if (arguments.replicationFactor == NO_REPLICATION_FACTOR) null else arguments.replicationFactor
val replicaAssignments = if (arguments.replicasAssignments.isEmpty) null else arguments.replicasAssignments
policy.validate(new RequestMetadata(topic, numPartitions, replicationFactor, replicaAssignments,
arguments.configs))
if (topic.replicationFactor == NO_REPLICATION_FACTOR) null else topic.replicationFactor
val javaAssignments = if (topic.assignments().isEmpty) {
null
} else {
val map = new java.util.HashMap[Integer, java.util.List[Integer]]
assignments.foreach {
case (k, v) => {
val list = new java.util.ArrayList[Integer]
v.foreach {
case i => list.add(Integer.valueOf(i))
}
map.put(k, list)
}
}
map
}
val javaConfigs = new java.util.HashMap[String, String]
topic.configs().asScala.foreach(config => javaConfigs.put(config.name(), config.value()))
policy.validate(new RequestMetadata(topic.name, numPartitions, replicationFactor,
javaAssignments, javaConfigs))
if (!validateOnly)
adminZkClient.createTopicWithAssignment(topic, configs, assignments)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
case None =>
if (validateOnly)
adminZkClient.validateTopicCreate(topic, assignments, configs)
adminZkClient.validateTopicCreate(topic.name, assignments, configs)
else
adminZkClient.createTopicWithAssignment(topic, configs, assignments)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
}
CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
CreatePartitionsMetadata(topic.name, assignments, ApiError.NONE)
} catch {
// Log client errors at a lower level than unexpected exceptions
case e: ApiException =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
info(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
case e: ConfigException =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
info(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
case e: Throwable =>
error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
}
}
error(s"Error processing create topic request $topic", e)
CreatePartitionsMetadata(topic.name, Map(), ApiError.fromThrowable(e))
})
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
@ -154,7 +172,8 @@ class AdminManager(val config: KafkaConfig, @@ -154,7 +172,8 @@ class AdminManager(val config: KafkaConfig,
} else {
// 3. else pass the assignments and errors to the delayed operation and set the keys
val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
val delayedCreateKeys = createInfo.keys.map(new TopicKey(_)).toSeq
val delayedCreateKeys = toCreate.values.map(
topic => new TopicKey(topic.name())).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
}

89
core/src/main/scala/kafka/server/KafkaApis.scala

@ -43,6 +43,9 @@ import org.apache.kafka.common.config.ConfigResource @@ -43,6 +43,9 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData
import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
import org.apache.kafka.common.metrics.Metrics
@ -50,7 +53,6 @@ import org.apache.kafka.common.network.{ListenerName, Send} @@ -50,7 +53,6 @@ import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@ -1393,60 +1395,61 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -1393,60 +1395,61 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, ApiError]): Unit = {
def sendResponseCallback(results: CreatableTopicResultSet): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new CreateTopicsResponse(requestThrottleMs, results.asJava)
trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
val responseData = new CreateTopicsResponseData().
setThrottleTimeMs(requestThrottleMs).
setTopics(results)
val responseBody = new CreateTopicsResponse(responseData)
trace(s"Sending create topics response $responseData for correlation id " +
"${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
val createTopicsRequest = request.body[CreateTopicsRequest]
val results = new CreatableTopicResultSet(createTopicsRequest.data().topics().size())
if (!controller.isActive) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, new ApiError(Errors.NOT_CONTROLLER, null))
createTopicsRequest.data.topics.asScala.foreach { case topic =>
results.add(new CreatableTopicResult().setName(topic.name()).
setErrorCode(Errors.NOT_CONTROLLER.code()))
}
sendResponseCallback(results)
} else {
val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
!createTopicsRequest.duplicateTopics.contains(topic)
createTopicsRequest.data.topics.asScala.foreach { case topic =>
results.add(new CreatableTopicResult().setName(topic.name()))
}
val hasClusterAuthorization = authorize(request.session, Create, Resource.ClusterResource)
results.asScala.foreach(topic => {
if (results.findAll(topic.name()).size() > 1) {
topic.setErrorCode(Errors.INVALID_REQUEST.code())
topic.setErrorMessage("Found multiple entries for this topic.")
} else if ((!hasClusterAuthorization) && (!authorize(request.session, Create,
new Resource(Topic, topic.name(), PatternType.LITERAL)))) {
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())
topic.setErrorMessage("Authorization failed.")
}
})
val toCreate = mutable.Map[String, CreatableTopic]()
createTopicsRequest.data.topics.asScala.foreach { case topic =>
if (results.find(topic.name()).errorCode() == 0) {
toCreate += topic.name() -> topic
}
}
val (authorizedTopics, unauthorizedTopics) =
if (authorize(request.session, Create, Resource.ClusterResource)) {
(validTopics, Map[String, TopicDetails]())
} else {
validTopics.partition { case (topic, _) =>
authorize(request.session, Create, new Resource(Topic, topic, PatternType.LITERAL))
}
def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
errors.foreach {
case (topicName, error) =>
results.find(topicName).
setErrorCode(error.error().code()).
setErrorMessage(error.message())
}
// Special handling to add duplicate topics to the response
def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = {
val duplicatedTopicsResults =
if (duplicateTopics.nonEmpty) {
val errorMessage = s"Create topics request from client `${request.header.clientId}` contains multiple entries " +
s"for the following topics: ${duplicateTopics.keySet.mkString(",")}"
// We can send the error message in the response for version 1, so we don't have to log it any more
if (request.header.apiVersion == 0)
warn(errorMessage)
duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
sendResponseCallback(completeResults)
}
adminManager.createTopics(
createTopicsRequest.timeout,
createTopicsRequest.validateOnly,
authorizedTopics,
sendResponseWithDuplicatesCallback
)
sendResponseCallback(results)
}
adminManager.createTopics(createTopicsRequest.data.timeoutMs(),
createTopicsRequest.data.validateOnly(),
toCreate,
handleCreateTopicsResults)
}
}

10
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -33,12 +33,13 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter @@ -33,12 +33,13 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.message.LeaveGroupRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
@ -169,7 +170,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -169,7 +170,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error),
ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data().topics().find(createTopic).errorCode())),
ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error),
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
@ -351,7 +352,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -351,7 +352,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
private def createTopicsRequest =
new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(
new CreatableTopicSet(Collections.singleton(new CreatableTopic().
setName(createTopic).setNumPartitions(1).
setReplicationFactor(1.toShort)).iterator))).build()
private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()

116
core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala

@ -17,15 +17,19 @@ @@ -17,15 +17,19 @@
package kafka.server
import java.util
import java.util.Properties
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableReplicaAssignment, CreatableReplicaAssignmentSet, CreatableTopic, CreatableTopicSet, CreateableTopicConfig, CreateableTopicConfigSet}
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
import scala.collection.JavaConverters
import scala.collection.JavaConverters._
class AbstractCreateTopicsRequestTest extends BaseRequestTest {
@ -33,30 +37,87 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -33,30 +37,87 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
override def propertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
def topicsReq(topics: Seq[CreatableTopic],
timeout: Integer = 10000,
validateOnly: Boolean = false) = {
val req = new CreateTopicsRequestData()
req.setTimeoutMs(timeout)
req.setTopics(new CreatableTopicSet(topics.asJava.iterator()))
req.setValidateOnly(validateOnly)
new CreateTopicsRequest.Builder(req).build()
}
def topicReq(name: String,
numPartitions: Integer = null,
replicationFactor: Integer = null,
config: Map[String, String] = null,
assignment: Map[Int, Seq[Int]] = null): CreatableTopic = {
val topic = new CreatableTopic()
topic.setName(name)
if (numPartitions != null) {
topic.setNumPartitions(numPartitions)
} else if (assignment != null) {
topic.setNumPartitions(-1)
} else {
topic.setNumPartitions(1)
}
if (replicationFactor != null) {
topic.setReplicationFactor(replicationFactor.toShort)
} else if (assignment != null) {
topic.setReplicationFactor((-1).toShort)
} else {
topic.setReplicationFactor(1.toShort)
}
if (config != null) {
val effectiveConfigs = new CreateableTopicConfigSet()
config.foreach {
case (name, value) => {
effectiveConfigs.add(new CreateableTopicConfig().setName(name).setValue(value))
}
}
topic.setConfigs(effectiveConfigs)
}
if (assignment != null) {
val effectiveAssignments = new CreatableReplicaAssignmentSet()
assignment.foreach {
case (partitionIndex, brokerIdList) => {
val effectiveAssignment = new CreatableReplicaAssignment()
effectiveAssignment.setPartitionIndex(partitionIndex)
val brokerIds = new util.ArrayList[java.lang.Integer]()
brokerIdList.foreach(brokerId => brokerIds.add(brokerId))
effectiveAssignment.setBrokerIds(brokerIds)
effectiveAssignments.add(effectiveAssignment)
}
}
topic.setAssignments(effectiveAssignments)
}
topic
}
protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
val response = sendCreateTopicRequest(request)
val error = response.errors.values.asScala.find(_.isFailure)
assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
request.topics.asScala.foreach { case (topic, details) =>
assertTrue(s"There should be no errors, found " +
s"${response.errorCounts().keySet().asScala.mkString(", ")},",
response.errorCounts().keySet().asScala.find(_.code() > 0).isEmpty)
request.data().topics().asScala.foreach { case topic =>
def verifyMetadata(socketServer: SocketServer) = {
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava, true).build()).topicMetadata.asScala
val metadataForTopic = metadata.filter(_.topic == topic).head
new MetadataRequest.Builder(List(topic.name()).asJava, true).build()).topicMetadata.asScala
val metadataForTopic = metadata.filter(_.topic == topic.name()).head
val partitions = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.size
val partitions = if (!topic.assignments().isEmpty)
topic.assignments().size
else
details.numPartitions
topic.numPartitions
val replication = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.asScala.head._2.size
val replication = if (!topic.assignments().isEmpty)
topic.assignments().iterator().next().brokerIds().size()
else
details.replicationFactor
topic.replicationFactor
if (request.validateOnly) {
if (request.data.validateOnly) {
assertNotNull(s"Topic $topic should be created", metadataForTopic)
assertFalse(s"Error ${metadataForTopic.error} for topic $topic", metadataForTopic.error == Errors.NONE)
assertTrue("The topic should have no partitions", metadataForTopic.partitionMetadata.isEmpty)
@ -71,9 +132,9 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -71,9 +132,9 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
// Verify controller broker has the correct metadata
verifyMetadata(controllerSocketServer)
if (!request.validateOnly) {
if (!request.data.validateOnly) {
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic.name(), 0)
}
verifyMetadata(notControllerSocketServer)
}
@ -106,20 +167,21 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -106,20 +167,21 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
requestStruct: Option[Struct] = None): Unit = {
val response = requestStruct.map(sendCreateTopicRequestStruct(_, request.version)).getOrElse(
sendCreateTopicRequest(request))
val errors = response.errors.asScala
assertEquals("The response size should match", expectedResponse.size, response.errors.size)
assertEquals("The response size should match", expectedResponse.size, response.data().topics().size)
expectedResponse.foreach { case (topic, expectedError) =>
val expected = expectedResponse(topic)
val actual = errors(topic)
assertEquals("The response error should match", expected.error, actual.error)
expectedResponse.foreach { case (topicName, expectedError) =>
val expected = expectedResponse(topicName)
val actual = response.data().topics().find(topicName)
if (actual == null) {
throw new RuntimeException(s"No response data found for topic ${topicName}")
}
assertEquals("The response error should match", expected.error.code(), actual.errorCode())
if (checkErrorMessage) {
assertEquals(expected.message, actual.message)
assertEquals(expected.messageWithFallback, actual.messageWithFallback)
assertEquals(expected.message, actual.errorMessage())
}
// If no error validate topic exists
if (expectedError.isSuccess && !request.validateOnly) {
validateTopicExists(topic)
if (expectedError.isSuccess && !request.data.validateOnly) {
validateTopicExists(topicName)
}
}
}
@ -131,10 +193,6 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { @@ -131,10 +193,6 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error == Errors.NONE))
}
protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {
assignments.map { case (k, v) => (k: Integer, v.map { i => i: Integer }.asJava) }.asJava
}
protected def sendCreateTopicRequestStruct(requestStruct: Struct, apiVersion: Short,
socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
val response = connectAndSendStruct(requestStruct, ApiKeys.CREATE_TOPICS, apiVersion, socketServer)

142
core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala

@ -19,76 +19,61 @@ package kafka.server @@ -19,76 +19,61 @@ package kafka.server
import kafka.utils._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@Test
def testValidCreateTopicsRequests() {
val timeout = 10000
// Generated assignments
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build())
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic2" -> new CreateTopicsRequest.TopicDetails(1, 3.toShort)).asJava, timeout).build())
val config3 = Map("min.insync.replicas" -> "2").asJava
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic3" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort, config3)).asJava, timeout).build())
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1"))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2", replicationFactor = 3))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic3",
numPartitions = 5, replicationFactor = 2, config = Map("min.insync.replicas" -> "2")))))
// Manual assignments
val assignments4 = replicaAssignmentToJava(Map(0 -> List(0)))
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments4)).asJava, timeout).build())
val assignments5 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
val config5 = Map("min.insync.replicas" -> "2").asJava
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("topic5" -> new CreateTopicsRequest.TopicDetails(assignments5, config5)).asJava, timeout).build())
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic4", assignment = Map(0 -> List(0))))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic5",
assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)),
config = Map("min.insync.replicas" -> "2")))))
// Mixed
val assignments8 = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
"topic6" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"topic7" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
"topic8" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout).build()
)
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
"topic9" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"topic10" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
"topic11" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout, true).build()
)
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic6"),
topicReq("topic7", numPartitions = 5, replicationFactor = 2),
topicReq("topic8", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2))))))
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic9"),
topicReq("topic10", numPartitions = 5, replicationFactor = 2),
topicReq("topic11", assignment = Map(0 -> List(0, 1), 1 -> List(1, 0), 2 -> List(1, 2)))),
validateOnly = true))
}
@Test
def testErrorCreateTopicsRequests() {
val timeout = 10000
val existingTopic = "existing-topic"
createTopic(existingTopic, 1, 1)
// Basic
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic))),
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions", numPartitions = -1))),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
replicationFactor = numBrokers + 1))),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
val invalidConfig = Map("not.a.property" -> "error").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config",
config=Map("not.a.property" -> "error")))),
Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
val config = Map("message.format.version" -> "invalid-value").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-config-value",
config=Map("message.format.version" -> "invalid-value")))),
Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-assignment",
assignment=Map(0 -> List(0, 1), 1 -> List(0))))),
Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)
// Partial
validateErrorCreateTopicsRequests(
new CreateTopicsRequest.Builder(Map(
existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"partial-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort),
"partial-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort),
"partial-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments),
"partial-none" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq(existingTopic),
topicReq("partial-partitions", numPartitions = -1),
topicReq("partial-replication", replicationFactor=numBrokers + 1),
topicReq("partial-assignment", assignment=Map(0 -> List(0, 1), 1 -> List(0))),
topicReq("partial-none"))),
Map(
existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS),
"partial-partitions" -> error(Errors.INVALID_PARTITIONS),
@ -101,12 +86,15 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { @@ -101,12 +86,15 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
// Timeout
// We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 1).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout", numPartitions = 10, replicationFactor = 3)), timeout = 1),
Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-zero" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 0).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout-zero", numPartitions = 10, replicationFactor = 3)), timeout = 0),
Map("error-timeout-zero" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// Negative timeouts are treated the same as 0
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-negative" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, -1).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("error-timeout-negative", numPartitions = 10, replicationFactor = 3)), timeout = -1),
Map("error-timeout-negative" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// The topics should still get created eventually
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout", 0)
@ -119,54 +107,22 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { @@ -119,54 +107,22 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@Test
def testInvalidCreateTopicsRequests() {
// Duplicate
val singleRequest = new CreateTopicsRequest.Builder(Map("duplicate-topic" ->
new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
validateErrorCreateTopicsRequests(singleRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST,
Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))),
requestStruct = Some(toStructWithDuplicateFirstTopic(singleRequest)))
// Duplicate Partial with validateOnly
val doubleRequestValidateOnly = new CreateTopicsRequest.Builder(Map(
"duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000, true).build()
validateErrorCreateTopicsRequests(doubleRequestValidateOnly, Map(
"duplicate-topic" -> error(Errors.INVALID_REQUEST),
"other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequestValidateOnly)))
// Duplicate Partial
val doubleRequest = new CreateTopicsRequest.Builder(Map(
"duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
validateErrorCreateTopicsRequests(doubleRequest, Map(
"duplicate-topic" -> error(Errors.INVALID_REQUEST),
"other-topic" -> error(Errors.NONE)), checkErrorMessage = false,
requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequest)))
// Partitions/ReplicationFactor and ReplicaAssignment
val assignments = replicaAssignmentToJava(Map(0 -> List(0)))
val assignmentRequest = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000).build()
val badArgumentsRequest = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequest)
validateErrorCreateTopicsRequests(badArgumentsRequest, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
// Partitions/ReplicationFactor and ReplicaAssignment with validateOnly
val assignmentRequestValidateOnly = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000, true).build()
val badArgumentsRequestValidateOnly = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequestValidateOnly)
validateErrorCreateTopicsRequests(badArgumentsRequestValidateOnly, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
assignment = Map(0 -> List(0))))),
Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(topicsReq(Seq(
topicReq("bad-args-topic", numPartitions = 10, replicationFactor = 3,
assignment = Map(0 -> List(0)))), validateOnly = true),
Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)), checkErrorMessage = false)
}
@Test
def testNotController() {
val request = new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
val response = sendCreateTopicRequest(request, notControllerSocketServer)
val error = response.errors.asScala.head._2.error
assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error)
val req = topicsReq(Seq(topicReq("topic1")))
val response = sendCreateTopicRequest(req, notControllerSocketServer)
assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
}
}

78
core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala

@ -23,7 +23,6 @@ import java.util.Properties @@ -23,7 +23,6 @@ import java.util.Properties
import kafka.log.LogConfig
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.junit.Test
@ -40,66 +39,69 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest @@ -40,66 +39,69 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest
@Test
def testValidCreateTopicsRequests() {
val timeout = 10000
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic1",
numPartitions = 5))))
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic1" -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build())
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic2",
numPartitions = 5, replicationFactor = 3)),
validateOnly = true))
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 3.toShort)).asJava, timeout, true).build())
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic3",
numPartitions = 11, replicationFactor = 2,
config = Map(LogConfig.RetentionMsProp -> 4999.toString))),
validateOnly = true))
val configs = Map(LogConfig.RetentionMsProp -> 4999.toString)
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build())
val assignments = replicaAssignmentToJava(Map(0 -> List(1, 0), 1 -> List(0, 1)))
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build())
validateValidCreateTopicsRequests(topicsReq(Seq(topicReq("topic4",
assignment = Map(0 -> List(1, 0), 1 -> List(0, 1))))))
}
@Test
def testErrorCreateTopicsRequests() {
val timeout = 10000
val existingTopic = "existing-topic"
createTopic(existingTopic, 1, 1)
// Policy violations
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("policy-topic1" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic1",
numPartitions = 4, replicationFactor = 1))),
Map("policy-topic1" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 3.toShort)).asJava, timeout, true).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic2",
numPartitions = 4, replicationFactor = 3)), validateOnly = true),
Map("policy-topic2" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
val configs = Map(LogConfig.RetentionMsProp -> 5001.toString)
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("policy-topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build(),
Map("policy-topic3" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("policy-topic4" -> new CreateTopicsRequest.TopicDetails(11, 3.toShort, Map.empty.asJava)).asJava, timeout, true).build(),
Map("policy-topic4" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
val assignments = replicaAssignmentToJava(Map(0 -> List(1), 1 -> List(0)))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("policy-topic5" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic3",
numPartitions = 11, replicationFactor = 2,
config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
Map("policy-topic3" -> error(Errors.POLICY_VIOLATION,
Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic4",
numPartitions = 11, replicationFactor = 3,
config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
Map("policy-topic4" -> error(Errors.POLICY_VIOLATION,
Some("RetentionMs should be less than 5000ms if replicationFactor > 5"))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("policy-topic5",
assignment = Map(0 -> List(1), 1 -> List(0)),
config = Map(LogConfig.RetentionMsProp -> 5001.toString))), validateOnly = true),
Map("policy-topic5" -> error(Errors.POLICY_VIOLATION,
Some("Topic partitions should have at least 2 partitions, received 1 for partition 0"))))
// Check that basic errors still work
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build(),
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("Topic 'existing-topic' already exists."))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq(existingTopic,
numPartitions = 5, replicationFactor = 1))),
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
Some("Topic 'existing-topic' already exists."))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(),
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = numBrokers + 1)), validateOnly = true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor: 4 larger than available brokers: 3."))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("error-replication2" -> new CreateTopicsRequest.TopicDetails(10, -1: Short)).asJava, timeout, true).build(),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("Replication factor must be larger than 0."))))
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
numPartitions = 10, replicationFactor = -1)), validateOnly = true),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some("Replication factor must be larger than 0."))))
}
}

14
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigResource @@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ElectPreferredLeadersRequestData, LeaveGroupRequestData}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
@ -279,8 +281,13 @@ class RequestQuotaTest extends BaseRequestTest { @@ -279,8 +281,13 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.API_VERSIONS =>
new ApiVersionsRequest.Builder
case ApiKeys.CREATE_TOPICS =>
new CreateTopicsRequest.Builder(Map("topic-2" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 0)
case ApiKeys.CREATE_TOPICS => {
new CreateTopicsRequest.Builder(
new CreateTopicsRequestData().setTopics(
new CreatableTopicSet(Collections.singleton(
new CreatableTopic().setName("topic-2").setNumPartitions(1).
setReplicationFactor(1.toShort)).iterator())))
}
case ApiKeys.DELETE_TOPICS =>
new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000)
@ -438,7 +445,8 @@ class RequestQuotaTest extends BaseRequestTest { @@ -438,7 +445,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_GROUPS => new DescribeGroupsResponse(response).throttleTimeMs
case ApiKeys.LIST_GROUPS => new ListGroupsResponse(response).throttleTimeMs
case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response).throttleTimeMs
case ApiKeys.CREATE_TOPICS =>
new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs
case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs
case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs
case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs

Loading…
Cancel
Save