Browse Source

KAFKA-15614: Define interfaces and classes for client telemetry (#14575)

This PR for KIP-714 (KAFKA-15614) lays out the interfaces and classes for capturing client telemetry metrics.

The image below shows how the different classes interact; of these, the interfaces have been included in this PR.

Reviewers: Walker Carlson <wcarlson@apache.org>, Matthias J. Sax <matthias@confluent.io>, Andrew Schofield <andrew_schofield@uk.ibm.com>, Kirk True <ktrue@confluent.io>, Philip Nee <pnee@confluent.io>, Jun Rao <junrao@gmail.com>,
pull/14444/merge
Apoorv Mittal 11 months ago committed by GitHub
parent
commit
ad2677bb7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build.gradle
  2. 10
      checkstyle/import-control.xml
  3. 79
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java
  4. 94
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
  5. 30
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKeyable.java
  6. 83
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java
  7. 107
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
  8. 37
      clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java
  9. 44
      clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java
  10. 53
      clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java
  11. 37
      clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java
  12. 20
      clients/src/main/java/org/apache/kafka/server/telemetry/package-info.java

1
build.gradle

@ -1464,6 +1464,7 @@ project(':clients') { @@ -1464,6 +1464,7 @@ project(':clients') {
include "**/org/apache/kafka/server/authorizer/*"
include "**/org/apache/kafka/server/policy/*"
include "**/org/apache/kafka/server/quota/*"
include "**/org/apache/kafka/server/telemetry/*"
}
}

10
checkstyle/import-control.xml

@ -196,6 +196,11 @@ @@ -196,6 +196,11 @@
<subpackage name="quotas">
<allow pkg="org.apache.kafka.common" />
</subpackage>
<subpackage name="telemetry">
<allow pkg="org.apache.kafka.common" />
</subpackage>
</subpackage>
<subpackage name="clients">
@ -255,6 +260,11 @@ @@ -255,6 +260,11 @@
<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />
<subpackage name="telemetry">
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
</subpackage>
</subpackage>
<subpackage name="shell">

79
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetrySender.java

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.requests.AbstractRequest.Builder;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import java.util.Optional;
/**
* The interface used by the {@code NetworkClient} to send telemetry requests.
* <p>
* The interface extends {@link AutoCloseable}, so implementations must also provide a
* {@code close()} method.
*/
public interface ClientTelemetrySender extends AutoCloseable {
/**
* Return the time to wait before the telemetry API should next be attempted (i.e., until the
* interval time has elapsed).
* <p>
* If another telemetry API call is in-flight, then {@code timeoutMs} should be returned as the
* maximum wait time.
*
* @param timeoutMs The timeout for the in-flight telemetry API call.
* @return remaining time in ms until the telemetry API should be attempted again.
*/
long timeToNextUpdate(long timeoutMs);
/**
* Return the telemetry request based on client state, i.e. determine whether a
* {@link org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest} or a
* {@link org.apache.kafka.common.requests.PushTelemetryRequest} should be constructed.
*
* @return an {@link Optional} holding the request builder for the telemetry API call.
*/
Optional<Builder<?>> createRequest();
/**
* Handle a successful response for a get telemetry subscriptions request.
*
* @param response the get telemetry subscriptions API response
*/
void handleResponse(GetTelemetrySubscriptionsResponse response);
/**
* Handle a successful response for a push telemetry request.
*
* @param response the push telemetry API response
*/
void handleResponse(PushTelemetryResponse response);
/**
* Handle a get telemetry subscriptions request failure.
*
* @param kafkaException the fatal exception.
*/
void handleFailedGetTelemetrySubscriptionsRequest(KafkaException kafkaException);
/**
* Handle a push telemetry request failure.
*
* @param kafkaException the fatal exception.
*/
void handleFailedPushTelemetryRequest(KafkaException kafkaException);
}

94
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java

@ -0,0 +1,94 @@ @@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.MetricName;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Immutable value object that contains the name and tags for a metric.
 * <p>
 * Instances take a snapshot of the tags supplied at construction time, so later changes to the
 * caller's map cannot alter the {@link #equals(Object)} / {@link #hashCode()} result of a key
 * that is already in use (for example as a key in a hash-based collection).
 */
public class MetricKey implements MetricKeyable {

    private final String name;
    private final Map<String, String> tags;

    /**
     * Create a {@code MetricKey} without any tags.
     *
     * @param name metric name. This should be the telemetry metric name of the metric (the final
     *             name under which this metric is emitted).
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public MetricKey(String name) {
        this(name, null);
    }

    /**
     * Create a {@code MetricKey}.
     *
     * @param name metric name. This should be the telemetry metric name of the metric (the final
     *             name under which this metric is emitted).
     * @param tags mapping of tag keys to values; {@code null} is treated as "no tags". The map is
     *             copied, so subsequent modifications by the caller are not reflected in this key.
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public MetricKey(String name, Map<String, String> tags) {
        this.name = Objects.requireNonNull(name, "name");
        // Defensive copy: wrapping the caller's map alone would only give a read-only *view*, and
        // a later mutation of the backing map would silently change this key's hash code.
        // LinkedHashMap keeps the caller's iteration order and, like the original, permits nulls.
        this.tags = tags == null || tags.isEmpty()
            ? Collections.emptyMap()
            : Collections.unmodifiableMap(new LinkedHashMap<>(tags));
    }

    /**
     * Create a {@code MetricKey} from the name and tags of the given {@link MetricName}.
     *
     * @param metricName the metric name whose {@code name()} and {@code tags()} are used.
     * @throws NullPointerException if {@code metricName} or its name is {@code null}
     */
    public MetricKey(MetricName metricName) {
        this(Objects.requireNonNull(metricName, "metricName").name(), metricName.tags());
    }

    /**
     * @return this instance; a {@code MetricKey} is its own key.
     */
    @Override
    public MetricKey key() {
        return this;
    }

    /**
     * @return the metric name; never {@code null}.
     */
    public String getName() {
        return name;
    }

    /**
     * @return an unmodifiable map of tag keys to values; empty (never {@code null}) if the key
     *         has no tags.
     */
    public Map<String, String> tags() {
        return tags;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, tags);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        MetricKey other = (MetricKey) obj;
        // Compare the fields directly (as hashCode() does) rather than going through the
        // overridable accessors, so that equals() and hashCode() always agree.
        return name.equals(other.name) && tags.equals(other.tags);
    }

    @Override
    public String toString() {
        return "MetricKey {name=" + name + ", tags=" + tags + "}";
    }
}

