Browse Source

KAFKA-15604: Telemetry API request and response schemas and classes (KIP-714) (#14554)

Initial PR for [KIP-714](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability) - [KAFKA-15601](https://issues.apache.org/jira/browse/KAFKA-15601).

This PR defines json request and response schemas for the new Telemetry APIs and implements the corresponding java classes.

Reviewers: 
Andrew Schofield <andrew_schofield@uk.ibm.com>, Kirk True <ktrue@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@apache.org>
pull/13909/merge
Apoorv Mittal 11 months ago committed by GitHub
parent
commit
36abc8dcea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      checkstyle/suppressions.xml
  2. 4
      clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
  3. 4
      clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
  4. 4
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  5. 79
      clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java
  6. 72
      clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java
  7. 78
      clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
  8. 73
      clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java
  9. 33
      clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json
  10. 60
      clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json
  11. 49
      clients/src/main/resources/common/message/PushTelemetryRequest.json
  12. 32
      clients/src/main/resources/common/message/PushTelemetryResponse.json
  13. 1
      clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
  14. 36
      clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java
  15. 47
      clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java
  16. 37
      clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
  17. 48
      clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java
  18. 43
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  19. 4
      core/src/main/scala/kafka/network/RequestConvertToJson.scala
  20. 14
      core/src/main/scala/kafka/server/KafkaApis.scala
  21. 54
      core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
  22. 6
      core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

2
checkstyle/suppressions.xml

@ -63,7 +63,7 @@ @@ -63,7 +63,7 @@
files="AbstractResponse.java"/>
<suppress checks="MethodLength"
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/>
<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>

4
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

@ -113,7 +113,9 @@ public enum ApiKeys { @@ -113,7 +113,9 @@ public enum ApiKeys {
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true),
CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT),
CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE),
CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION);
CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION),
GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

