Browse Source

KAFKA-15613: Client API definition and configurations (KIP-714) (#14560)

Part of KIP-714.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/14203/merge
Apoorv Mittal 11 months ago committed by GitHub
parent
commit
78166101eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
  2. 30
      clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
  3. 11
      clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
  4. 6
      clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
  5. 5
      clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
  6. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
  7. 11
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  8. 31
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  9. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
  10. 6
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
  11. 31
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  12. 6
      clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  13. 6
      clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  14. 11
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  15. 5
      clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

3
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java

@ -105,6 +105,9 @@ public class CommonClientConfigs { @@ -105,6 +105,9 @@ public class CommonClientConfigs {
public static final int RETRY_BACKOFF_EXP_BASE = 2;
public static final double RETRY_BACKOFF_JITTER = 0.2;
public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The window of time a metrics sample is computed over.";

30
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

@ -19,17 +19,20 @@ package org.apache.kafka.clients.admin; @@ -19,17 +19,20 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
@ -1660,6 +1663,33 @@ public interface Admin extends AutoCloseable { @@ -1660,6 +1663,33 @@ public interface Admin extends AutoCloseable {
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
FenceProducersOptions options);
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.
* The ID is useful for correlating client operations with telemetry sent to the broker and
* to its eventual monitoring destinations.
* <p>
* If telemetry is enabled, this will first require a connection to the cluster to generate
* the unique client instance ID. This method waits up to {@code timeout} for the admin
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link AdminClientConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
*
* @param timeout The maximum time to wait for admin client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not
* wait for the initial request to complete if it hasn't already.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If an unexpected error occurs while trying to determine the client
* instance ID, though this error does not necessarily imply the
* admin client is otherwise unusable.
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws IllegalStateException If telemetry is not enabled ie, config `{@code enable.metrics.push}`
* is set to `{@code false}`.
* @return The client's assigned instance id used for metrics collection.
*/
Uuid clientInstanceId(Duration timeout);
/**
* Get the metrics kept by the adminClient
*/

11
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

@ -84,6 +84,12 @@ public class AdminClientConfig extends AbstractConfig { @@ -84,6 +84,12 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRY_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG;
private static final String RETRY_BACKOFF_MAX_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC;
/**
* <code>enable.metrics.push</code>
*/
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
public static final String ENABLE_METRICS_PUSH_DOC = CommonClientConfigs.ENABLE_METRICS_PUSH_DOC;
/** <code>socket.connection.setup.timeout.ms</code> */
public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG;
@ -178,6 +184,11 @@ public class AdminClientConfig extends AbstractConfig { @@ -178,6 +184,11 @@ public class AdminClientConfig extends AbstractConfig {
atLeast(0L),
Importance.LOW,
RETRY_BACKOFF_MAX_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
30000,

6
clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.MetricName; @@ -24,6 +24,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
@ -277,6 +278,11 @@ public class ForwardingAdmin implements Admin { @@ -277,6 +278,11 @@ public class ForwardingAdmin implements Admin {
return delegate.fenceProducers(transactionalIds, options);
}
@Override
public Uuid clientInstanceId(Duration timeout) {
return delegate.clientInstanceId(timeout);
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();

5
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@ -4385,6 +4385,11 @@ public class KafkaAdminClient extends AdminClient { @@ -4385,6 +4385,11 @@ public class KafkaAdminClient extends AdminClient {
return new FenceProducersResult(future.all());
}
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
}
private <K, V> void invokeDriver(
AdminApiHandler<K, V> handler,
AdminApiFuture<K, V> future,

6
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@ -20,6 +20,7 @@ import org.apache.kafka.common.Metric; @@ -20,6 +20,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import java.io.Closeable;
import java.time.Duration;
@ -173,6 +174,11 @@ public interface Consumer<K, V> extends Closeable { @@ -173,6 +174,11 @@ public interface Consumer<K, V> extends Closeable {
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);
/**
* See {@link KafkaConsumer#clientInstanceId(Duration)}}
*/
Uuid clientInstanceId(Duration timeout);
/**
* @see KafkaConsumer#metrics()
*/

11
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -230,6 +230,12 @@ public class ConsumerConfig extends AbstractConfig { @@ -230,6 +230,12 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
/**
* <code>enable.metrics.push</code>
*/
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
public static final String ENABLE_METRICS_PUSH_DOC = CommonClientConfigs.ENABLE_METRICS_PUSH_DOC;
/**
* <code>retry.backoff.max.ms</code>
*/
@ -481,6 +487,11 @@ public class ConsumerConfig extends AbstractConfig { @@ -481,6 +487,11 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(AUTO_OFFSET_RESET_CONFIG,
Type.STRING,
OffsetResetStrategy.LATEST.toString(),

31
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -42,6 +42,7 @@ import org.apache.kafka.common.Metric; @@ -42,6 +42,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
@ -1892,6 +1893,36 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1892,6 +1893,36 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.
* The ID is useful for correlating client operations with telemetry sent to the broker and
* to its eventual monitoring destinations.
* <p>
* If telemetry is enabled, this will first require a connection to the cluster to generate
* the unique client instance ID. This method waits up to {@code timeout} for the consumer
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
*
* @param timeout The maximum time to wait for consumer client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not
* wait for the initial request to complete if it hasn't already.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If an unexpected error occurs while trying to determine the client
* instance ID, though this error does not necessarily imply the
* consumer client is otherwise unusable.
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws IllegalStateException If telemetry is not enabled ie, config `{@code enable.metrics.push}`
* is set to `{@code false}`.
* @return The client's assigned instance id used for metrics collection.
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
}
/**
* Get the metrics kept by the consumer
*/
@Override

6
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@ -24,6 +24,7 @@ import org.apache.kafka.common.Metric; @@ -24,6 +24,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;
@ -384,6 +385,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @@ -384,6 +385,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
endOffsets.putAll(newOffsets);
}
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException("Not implemented yet.");
}
@Override
public synchronized Map<MetricName, ? extends Metric> metrics() {
ensureNotClosed();

6
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java

@ -44,6 +44,7 @@ import org.apache.kafka.common.Metric; @@ -44,6 +44,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
@ -590,6 +591,11 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { @@ -590,6 +591,11 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
}
}
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new KafkaException("method not implemented");
}
@Override
public Set<TopicPartition> assignment() {
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());

31
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -40,6 +40,7 @@ import org.apache.kafka.common.Metric; @@ -40,6 +40,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@ -1252,6 +1253,36 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -1252,6 +1253,36 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return Collections.unmodifiableMap(this.metrics.metrics());
}
/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.
* The ID is useful for correlating client operations with telemetry sent to the broker and
* to its eventual monitoring destinations.
* <p>
* If telemetry is enabled, this will first require a connection to the cluster to generate
* the unique client instance ID. This method waits up to {@code timeout} for the producer
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link ProducerConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
*
* @param timeout The maximum time to wait for producer client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not
* wait for the initial request to complete if it hasn't already.
* @throws InterruptException If the thread is interrupted while blocked.
* @throws KafkaException If an unexpected error occurs while trying to determine the client
* instance ID, though this error does not necessarily imply the
* producer client is otherwise unusable.
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws IllegalStateException If telemetry is not enabled ie, config `{@code enable.metrics.push}`
* is set to `{@code false}`.
* @return The client's assigned instance id used for metrics collection.
*/
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
}
/**
* Close this producer. This method blocks until all previously sent requests complete.
* This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.

6
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric; @@ -26,6 +26,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
@ -379,6 +380,11 @@ public class MockProducer<K, V> implements Producer<K, V> { @@ -379,6 +380,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
return this.cluster.partitionsForTopic(topic);
}
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException();
}
public Map<MetricName, Metric> metrics() {
return mockMetrics;
}

6
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName; @@ -22,6 +22,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.io.Closeable;
@ -95,6 +96,11 @@ public interface Producer<K, V> extends Closeable { @@ -95,6 +96,11 @@ public interface Producer<K, V> extends Closeable {
*/
Map<MetricName, ? extends Metric> metrics();
/**
* See {@link KafkaProducer#clientInstanceId(Duration)}}
*/
Uuid clientInstanceId(Duration timeout);
/**
* See {@link KafkaProducer#close()}
*/

11
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -213,6 +213,12 @@ public class ProducerConfig extends AbstractConfig { @@ -213,6 +213,12 @@ public class ProducerConfig extends AbstractConfig {
/** <code>retry.backoff.max.ms</code> */
public static final String RETRY_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MAX_MS_CONFIG;
/**
* <code>enable.metrics.push</code>
*/
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
public static final String ENABLE_METRICS_PUSH_DOC = CommonClientConfigs.ENABLE_METRICS_PUSH_DOC;
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "
@ -385,6 +391,11 @@ public class ProducerConfig extends AbstractConfig { @@ -385,6 +391,11 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MAX_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
60 * 1000,

5
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

@ -1312,6 +1312,11 @@ public class MockAdminClient extends AdminClient { @@ -1312,6 +1312,11 @@ public class MockAdminClient extends AdminClient {
mockMetrics.put(name, metric);
}
@Override
public Uuid clientInstanceId(Duration timeout) {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
synchronized public Map<MetricName, ? extends Metric> metrics() {
return mockMetrics;

Loading…
Cancel
Save