30
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKeyable.java

@ -0,0 +1,30 @@ @@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
/**
* An object that provides a {@link MetricKey} that we can use to uniquely identify a metric. This
* is useful for filtering as well as calculating delta metrics.
*/
public interface MetricKeyable {
/**
* Return the key that identifies the metric represented by this object.
*
* @return The {@code MetricKey} for the respective metric.
*/
MetricKey key();
}

83
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsCollector.java

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
/**
* A {@code MetricsCollector} is responsible for scraping a source of metrics and forwarding
* them to the given {@link MetricsEmitter}. For example, a given collector might be used to collect
* system metrics, Kafka metrics, JVM metrics, or other metrics that are to be captured, exposed,
* and/or forwarded.
*
* <p>
*
* In general, a {@code MetricsCollector} implementation is closely managed by another entity
* (that entity is colloquially referred to as the "telemetry reporter") that will be in
* charge of its lifecycle via the {@link #start()} and {@link #stop()} methods. The telemetry
* reporter should ensure that the {@link #start()} method is invoked <i>once and only once</i>
* before calls to {@link #collect(MetricsEmitter)} are made. Implementations of {@code MetricsCollector}
* should allow for the corner-case that {@link #stop()} is called before {@link #start()},
* which might happen in the case of error on startup of the telemetry reporter.
*
* <p>
*
* Regarding threading, the {@link #start()} and {@link #stop()} methods may be called from
* different threads and so proper care should be taken by implementations of the
* {@code MetricsCollector} interface to be thread-safe. However, the telemetry reporter must
* ensure that the {@link #collect(MetricsEmitter)} method is only invoked in a synchronous
* manner.
*
* @see MetricsEmitter
*/
public interface MetricsCollector {
/**
* The {@code collect} method is called by the telemetry reporter to retrieve the value
* of its desired set of metrics, and then forward those on to the provided
* {@link MetricsEmitter}. The implementation may choose to collect all the metrics before forwarding
* them to the {@code metricsEmitter}, or they may be forwarded as they are collected.
*
* <p>
*
* In general, the implementation should try not to presume the characteristics of the
* {@link MetricsEmitter} so as to keep a loose coupling.
*
* @param metricsEmitter {@link MetricsEmitter} to which the metric values will be passed once collected
*/
void collect(MetricsEmitter metricsEmitter);
/**
* Allows the {@code MetricsCollector} implementation to initialize itself. This method should
* be invoked by the telemetry reporter before calls to {@link #collect(MetricsEmitter)} are made. The
* telemetry reporter should not invoke this method more than once.
*
* <p>
*
* The default implementation does nothing.
*/
default void start() {
// Do nothing...
}
/**
* Allows the {@code MetricsCollector} implementation to stop itself and dispose of any resources.
* This method should ideally be invoked only once by the telemetry reporter.
*
* <p>
*
* Calls to {@link #collect(MetricsEmitter)} once this method has been invoked should be expected to
* fail by the telemetry reporter; it should take caution to handle that case.
*
* <p>
*
* The default implementation does nothing.
*/
default void stop() {
// Do nothing...
}
}