4
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@ -316,6 +316,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse { @@ -316,6 +316,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return ConsumerGroupDescribeRequest.parse(buffer, apiVersion);
case CONTROLLER_REGISTRATION:
return ControllerRegistrationRequest.parse(buffer, apiVersion);
case GET_TELEMETRY_SUBSCRIPTIONS:
return GetTelemetrySubscriptionsRequest.parse(buffer, apiVersion);
case PUSH_TELEMETRY:
return PushTelemetryRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

4
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -253,6 +253,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse { @@ -253,6 +253,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return ConsumerGroupDescribeResponse.parse(responseBuffer, version);
case CONTROLLER_REGISTRATION:
return ControllerRegistrationResponse.parse(responseBuffer, version);
case GET_TELEMETRY_SUBSCRIPTIONS:
return GetTelemetrySubscriptionsResponse.parse(responseBuffer, version);
case PUSH_TELEMETRY:
return PushTelemetryResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

79
clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
public class GetTelemetrySubscriptionsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<GetTelemetrySubscriptionsRequest> {
private final GetTelemetrySubscriptionsRequestData data;
public Builder(GetTelemetrySubscriptionsRequestData data) {
this(data, false);
}
public Builder(GetTelemetrySubscriptionsRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, enableUnstableLastVersion);
this.data = data;
}
@Override
public GetTelemetrySubscriptionsRequest build(short version) {
return new GetTelemetrySubscriptionsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final GetTelemetrySubscriptionsRequestData data;
public GetTelemetrySubscriptionsRequest(GetTelemetrySubscriptionsRequestData data, short version) {
super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, version);
this.data = data;
}
@Override
public GetTelemetrySubscriptionsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
GetTelemetrySubscriptionsResponseData responseData = new GetTelemetrySubscriptionsResponseData()
.setErrorCode(Errors.forException(e).code())
.setThrottleTimeMs(throttleTimeMs);
return new GetTelemetrySubscriptionsResponse(responseData);
}
@Override
public GetTelemetrySubscriptionsRequestData data() {
return data;
}
public static GetTelemetrySubscriptionsRequest parse(ByteBuffer buffer, short version) {
return new GetTelemetrySubscriptionsRequest(new GetTelemetrySubscriptionsRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

72
clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponse.java

@ -0,0 +1,72 @@ @@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class GetTelemetrySubscriptionsResponse extends AbstractResponse {
private final GetTelemetrySubscriptionsResponseData data;
public GetTelemetrySubscriptionsResponse(GetTelemetrySubscriptionsResponseData data) {
super(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS);
this.data = data;
}
@Override
public GetTelemetrySubscriptionsResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public boolean hasError() {
return error() != Errors.NONE;
}
public Errors error() {
return Errors.forCode(data.errorCode());
}
public static GetTelemetrySubscriptionsResponse parse(ByteBuffer buffer, short version) {
return new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData(
new ByteBufferAccessor(buffer), version));
}
}

78
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
public class PushTelemetryRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<PushTelemetryRequest> {
private final PushTelemetryRequestData data;
public Builder(PushTelemetryRequestData data) {
this(data, false);
}
public Builder(PushTelemetryRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.PUSH_TELEMETRY, enableUnstableLastVersion);
this.data = data;
}
@Override
public PushTelemetryRequest build(short version) {
return new PushTelemetryRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final PushTelemetryRequestData data;
public PushTelemetryRequest(PushTelemetryRequestData data, short version) {
super(ApiKeys.PUSH_TELEMETRY, version);
this.data = data;
}
@Override
public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
PushTelemetryResponseData responseData = new PushTelemetryResponseData()
.setErrorCode(Errors.forException(e).code())
.setThrottleTimeMs(throttleTimeMs);
return new PushTelemetryResponse(responseData);
}
@Override
public PushTelemetryRequestData data() {
return data;
}
public static PushTelemetryRequest parse(ByteBuffer buffer, short version) {
return new PushTelemetryRequest(new PushTelemetryRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}

73
clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryResponse.java

@ -0,0 +1,73 @@ @@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class PushTelemetryResponse extends AbstractResponse {
private final PushTelemetryResponseData data;
public PushTelemetryResponse(PushTelemetryResponseData data) {
super(ApiKeys.PUSH_TELEMETRY);
this.data = data;
}
@Override
public PushTelemetryResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
return counts;
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}
public boolean hasError() {
return error() != Errors.NONE;
}
public Errors error() {
return Errors.forCode(data.errorCode());
}
public static PushTelemetryResponse parse(ByteBuffer buffer, short version) {
return new PushTelemetryResponse(new PushTelemetryResponseData(
new ByteBufferAccessor(buffer), version));
}
}

33
clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json

@ -0,0 +1,33 @@ @@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 71,
"type": "request",
"listeners": ["broker"],
"name": "GetTelemetrySubscriptionsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The Telemetry APIs are added as part of KIP-714 and are still under
// development. Hence, the APIs are not exposed by default unless explicitly
// enabled.
"latestVersionUnstable": true,
"fields": [
{
"name": "ClientInstanceId", "type": "uuid", "versions": "0+",
"about": "Unique id for this client instance, must be set to 0 on the first request."
}
]
}

60
clients/src/main/resources/common/message/GetTelemetrySubscriptionsResponse.json

@ -0,0 +1,60 @@ @@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 71,
"type": "response",
"name": "GetTelemetrySubscriptionsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
},
{
"name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error."
},
{
"name": "ClientInstanceId", "type": "uuid", "versions": "0+",
"about": "Assigned client instance id if ClientInstanceId was 0 in the request, else 0."
},
{
"name": "SubscriptionId", "type": "int32", "versions": "0+",
"about": "Unique identifier for the current subscription set for this client instance."
},
{
"name": "AcceptedCompressionTypes", "type": "[]int8", "versions": "0+",
"about": "Compression types that broker accepts for the PushTelemetryRequest."
},
{
"name": "PushIntervalMs", "type": "int32", "versions": "0+",
"about": "Configured push interval, which is the lowest configured interval in the current subscription set."
},
{
"name": "TelemetryMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes of binary data the broker accepts in PushTelemetryRequest."
},
{
"name": "DeltaTemporality", "type": "bool", "versions": "0+",
"about": "Flag to indicate monotonic/counter metrics are to be emitted as deltas or cumulative values"
},
{
"name": "RequestedMetrics", "type": "[]string", "versions": "0+",
"about": "Requested metrics prefix string match. Empty array: No metrics subscribed, Array[0] empty string: All metrics subscribed."
}
]
}

