diff --git a/build.gradle b/build.gradle index 3326f207b81..fbda13d5a79 100644 --- a/build.gradle +++ b/build.gradle @@ -1464,6 +1464,7 @@ project(':clients') { include "**/org/apache/kafka/server/authorizer/*" include "**/org/apache/kafka/server/policy/*" include "**/org/apache/kafka/server/quota/*" + include "**/org/apache/kafka/server/telemetry/*" } } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e8f8bd56547..0db1700c612 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -196,6 +196,11 @@ + + + + + @@ -255,6 +260,11 @@ + + + + + diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java new file mode 100644 index 00000000000..b8499a3af33 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryResponse; + +import java.util.Optional; + +/** + * The interface used by the `NetworkClient` to send telemetry requests. + */ +public interface ClientTelemetrySender extends AutoCloseable { + + /** + * Return the next time when the telemetry API should be attempted (i.e., interval time has elapsed). + *

+ * If another telemetry API is in-flight, then {@code timeoutMs} should be returned as the + * maximum wait time. + * + * @param timeoutMs The timeout for the inflight telemetry API call. + * @return remaining time in ms till the telemetry API be attempted again. + */ + long timeToNextUpdate(long timeoutMs); + + /** + * Return the telemetry request based on client state i.e. determine if + * {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or + * {@link org.apache.kafka.common.requests.PushTelemetryRequest} be constructed. + * + * @return request for telemetry API call. + */ + Optional> createRequest(); + + /** + * Handle successful response for get telemetry subscriptions request. + * + * @param response subscriptions telemetry API response + */ + void handleResponse(GetTelemetrySubscriptionsResponse response); + + /** + * Handle successful response for push telemetry request. + * + * @param response push telemetry API response + */ + void handleResponse(PushTelemetryResponse response); + + /** + * Handle get telemetry subscriptions request failure. + * + * @param kafkaException the fatal exception. + */ + void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException); + + /** + * Handle push telemetry request failure. + * + * @param kafkaException the fatal exception. + */ + void handleFailedPushTelemetryRequest(KafkaException kafkaException); +} diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java new file mode 100644 index 00000000000..8c700dc722d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + + private final String name; + private final Map tags; + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the telemetry metric name of the metric (the final name + * under which this metric is emitted). + */ + public MetricKey(String name) { + this(name, null); + } + + /** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ + public MetricKey(String name, Map tags) { + this.name = Objects.requireNonNull(name); + this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap(); + } + + public MetricKey(MetricName metricName) { + this(metricName.name(), metricName.tags()); + } + + @Override + public MetricKey key() { + return this; + } + + public String getName() { + return name; + } + + public Map tags() { + return tags; + } + + @Override + public int hashCode() { + return Objects.hash(name, tags); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + MetricKey other = (MetricKey) obj; + return this.getName().equals(other.getName()) && this.tags().equals(other.tags()); + } + + @Override + public String toString() { + return "MetricKey {name=" + getName() + ", tags=" + tags() + "}"; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKeyable.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKeyable.java new file mode 100644 index 00000000000..8eb9220663a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKeyable.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +/** + * An object that provides a MetricKey that we can use to uniquely identify a metric. This + * is useful for filtering as well as calculating delta metrics. + */ +public interface MetricKeyable { + + /** + * @return The {@code MetricKey} for respective metric. + */ + MetricKey key(); + +} diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java new file mode 100644 index 00000000000..ee8dfa2616e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +/** + * A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding + * them to the given {@link MetricsEmitter}. For example, a given collector might be used to collect + * system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed, + * and/or forwarded. + * + *

+ * + * In general, a {@code MetricsCollector} implementation is closely managed by another entity + * (that entity is colloquially referred to as the "telemetry reporter") that will be in + * charge of its lifecycle via the {@link #start()} and {@link #stop()} methods. The telemetry + * reporter should ensure that the {@link #start()} method is invoked once and only once + * before calls to {@link #collect(MetricsEmitter)} are made. Implementations of {@code MetricsCollector} + * should allow for the corner-case that {@link #stop()} is called before {@link #start()}, + * which might happen in the case of error on startup of the telemetry reporter. + * + *

+ * + * Regarding threading, the {@link #start()} and {@link #stop()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #collect(MetricsEmitter)} method should only be invoked in a synchronous + * manner. + * + * @see MetricsEmitter + */ +public interface MetricsCollector { + + /** + * The {@code collect} method is called by the telemetry reporter to retrieve the value + * of its desired set of metrics, and then forward those on to the provided + * {@link MetricsEmitter}. The implementation may choose to collect all the metrics before forwarding + * them to the {@code metricsEmitter}, or they may be forwarded as they are collected. + * + *

+ * + * In general, the implementation should try not to presume the characteristics of the + * {@link MetricsEmitter} so as to keep a loose coupling. + * + * @param metricsEmitter {@link MetricsEmitter} to which the metric values will be passed once collected + */ + void collect(MetricsEmitter metricsEmitter); + + /** + * Allows the {@code MetricsCollector} implementation to initialize itself. This method should + * be invoked by the telemetry reporter before calls to {@link #collect(MetricsEmitter)} are made. The + * telemetry reporter should not invoke this method more than once. + */ + default void start() { + // Do nothing... + } + + /** + * Allows the {@code MetricsCollector} implementation to stop itself and dispose of any resources. + * This method should ideally be invoked only once by the telemetry reporter. + * + *

+ * + * Calls to {@link #collect(MetricsEmitter)} once this method has been invoked should be expected to + * fail by the telemetry reporter; it should take caution to handle that case. + */ + default void stop() { + // Do nothing... + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java new file mode 100644 index 00000000000..8ba3a13bd32 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import java.util.Collections; +import java.util.List; + +import java.io.Closeable; + +/** + * An {@code MetricsEmitter} emits the values held by the {@link SinglePointMetric}, likely first converting them + * to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely + * the entity that is familiar with the underlying method of making the metrics visible to the + * broker. Thus, it is the primary place in the code where the implementation details are known. + * + *

+ * + * An {@code MetricsEmitter} is stateless and the telemetry reporter should assume that the object is + * not thread safe and thus concurrent access to either the + * {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided. + * + * Regarding threading, the {@link #init()} and {@link #close()} methods may be called from + * different threads and so proper care should be taken by implementations of the + * {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must + * ensure that the {@link #emitMetric(SinglePointMetric)} and {@link #emittedMetrics()} methods + * should only be invoked in a synchronous manner. + */ +public interface MetricsEmitter extends Closeable { + + /** + * Performs the necessary logic to determine if the metric is to be emitted. The telemetry + * reporter should respect this and not just call the {@link #emitMetric(SinglePointMetric)} directly. + * + * @param metricKeyable Object from which to get the {@link MetricKey} + * @return {@code true} if the metric should be emitted, {@code false} otherwise + */ + boolean shouldEmitMetric(MetricKeyable metricKeyable); + + /** + * Emits the metric in an implementation-specific fashion. Depending on the implementation, + * calls made to this after {@link #close()} has been invoked will fail. + * + * @param metric {@code SinglePointMetric} + * @return {@code true} if the metric was emitted, {@code false} otherwise + */ + boolean emitMetric(SinglePointMetric metric); + + /** + * Return emitted metrics. Implementation should decide if all emitted metrics should be returned + * or should provide the delta from last invocation of this method. Depending on the implementation, + * calls made to this after {@link #close()} has been invoked will fail. + * + * @return emitted metrics. + */ + default List emittedMetrics() { + return Collections.emptyList(); + } + + /** + * Emits a metric if {@link MetricsEmitter#shouldEmitMetric(MetricKeyable)} returns true. + * @param metric to emit + * @return true if emit is successful, false otherwise + */ + default boolean maybeEmitMetric(SinglePointMetric metric) { + return shouldEmitMetric(metric) && emitMetric(metric); + } + + /** + * Allows the {@code MetricsEmitter} implementation to initialize itself. This method should be invoked + * by the telemetry reporter before calls to {@link #emitMetric(SinglePointMetric)} are made. + * + *

+ * + * The telemetry reporter should not invoke this method more than once. + */ + default void init() { + // Do nothing... + } + + /** + * Allows the {@code MetricsEmitter} implementation to stop itself and dispose of any resources. This + * method should ideally be invoked only once by the telemetry reporter. + * + *

+ * + * Calls to {@link #emitMetric(SinglePointMetric)} once this method has been invoked should be + * expected to fail by the telemetry reporter; it should take caution to handle that case. + */ + default void close() { + // Do nothing... + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java new file mode 100644 index 00000000000..f81a9bfb60e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +/** + * This class represents a metric that does not yet contain resource tags. + * These additional resource tags will be added before emitting metrics by the telemetry reporter. + */ +public class SinglePointMetric implements MetricKeyable { + + private final MetricKey key; + + private SinglePointMetric(MetricKey key) { + this.key = key; + } + + @Override + public MetricKey key() { + return key; + } + + // TODO: Implement methods for serializing/deserializing metrics in required format. +} diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java new file mode 100644 index 00000000000..4b45e6f359e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * A {@link MetricsReporter} may implement this interface to indicate support for collecting client + * telemetry on the server side. + */ +@InterfaceStability.Evolving +public interface ClientTelemetry { + + /** + * Implemented by the broker {@link MetricsReporter} to provide a {@link ClientTelemetryReceiver} + * instance. + *

+ * This instance may be cached by the broker. + *

+ * This method must always be called after the initial call to + * {@link MetricsReporter#contextChange(MetricsContext)} on the {@link MetricsReporter} + * implementing this interface. + * + * @return broker side instance of {@link ClientTelemetryReceiver}. + */ + ClientTelemetryReceiver clientReceiver(); +} diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java new file mode 100644 index 00000000000..0a7b55cfdae --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.nio.ByteBuffer; + +@InterfaceStability.Evolving +public interface ClientTelemetryPayload { + + /** + * @return Client's instance id. + */ + Uuid clientInstanceId(); + + /** + * Indicates whether client is terminating, e.g., the last metrics push from this client instance. + *

+ *To avoid the receiving broker’s metrics rate-limiter discarding this out-of-profile push, the + * PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such + * unthrottled metrics push for each combination of client instance ID and SubscriptionId. + * + * @return {@code true} if client is terminating, else false + */ + boolean isTerminating(); + + /** + * @return Metrics data content-type/serialization format. + */ + String contentType(); + + /** + * @return Serialized metrics data. + */ + ByteBuffer data(); +} diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java new file mode 100644 index 00000000000..ef4cb68dcf1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.telemetry; + +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; + +@InterfaceStability.Evolving +public interface ClientTelemetryReceiver { + /** + * Called by the broker when a client reports telemetry metrics. The associated request context + * can be used by the metrics plugin to retrieve additional client information such as client ids + * or endpoints. + *

s + * This method may be called from the request handling thread, and as such should avoid blocking. + * + * @param context the client request context for the corresponding {@code PushTelemetryRequest} + * api call. + * @param payload the encoded telemetry payload as sent by the client. + */ + void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload); +} diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/package-info.java b/clients/src/main/java/org/apache/kafka/server/telemetry/package-info.java new file mode 100644 index 00000000000..14fe4408981 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/server/telemetry/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides pluggable interface for capturing client telemetry metrics. + */ +package org.apache.kafka.server.telemetry; \ No newline at end of file