107
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java

@ -0,0 +1,107 @@ @@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
import java.util.Collections;
import java.util.List;
import java.io.Closeable;
/**
* A {@code MetricsEmitter} emits the values held by the {@link SinglePointMetric}, likely first converting them
* to a format suitable for exposure, storage, or transmission. The telemetry reporter is likely
* the entity that is familiar with the underlying method of making the metrics visible to the
* broker. Thus, it is the primary place in the code where the implementation details are known.
*
* <p>
*
* A {@code MetricsEmitter} is stateless and the telemetry reporter should assume that the object is
* not thread safe and thus concurrent access to either the
* {@link #shouldEmitMetric(MetricKeyable)} or {@link #emitMetric(SinglePointMetric)} should be avoided.
*
* <p>
*
* Regarding threading, the {@link #init()} and {@link #close()} methods may be called from
* different threads and so proper care should be taken by implementations of the
* {@code MetricsEmitter} interface to be thread-safe. However, the telemetry reporter must
* ensure that the {@link #emitMetric(SinglePointMetric)} and {@link #emittedMetrics()} methods
* are only invoked in a synchronous manner.
*/
public interface MetricsEmitter extends Closeable {
/**
* Performs the necessary logic to determine if the metric is to be emitted. The telemetry
* reporter should respect this and not just call the {@link #emitMetric(SinglePointMetric)} directly.
*
* @param metricKeyable Object from which to get the {@link MetricKey}
* @return {@code true} if the metric should be emitted, {@code false} otherwise
*/
boolean shouldEmitMetric(MetricKeyable metricKeyable);
/**
* Emits the metric in an implementation-specific fashion. Depending on the implementation,
* calls made to this after {@link #close()} has been invoked will fail.
*
* @param metric {@code SinglePointMetric}
* @return {@code true} if the metric was emitted, {@code false} otherwise
*/
boolean emitMetric(SinglePointMetric metric);
/**
* Return emitted metrics. Implementation should decide if all emitted metrics should be returned
* or should provide the delta from last invocation of this method. Depending on the implementation,
* calls made to this after {@link #close()} has been invoked will fail.
*
* <p>
*
* The default implementation returns an empty list.
*
* @return emitted metrics.
*/
default List<SinglePointMetric> emittedMetrics() {
return Collections.emptyList();
}
/**
* Emits a metric if {@link MetricsEmitter#shouldEmitMetric(MetricKeyable)} returns {@code true}.
* {@link #emitMetric(SinglePointMetric)} is not called when the metric should not be emitted.
*
* @param metric to emit
* @return {@code true} if the metric should be emitted and the emit is successful,
* {@code false} otherwise
*/
default boolean maybeEmitMetric(SinglePointMetric metric) {
return shouldEmitMetric(metric) && emitMetric(metric);
}
/**
* Allows the {@code MetricsEmitter} implementation to initialize itself. This method should be invoked
* by the telemetry reporter before calls to {@link #emitMetric(SinglePointMetric)} are made.
*
* <p>
*
* The telemetry reporter should not invoke this method more than once.
*
* <p>
*
* The default implementation does nothing.
*/
default void init() {
// Do nothing...
}
/**
* Allows the {@code MetricsEmitter} implementation to stop itself and dispose of any resources. This
* method should ideally be invoked only once by the telemetry reporter.
*
* <p>
*
* Calls to {@link #emitMetric(SinglePointMetric)} once this method has been invoked should be
* expected to fail by the telemetry reporter; it should take caution to handle that case.
*
* <p>
*
* The default implementation does nothing and, unlike {@link Closeable#close()}, does not
* declare any checked exception.
*/
default void close() {
// Do nothing...
}
}