49
clients/src/main/resources/common/message/PushTelemetryRequest.json

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 72,
"type": "request",
"listeners": ["broker"],
"name": "PushTelemetryRequest",
"validVersions": "0",
"flexibleVersions": "0+",
// The Telemetry APIs are added as part of KIP-714 and are still under
// development. Hence, the APIs are not exposed by default unless explicitly
// enabled.
"latestVersionUnstable": true,
"fields": [
{
"name": "ClientInstanceId", "type": "uuid", "versions": "0+",
"about": "Unique id for this client instance."
},
{
"name": "SubscriptionId", "type": "int32", "versions": "0+",
"about": "Unique identifier for the current subscription."
},
{
"name": "Terminating", "type": "bool", "versions": "0+",
"about": "Client is terminating the connection."
},
{
"name": "CompressionType", "type": "int8", "versions": "0+",
"about": "Compression codec used to compress the metrics."
},
{
"name": "Metrics", "type": "bytes", "versions": "0+",
"about": "Metrics encoded in OpenTelemetry MetricsData v1 protobuf format."
}
]
}

32
clients/src/main/resources/common/message/PushTelemetryResponse.json

@ -0,0 +1,32 @@ @@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 72,
"type": "response",
"name": "PushTelemetryResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
},
{
"name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error."
}
]
}

1
clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java

@ -34,6 +34,7 @@ public class ProtoUtilsTest { @@ -34,6 +34,7 @@ public class ProtoUtilsTest {
case EXPIRE_DELEGATION_TOKEN:
case RENEW_DELEGATION_TOKEN:
case ALTER_USER_SCRAM_CREDENTIALS:
case PUSH_TELEMETRY:
case ENVELOPE:
assertTrue(key.requiresDelayedAllocation, key + " should require delayed allocation");
break;

36
clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequestTest.java

@ -0,0 +1,36 @@ @@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GetTelemetrySubscriptionsRequestTest {
@Test
public void testGetErrorResponse() {
GetTelemetrySubscriptionsRequest req = new GetTelemetrySubscriptionsRequest(new GetTelemetrySubscriptionsRequestData(), (short) 0);
GetTelemetrySubscriptionsResponse response = req.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
}
}

47
clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java

@ -0,0 +1,47 @@ @@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class GetTelemetrySubscriptionsResponseTest {
@Test
public void testErrorCountsReturnsNoneWhenNoErrors() {
GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData()
.setErrorCode(Errors.NONE.code());
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data);
assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts());
}
@Test
public void testErrorCountsReturnsOneError() {
GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
data.setErrorCode(Errors.INVALID_CONFIG.code());
GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data);
assertEquals(Collections.singletonMap(Errors.INVALID_CONFIG, 1), response.errorCounts());
}
}

37
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PushTelemetryRequestTest {
@Test
public void testGetErrorResponse() {
PushTelemetryRequest req = new PushTelemetryRequest(new PushTelemetryRequestData(), (short) 0);
PushTelemetryResponse response = req.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
}
}

48
clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java

