From 36abc8dcea1a418550875e2a8684b9c9d400464d Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Thu, 19 Oct 2023 16:55:21 +0100 Subject: [PATCH] KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) (#14554) Initial PR for [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) - [KAFKA-15601](https://issues.apache.org/jira/browse/KAFKA-15601). This PR defines json request and response schemas for the new Telemetry APIs and implements the corresponding java classes. Reviewers: Andrew Schofield , Kirk True , Matthias J. Sax , Walker Carlson --- checkstyle/suppressions.xml | 2 +- .../apache/kafka/common/protocol/ApiKeys.java | 4 +- .../common/requests/AbstractRequest.java | 4 + .../common/requests/AbstractResponse.java | 4 + .../GetTelemetrySubscriptionsRequest.java | 79 +++++++++++++++++++ .../GetTelemetrySubscriptionsResponse.java | 72 +++++++++++++++++ .../common/requests/PushTelemetryRequest.java | 78 ++++++++++++++++++ .../requests/PushTelemetryResponse.java | 73 +++++++++++++++++ .../GetTelemetrySubscriptionsRequest.json | 33 ++++++++ .../GetTelemetrySubscriptionsResponse.json | 60 ++++++++++++++ .../common/message/PushTelemetryRequest.json | 49 ++++++++++++ .../common/message/PushTelemetryResponse.json | 32 ++++++++ .../kafka/common/protocol/ProtoUtilsTest.java | 1 + .../GetTelemetrySubscriptionsRequestTest.java | 36 +++++++++ ...GetTelemetrySubscriptionsResponseTest.java | 47 +++++++++++ .../requests/PushTelemetryRequestTest.java | 37 +++++++++ .../requests/PushTelemetryResponseTest.java | 48 +++++++++++ .../common/requests/RequestResponseTest.java | 43 ++++++++++ .../kafka/network/RequestConvertToJson.scala | 4 + .../main/scala/kafka/server/KafkaApis.scala | 14 ++++ .../unit/kafka/server/KafkaApisTest.scala | 54 +++++++++++++ .../unit/kafka/server/RequestQuotaTest.scala | 6 ++ 22 files changed, 778 insertions(+), 2 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java create mode 100644 clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json create mode 100644 clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json create mode 100644 clients/src/main/resources/common/message/PushTelemetryRequest.json create mode 100644 clients/src/main/resources/common/message/PushTelemetryResponse.json create mode 100644 clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java create mode 100644 clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java create mode 100644 clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java create mode 100644 clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a66e63428dc..dd2c6c98608 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -63,7 +63,7 @@ files="AbstractResponse.java"/> + files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/> diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index d98a48b6160..1da9eeaccd4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -113,7 +113,9 @@ public enum ApiKeys { ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true), CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT), CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE), - CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION); + CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION), + GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS), + PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 40615973269..2d5b0c66329 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -316,6 +316,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return ConsumerGroupDescribeRequest.parse(buffer, apiVersion); case CONTROLLER_REGISTRATION: return ControllerRegistrationRequest.parse(buffer, apiVersion); + case GET_TELEMETRY_SUBSCRIPTIONS: + return GetTelemetrySubscriptionsRequest.parse(buffer, apiVersion); + case PUSH_TELEMETRY: + return PushTelemetryRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index bcf1ac895c5..1747ed1dbd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -253,6 +253,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return ConsumerGroupDescribeResponse.parse(responseBuffer, version); case CONTROLLER_REGISTRATION: return ControllerRegistrationResponse.parse(responseBuffer, version); + case GET_TELEMETRY_SUBSCRIPTIONS: + return GetTelemetrySubscriptionsResponse.parse(responseBuffer, version); + case PUSH_TELEMETRY: + return PushTelemetryResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java new file mode 100644 index 00000000000..eb22f4658a4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class GetTelemetrySubscriptionsRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + + private final GetTelemetrySubscriptionsRequestData data; + + public Builder(GetTelemetrySubscriptionsRequestData data) { + this(data, false); + } + + public Builder(GetTelemetrySubscriptionsRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, enableUnstableLastVersion); + this.data = data; + } + + @Override + public GetTelemetrySubscriptionsRequest build(short version) { + return new GetTelemetrySubscriptionsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final GetTelemetrySubscriptionsRequestData data; + + public GetTelemetrySubscriptionsRequest(GetTelemetrySubscriptionsRequestData data, short version) { + super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, version); + this.data = data; + } + + @Override + public GetTelemetrySubscriptionsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + GetTelemetrySubscriptionsResponseData responseData = new GetTelemetrySubscriptionsResponseData() + .setErrorCode(Errors.forException(e).code()) + .setThrottleTimeMs(throttleTimeMs); + return new GetTelemetrySubscriptionsResponse(responseData); + } + + @Override + public GetTelemetrySubscriptionsRequestData data() { + return data; + } + + public static GetTelemetrySubscriptionsRequest parse(ByteBuffer buffer, short version) { + return new GetTelemetrySubscriptionsRequest(new GetTelemetrySubscriptionsRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java new file mode 100644 index 00000000000..01e90384570 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class GetTelemetrySubscriptionsResponse extends AbstractResponse { + + private final GetTelemetrySubscriptionsResponseData data; + + public GetTelemetrySubscriptionsResponse(GetTelemetrySubscriptionsResponseData data) { + super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS); + this.data = data; + } + + @Override + public GetTelemetrySubscriptionsResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + HashMap counts = new HashMap<>(); + updateErrorCounts(counts, Errors.forCode(data.errorCode())); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public boolean hasError() { + return error() != Errors.NONE; + } + + public Errors error() { + return Errors.forCode(data.errorCode()); + } + + public static GetTelemetrySubscriptionsResponse parse(ByteBuffer buffer, short version) { + return new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java new file mode 100644 index 00000000000..5df03ed3461 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class PushTelemetryRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder { + + private final PushTelemetryRequestData data; + + public Builder(PushTelemetryRequestData data) { + this(data, false); + } + + public Builder(PushTelemetryRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.PUSH_TELEMETRY, enableUnstableLastVersion); + this.data = data; + } + + @Override + public PushTelemetryRequest build(short version) { + return new PushTelemetryRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final PushTelemetryRequestData data; + + public PushTelemetryRequest(PushTelemetryRequestData data, short version) { + super(ApiKeys.PUSH_TELEMETRY, version); + this.data = data; + } + + @Override + public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { + PushTelemetryResponseData responseData = new PushTelemetryResponseData() + .setErrorCode(Errors.forException(e).code()) + .setThrottleTimeMs(throttleTimeMs); + return new PushTelemetryResponse(responseData); + } + + @Override + public PushTelemetryRequestData data() { + return data; + } + + public static PushTelemetryRequest parse(ByteBuffer buffer, short version) { + return new PushTelemetryRequest(new PushTelemetryRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java new file mode 100644 index 00000000000..4be6fbbbae9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class PushTelemetryResponse extends AbstractResponse { + + private final PushTelemetryResponseData data; + + public PushTelemetryResponse(PushTelemetryResponseData data) { + super(ApiKeys.PUSH_TELEMETRY); + this.data = data; + } + + @Override + public PushTelemetryResponseData data() { + return data; + } + + @Override + public Map errorCounts() { + HashMap counts = new HashMap<>(); + updateErrorCounts(counts, Errors.forCode(data.errorCode())); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public boolean hasError() { + return error() != Errors.NONE; + } + + public Errors error() { + return Errors.forCode(data.errorCode()); + } + + public static PushTelemetryResponse parse(ByteBuffer buffer, short version) { + return new PushTelemetryResponse(new PushTelemetryResponseData( + new ByteBufferAccessor(buffer), version)); + } +} + diff --git a/clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json b/clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json new file mode 100644 index 00000000000..1020ae717c0 --- /dev/null +++ b/clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 71, + "type": "request", + "listeners": ["broker"], + "name": "GetTelemetrySubscriptionsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + // The Telemetry APIs are added as part of KIP-714 and are still under + // development. Hence, the APIs are not exposed by default unless explicitly + // enabled. + "latestVersionUnstable": true, + "fields": [ + { + "name": "ClientInstanceId", "type": "uuid", "versions": "0+", + "about": "Unique id for this client instance, must be set to 0 on the first request." + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json b/clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json new file mode 100644 index 00000000000..54687f4cf4f --- /dev/null +++ b/clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 71, + "type": "response", + "name": "GetTelemetrySubscriptionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." + }, + { + "name": "ClientInstanceId", "type": "uuid", "versions": "0+", + "about": "Assigned client instance id if ClientInstanceId was 0 in the request, else 0." + }, + { + "name": "SubscriptionId", "type": "int32", "versions": "0+", + "about": "Unique identifier for the current subscription set for this client instance." + }, + { + "name": "AcceptedCompressionTypes", "type": "[]int8", "versions": "0+", + "about": "Compression types that broker accepts for the PushTelemetryRequest." + }, + { + "name": "PushIntervalMs", "type": "int32", "versions": "0+", + "about": "Configured push interval, which is the lowest configured interval in the current subscription set." + }, + { + "name": "TelemetryMaxBytes", "type": "int32", "versions": "0+", + "about": "The maximum bytes of binary data the broker accepts in PushTelemetryRequest." + }, + { + "name": "DeltaTemporality", "type": "bool", "versions": "0+", + "about": "Flag to indicate monotonic/counter metrics are to be emitted as deltas or cumulative values" + }, + { + "name": "RequestedMetrics", "type": "[]string", "versions": "0+", + "about": "Requested metrics prefix string match. Empty array: No metrics subscribed, Array[0] empty string: All metrics subscribed." + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PushTelemetryRequest.json b/clients/src/main/resources/common/message/PushTelemetryRequest.json new file mode 100644 index 00000000000..b01c458f045 --- /dev/null +++ b/clients/src/main/resources/common/message/PushTelemetryRequest.json @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 72, + "type": "request", + "listeners": ["broker"], + "name": "PushTelemetryRequest", + "validVersions": "0", + "flexibleVersions": "0+", + // The Telemetry APIs are added as part of KIP-714 and are still under + // development. Hence, the APIs are not exposed by default unless explicitly + // enabled. + "latestVersionUnstable": true, + "fields": [ + { + "name": "ClientInstanceId", "type": "uuid", "versions": "0+", + "about": "Unique id for this client instance." + }, + { + "name": "SubscriptionId", "type": "int32", "versions": "0+", + "about": "Unique identifier for the current subscription." + }, + { + "name": "Terminating", "type": "bool", "versions": "0+", + "about": "Client is terminating the connection." + }, + { + "name": "CompressionType", "type": "int8", "versions": "0+", + "about": "Compression codec used to compress the metrics." + }, + { + "name": "Metrics", "type": "bytes", "versions": "0+", + "about": "Metrics encoded in OpenTelemetry MetricsData v1 protobuf format." + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PushTelemetryResponse.json b/clients/src/main/resources/common/message/PushTelemetryResponse.json new file mode 100644 index 00000000000..56ddfe8bc47 --- /dev/null +++ b/clients/src/main/resources/common/message/PushTelemetryResponse.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 72, + "type": "response", + "name": "PushTelemetryResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." + } + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java index 712c61168f4..870f80e7a02 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java @@ -34,6 +34,7 @@ public class ProtoUtilsTest { case EXPIRE_DELEGATION_TOKEN: case RENEW_DELEGATION_TOKEN: case ALTER_USER_SCRAM_CREDENTIALS: + case PUSH_TELEMETRY: case ENVELOPE: assertTrue(key.requiresDelayedAllocation, key + " should require delayed allocation"); break; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java new file mode 100644 index 00000000000..9e2c376391d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GetTelemetrySubscriptionsRequestTest { + + @Test + public void testGetErrorResponse() { + GetTelemetrySubscriptionsRequest req = new GetTelemetrySubscriptionsRequest(new GetTelemetrySubscriptionsRequestData(), (short) 0); + GetTelemetrySubscriptionsResponse response = req.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java new file mode 100644 index 00000000000..2e972b42ff6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GetTelemetrySubscriptionsResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData() + .setErrorCode(Errors.NONE.code()); + GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts()); + } + + @Test + public void testErrorCountsReturnsOneError() { + GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData() + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); + data.setErrorCode(Errors.INVALID_CONFIG.code()); + + GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data); + assertEquals(Collections.singletonMap(Errors.INVALID_CONFIG, 1), response.errorCounts()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java new file mode 100644 index 00000000000..77a58422db6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PushTelemetryRequestTest { + + @Test + public void testGetErrorResponse() { + PushTelemetryRequest req = new PushTelemetryRequest(new PushTelemetryRequestData(), (short) 0); + PushTelemetryResponse response = req.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java new file mode 100644 index 00000000000..4a5f085cf7d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PushTelemetryResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + PushTelemetryResponseData data = new PushTelemetryResponseData() + .setErrorCode(Errors.NONE.code()); + PushTelemetryResponse response = new PushTelemetryResponse(data); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts()); + } + + @Test + public void testErrorCountsReturnsOneError() { + PushTelemetryResponseData data = new PushTelemetryResponseData() + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); + data.setErrorCode(Errors.INVALID_CONFIG.code()); + + PushTelemetryResponse response = new PushTelemetryResponse(data); + assertEquals(Collections.singletonMap(Errors.INVALID_CONFIG, 1), response.errorCounts()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 303c13abfad..9ad4569225b 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -146,6 +146,8 @@ import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.message.FetchSnapshotRequestData; import org.apache.kafka.common.message.FetchSnapshotResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; @@ -193,6 +195,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.ProduceResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; import org.apache.kafka.common.message.RenewDelegationTokenRequestData; import org.apache.kafka.common.message.RenewDelegationTokenResponseData; import org.apache.kafka.common.message.SaslAuthenticateRequestData; @@ -1065,6 +1069,8 @@ public class RequestResponseTest { case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version); case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeRequest(version); case CONTROLLER_REGISTRATION: return createControllerRegistrationRequest(version); + case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsRequest(version); + case PUSH_TELEMETRY: return createPushTelemetryRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1142,6 +1148,8 @@ public class RequestResponseTest { case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse(); case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeResponse(); case CONTROLLER_REGISTRATION: return createControllerRegistrationResponse(); + case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsResponse(); + case PUSH_TELEMETRY: return createPushTelemetryResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -3504,6 +3512,41 @@ public class RequestResponseTest { return new ListTransactionsResponse(response); } + private GetTelemetrySubscriptionsRequest createGetTelemetrySubscriptionsRequest(short version) { + return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData() + .setClientInstanceId(Uuid.randomUuid()) + ).build(version); + } + + private GetTelemetrySubscriptionsResponse createGetTelemetrySubscriptionsResponse() { + GetTelemetrySubscriptionsResponseData response = new GetTelemetrySubscriptionsResponseData(); + response.setClientInstanceId(Uuid.randomUuid()); + response.setSubscriptionId(1); + response.setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id)); + response.setPushIntervalMs(60 * 1000); + response.setDeltaTemporality(false); + response.setErrorCode(Errors.NONE.code()); + response.setThrottleTimeMs(10); + return new GetTelemetrySubscriptionsResponse(response); + } + + private PushTelemetryRequest createPushTelemetryRequest(short version) { + return new PushTelemetryRequest.Builder(new PushTelemetryRequestData() + .setClientInstanceId(Uuid.randomUuid()) + .setSubscriptionId(1) + .setTerminating(false) + .setCompressionType(CompressionType.ZSTD.id) + .setMetrics("test-metrics".getBytes(StandardCharsets.UTF_8)) + ).build(version); + } + + private PushTelemetryResponse createPushTelemetryResponse() { + PushTelemetryResponseData response = new PushTelemetryResponseData(); + response.setErrorCode(Errors.NONE.code()); + response.setThrottleTimeMs(10); + return new PushTelemetryResponse(response); + } + @Test public void testInvalidSaslHandShakeRequest() { AbstractRequest request = new SaslHandshakeRequest.Builder( diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 889b76643ba..e635a5820d5 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -64,6 +64,7 @@ object RequestConvertToJson { case req: ExpireDelegationTokenRequest => ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version) case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data, request.version) case req: FindCoordinatorRequest => FindCoordinatorRequestDataJsonConverter.write(req.data, request.version) + case req: GetTelemetrySubscriptionsRequest => GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, request.version) case req: HeartbeatRequest => HeartbeatRequestDataJsonConverter.write(req.data, request.version) case req: IncrementalAlterConfigsRequest => IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version) case req: InitProducerIdRequest => InitProducerIdRequestDataJsonConverter.write(req.data, request.version) @@ -79,6 +80,7 @@ object RequestConvertToJson { case req: OffsetFetchRequest => OffsetFetchRequestDataJsonConverter.write(req.data, request.version) case req: OffsetsForLeaderEpochRequest => OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version) case req: ProduceRequest => ProduceRequestDataJsonConverter.write(req.data, request.version, false) + case req: PushTelemetryRequest => PushTelemetryRequestDataJsonConverter.write(req.data, request.version) case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version) case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version) case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version) @@ -142,6 +144,7 @@ object RequestConvertToJson { case res: ExpireDelegationTokenResponse => ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version) case res: FetchResponse => FetchResponseDataJsonConverter.write(res.data, version, false) case res: FindCoordinatorResponse => FindCoordinatorResponseDataJsonConverter.write(res.data, version) + case res: GetTelemetrySubscriptionsResponse => GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version) case res: HeartbeatResponse => HeartbeatResponseDataJsonConverter.write(res.data, version) case res: IncrementalAlterConfigsResponse => IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version) case res: InitProducerIdResponse => InitProducerIdResponseDataJsonConverter.write(res.data, version) @@ -157,6 +160,7 @@ object RequestConvertToJson { case res: OffsetFetchResponse => OffsetFetchResponseDataJsonConverter.write(res.data, version) case res: OffsetsForLeaderEpochResponse => OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version) case res: ProduceResponse => ProduceResponseDataJsonConverter.write(res.data, version) + case res: PushTelemetryResponse => PushTelemetryResponseDataJsonConverter.write(res.data, version) case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version) case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version) case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fade1d1b880..030c056ad4c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -243,6 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) + case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request) + case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3690,6 +3692,18 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + // Just a place holder for now. + def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { + requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + + // Just a place holder for now. + def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = { + requestHelper.sendMaybeThrottle(request, request.body[PushTelemetryRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordConversionStats): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index fb3d60e18a6..29ad468c204 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -6222,4 +6222,58 @@ class KafkaApisTest { assertEquals(expectedResponse, response.data) } + + @Test + def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) + createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) + } + + @Test + def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) + val errorCode = Errors.UNSUPPORTED_VERSION.code + val expectedResponse = new GetTelemetrySubscriptionsResponseData() + expectedResponse.setErrorCode(errorCode) + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + + assertEquals(expectedResponse, response.data) + } + + @Test + def testPushTelemetryNotAllowedForZkClusters(): Unit = { + val data = new PushTelemetryRequestData() + + val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build()) + createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[PushTelemetryResponse](request) + assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) + } + + @Test + def testPushTelemetryUnsupportedVersionForKRaftClusters(): Unit = { + val data = new PushTelemetryRequestData() + + val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build()) + val errorCode = Errors.UNSUPPORTED_VERSION.code + val expectedResponse = new PushTelemetryResponseData() + expectedResponse.setErrorCode(errorCode) + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId) + createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching) + val response = verifyNoThrottling[PushTelemetryResponse](request) + + assertEquals(expectedResponse, response.data) + } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 1943f22563b..2f7692d49c1 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -713,6 +713,12 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CONSUMER_GROUP_DESCRIBE => new ConsumerGroupDescribeRequest.Builder(new ConsumerGroupDescribeRequestData(), true) + case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => + new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true) + + case ApiKeys.PUSH_TELEMETRY => + new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) }