37
clients/src/main/java/org/apache/kafka/common/telemetry/internals/SinglePointMetric.java

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.telemetry.internals;
/**
* This class represents a metric that does not yet contain resource tags.
* These additional resource tags will be added before emitting metrics by the telemetry reporter.
* <p>
* The metric is identified by the {@link MetricKey} it was created with, which is exposed via
* {@link #key()}.
*/
public class SinglePointMetric implements MetricKeyable {
// Key (name and tags) identifying this metric; assigned once at construction.
private final MetricKey key;
// The constructor is private and no factory method exists in this class yet, so instances
// cannot currently be created from outside this class.
private SinglePointMetric(MetricKey key) {
this.key = key;
}
/**
* @return the {@link MetricKey} this metric was created with.
*/
@Override
public MetricKey key() {
return key;
}
// TODO: Implement methods for serializing/deserializing metrics in required format.
}

44
clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.telemetry;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* A {@link MetricsReporter} may implement this interface to indicate support for collecting client
* telemetry on the server side.
*/
@InterfaceStability.Evolving
public interface ClientTelemetry {
/**
* Implemented by the broker {@link MetricsReporter} to provide a {@link ClientTelemetryReceiver}
* instance.
* <p>
* The returned instance may be cached by the broker.
* <p>
* This method must only be called after the initial call to
* {@link MetricsReporter#contextChange(MetricsContext)} on the {@link MetricsReporter}
* implementing this interface.
*
* @return broker side instance of {@link ClientTelemetryReceiver}.
*/
ClientTelemetryReceiver clientReceiver();
}

53
clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.telemetry;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.nio.ByteBuffer;
/**
* The telemetry payload pushed by a client, as handed to
* {@link ClientTelemetryReceiver#exportMetrics(org.apache.kafka.server.authorizer.AuthorizableRequestContext, ClientTelemetryPayload)}.
*/
@InterfaceStability.Evolving
public interface ClientTelemetryPayload {
/**
* @return Client's instance id.
*/
Uuid clientInstanceId();
/**
* Indicates whether the client is terminating, i.e., this is the last metrics push from this
* client instance.
* <p>
* To avoid the receiving broker's metrics rate-limiter discarding this out-of-profile push, the
* PushTelemetryRequest.Terminating field must be set to true. A broker must only allow one such
* unthrottled metrics push for each combination of client instance ID and SubscriptionId.
*
* @return {@code true} if client is terminating, else {@code false}
*/
boolean isTerminating();
/**
* @return Metrics data content-type/serialization format.
*/
String contentType();
/**
* @return Serialized metrics data.
*/
ByteBuffer data();
}

37
clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.telemetry;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
/**
* Receives the telemetry metrics reported by clients to the broker. An instance is provided by
* {@link ClientTelemetry#clientReceiver()}.
*/
@InterfaceStability.Evolving
public interface ClientTelemetryReceiver {
/**
* Called by the broker when a client reports telemetry metrics. The associated request context
* can be used by the metrics plugin to retrieve additional client information such as client ids
* or endpoints.
* <p>
* This method may be called from the request handling thread, and as such should avoid blocking.
*
* @param context the client request context for the corresponding {@code PushTelemetryRequest}
* api call.
* @param payload the encoded telemetry payload as sent by the client.
*/
void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}

20
clients/src/main/java/org/apache/kafka/server/telemetry/package-info.java

@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provides a pluggable interface for capturing client telemetry metrics.
*/
package org.apache.kafka.server.telemetry;
Loading…
Cancel
Save