@ -0,0 +1,48 @@ @@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PushTelemetryResponseTest {
@Test
public void testErrorCountsReturnsNoneWhenNoErrors() {
PushTelemetryResponseData data = new PushTelemetryResponseData()
.setErrorCode(Errors.NONE.code());
PushTelemetryResponse response = new PushTelemetryResponse(data);
assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts());
}
@Test
public void testErrorCountsReturnsOneError() {
PushTelemetryResponseData data = new PushTelemetryResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
data.setErrorCode(Errors.INVALID_CONFIG.code());
PushTelemetryResponse response = new PushTelemetryResponse(data);
assertEquals(Collections.singletonMap(Errors.INVALID_CONFIG, 1), response.errorCounts());
}
}

43
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -146,6 +146,8 @@ import org.apache.kafka.common.message.FetchResponseData; @@ -146,6 +146,8 @@ import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
@ -193,6 +195,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd @@ -193,6 +195,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@ -1065,6 +1069,8 @@ public class RequestResponseTest { @@ -1065,6 +1069,8 @@ public class RequestResponseTest {
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version);
case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeRequest(version);
case CONTROLLER_REGISTRATION: return createControllerRegistrationRequest(version);
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsRequest(version);
case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1142,6 +1148,8 @@ public class RequestResponseTest { @@ -1142,6 +1148,8 @@ public class RequestResponseTest {
case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse();
case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeResponse();
case CONTROLLER_REGISTRATION: return createControllerRegistrationResponse();
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsResponse();
case PUSH_TELEMETRY: return createPushTelemetryResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -3504,6 +3512,41 @@ public class RequestResponseTest { @@ -3504,6 +3512,41 @@ public class RequestResponseTest {
return new ListTransactionsResponse(response);
}
private GetTelemetrySubscriptionsRequest createGetTelemetrySubscriptionsRequest(short version) {
return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData()
.setClientInstanceId(Uuid.randomUuid())
).build(version);
}
private GetTelemetrySubscriptionsResponse createGetTelemetrySubscriptionsResponse() {
GetTelemetrySubscriptionsResponseData response = new GetTelemetrySubscriptionsResponseData();
response.setClientInstanceId(Uuid.randomUuid());
response.setSubscriptionId(1);
response.setAcceptedCompressionTypes(Collections.singletonList(CompressionType.GZIP.id));
response.setPushIntervalMs(60 * 1000);
response.setDeltaTemporality(false);
response.setErrorCode(Errors.NONE.code());
response.setThrottleTimeMs(10);
return new GetTelemetrySubscriptionsResponse(response);
}
private PushTelemetryRequest createPushTelemetryRequest(short version) {
return new PushTelemetryRequest.Builder(new PushTelemetryRequestData()
.setClientInstanceId(Uuid.randomUuid())
.setSubscriptionId(1)
.setTerminating(false)
.setCompressionType(CompressionType.ZSTD.id)
.setMetrics("test-metrics".getBytes(StandardCharsets.UTF_8))
).build(version);
}
private PushTelemetryResponse createPushTelemetryResponse() {
PushTelemetryResponseData response = new PushTelemetryResponseData();
response.setErrorCode(Errors.NONE.code());
response.setThrottleTimeMs(10);
return new PushTelemetryResponse(response);
}
@Test
public void testInvalidSaslHandShakeRequest() {
AbstractRequest request = new SaslHandshakeRequest.Builder(

4
core/src/main/scala/kafka/network/RequestConvertToJson.scala

@ -64,6 +64,7 @@ object RequestConvertToJson { @@ -64,6 +64,7 @@ object RequestConvertToJson {
case req: ExpireDelegationTokenRequest => ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data, request.version)
case req: FindCoordinatorRequest => FindCoordinatorRequestDataJsonConverter.write(req.data, request.version)
case req: GetTelemetrySubscriptionsRequest => GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, request.version)
case req: HeartbeatRequest => HeartbeatRequestDataJsonConverter.write(req.data, request.version)
case req: IncrementalAlterConfigsRequest => IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version)
case req: InitProducerIdRequest => InitProducerIdRequestDataJsonConverter.write(req.data, request.version)
@ -79,6 +80,7 @@ object RequestConvertToJson { @@ -79,6 +80,7 @@ object RequestConvertToJson {
case req: OffsetFetchRequest => OffsetFetchRequestDataJsonConverter.write(req.data, request.version)
case req: OffsetsForLeaderEpochRequest => OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version)
case req: ProduceRequest => ProduceRequestDataJsonConverter.write(req.data, request.version, false)
case req: PushTelemetryRequest => PushTelemetryRequestDataJsonConverter.write(req.data, request.version)
case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
@ -142,6 +144,7 @@ object RequestConvertToJson { @@ -142,6 +144,7 @@ object RequestConvertToJson {
case res: ExpireDelegationTokenResponse => ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version)
case res: FetchResponse => FetchResponseDataJsonConverter.write(res.data, version, false)
case res: FindCoordinatorResponse => FindCoordinatorResponseDataJsonConverter.write(res.data, version)
case res: GetTelemetrySubscriptionsResponse => GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version)
case res: HeartbeatResponse => HeartbeatResponseDataJsonConverter.write(res.data, version)
case res: IncrementalAlterConfigsResponse => IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version)
case res: InitProducerIdResponse => InitProducerIdResponseDataJsonConverter.write(res.data, version)
@ -157,6 +160,7 @@ object RequestConvertToJson { @@ -157,6 +160,7 @@ object RequestConvertToJson {
case res: OffsetFetchResponse => OffsetFetchResponseDataJsonConverter.write(res.data, version)
case res: OffsetsForLeaderEpochResponse => OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version)
case res: ProduceResponse => ProduceResponseDataJsonConverter.write(res.data, version)
case res: PushTelemetryResponse => PushTelemetryResponseDataJsonConverter.write(res.data, version)
case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version)

14
core/src/main/scala/kafka/server/KafkaApis.scala

@ -243,6 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -243,6 +243,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => handleGetTelemetrySubscriptionsRequest(request)
case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@ -3690,6 +3692,18 @@ class KafkaApis(val requestChannel: RequestChannel, @@ -3690,6 +3692,18 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}
// Just a place holder for now.
def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
requestHelper.sendMaybeThrottle(request, request.body[GetTelemetrySubscriptionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
// Just a place holder for now.
def handlePushTelemetryRequest(request: RequestChannel.Request): Unit = {
requestHelper.sendMaybeThrottle(request, request.body[PushTelemetryRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = {

54
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

@ -6222,4 +6222,58 @@ class KafkaApisTest { @@ -6222,4 +6222,58 @@ class KafkaApisTest {
assertEquals(expectedResponse, response.data)
}
@Test
def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = {
val data = new GetTelemetrySubscriptionsRequestData()
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build())
createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}
@Test
def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = {
val data = new GetTelemetrySubscriptionsRequestData()
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedResponse = new GetTelemetrySubscriptionsResponseData()
expectedResponse.setErrorCode(errorCode)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
assertEquals(expectedResponse, response.data)
}
@Test
def testPushTelemetryNotAllowedForZkClusters(): Unit = {
val data = new PushTelemetryRequestData()
val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build())
createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}
@Test
def testPushTelemetryUnsupportedVersionForKRaftClusters(): Unit = {
val data = new PushTelemetryRequestData()
val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build())
val errorCode = Errors.UNSUPPORTED_VERSION.code
val expectedResponse = new PushTelemetryResponseData()
expectedResponse.setErrorCode(errorCode)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
createKafkaApis(raftSupport = true).handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
assertEquals(expectedResponse, response.data)
}
}

6
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala

@ -713,6 +713,12 @@ class RequestQuotaTest extends BaseRequestTest { @@ -713,6 +713,12 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.CONSUMER_GROUP_DESCRIBE =>
new ConsumerGroupDescribeRequest.Builder(new ConsumerGroupDescribeRequestData(), true)
case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS =>
new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true)
case ApiKeys.PUSH_TELEMETRY =>
new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}

Loading…
Cancel
Save