Browse Source

KAFKA-1723; make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jay Kreps and Jun Rao

pull/38/merge
Manikumar Reddy 10 years ago committed by Jun Rao
parent
commit
688e38ce45
  1. 1
      build.gradle
  2. 5
      clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
  3. 3
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  4. 3
      clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
  5. 19
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  6. 7
      clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
  7. 4
      clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
  8. 28
      clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
  9. 25
      clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  10. 81
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  11. 9
      clients/src/main/java/org/apache/kafka/common/Metric.java
  12. 179
      clients/src/main/java/org/apache/kafka/common/MetricName.java
  13. 14
      clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java
  14. 60
      clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
  15. 18
      clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
  16. 58
      clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
  17. 51
      clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
  18. 18
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java
  19. 2
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
  20. 93
      clients/src/main/java/org/apache/kafka/common/network/Selector.java
  21. 14
      clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
  22. 16
      clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
  23. 11
      clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
  24. 13
      clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
  25. 87
      clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
  26. 3
      clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
  27. 11
      clients/src/test/java/org/apache/kafka/test/MetricsBench.java

1
build.gradle

@ -370,6 +370,7 @@ project(':clients') { @@ -370,6 +370,7 @@ project(':clients') {
javadoc {
include "**/org/apache/kafka/clients/producer/*"
include "**/org/apache/kafka/common/*"
include "**/org/apache/kafka/common/errors/*"
include "**/org/apache/kafka/common/serialization/*"
}

5
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@ -18,6 +18,7 @@ import java.util.Map; @@ -18,6 +18,7 @@ import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.MetricName;
/**
* @see KafkaConsumer
@ -111,11 +112,11 @@ public interface Consumer<K,V> extends Closeable { @@ -111,11 +112,11 @@ public interface Consumer<K,V> extends Closeable {
* @return The offsets for messages that were written to the server before the specified timestamp.
*/
public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions);
/**
* Return a map of metrics maintained by the consumer
*/
public Map<String, ? extends Metric> metrics();
public Map<MetricName, ? extends Metric> metrics();
/**
* Close this consumer

3
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -16,6 +16,7 @@ import org.apache.kafka.common.Metric; @@ -16,6 +16,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
@ -627,7 +628,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> { @@ -627,7 +628,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
}
@Override
public Map<String, ? extends Metric> metrics() {
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}

3
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@ -25,6 +25,7 @@ import java.util.Map.Entry; @@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.MetricName;
/**
* A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
@ -179,7 +180,7 @@ public class MockConsumer implements Consumer<byte[], byte[]> { @@ -179,7 +180,7 @@ public class MockConsumer implements Consumer<byte[], byte[]> {
}
@Override
public Map<String, ? extends Metric> metrics() {
public Map<MetricName, ? extends Metric> metrics() {
return null;
}

19
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -17,6 +17,7 @@ import java.util.*; @@ -17,6 +17,7 @@ import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.producer.internals.Metadata;
@ -35,6 +36,7 @@ import org.apache.kafka.common.errors.SerializationException; @@ -35,6 +36,7 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
@ -77,6 +79,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { @@ -77,6 +79,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private static final AtomicInteger producerAutoId = new AtomicInteger(1);
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@ -159,7 +162,9 @@ public class KafkaProducer<K,V> implements Producer<K,V> { @@ -159,7 +162,9 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : "");
if(clientId.length() <= 0)
clientId = "producer-" + producerAutoId.getAndIncrement();
String jmxPrefix = "kafka.producer";
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(jmxPrefix));
@ -171,17 +176,20 @@ public class KafkaProducer<K,V> implements Producer<K,V> { @@ -171,17 +176,20 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
metrics,
time);
time,
metricTags);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@ -196,7 +204,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> { @@ -196,7 +204,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
config.getInt(ProducerConfig.RETRIES_CONFIG),
config.getInt(ProducerConfig.TIMEOUT_CONFIG),
this.metrics,
new SystemTime());
new SystemTime(),
clientId);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
@ -398,7 +407,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { @@ -398,7 +407,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
}
@Override
public Map<String, ? extends Metric> metrics() {
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
}

7
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

@ -28,10 +28,7 @@ import java.util.concurrent.Future; @@ -28,10 +28,7 @@ import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.*;
/**
@ -136,7 +133,7 @@ public class MockProducer implements Producer<byte[], byte[]> { @@ -136,7 +133,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
return this.cluster.partitionsForTopic(topic);
}
public Map<String, Metric> metrics() {
public Map<MetricName, Metric> metrics() {
return Collections.emptyMap();
}

4
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

@ -23,11 +23,11 @@ import java.util.concurrent.Future; @@ -23,11 +23,11 @@ import java.util.concurrent.Future;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.MetricName;
/**
* The interface for the {@link KafkaProducer}
*
* @see KafkaProducer
* @see MockProducer
*/
@ -55,7 +55,7 @@ public interface Producer<K,V> extends Closeable { @@ -55,7 +55,7 @@ public interface Producer<K,V> extends Closeable {
/**
* Return a map of metrics maintained by the producer
*/
public Map<String, ? extends Metric> metrics();
public Map<MetricName, ? extends Metric> metrics();
/**
* Close this producer

28
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java

@ -16,19 +16,21 @@ @@ -16,19 +16,21 @@
*/
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
/**
* A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
@ -61,8 +63,12 @@ public final class BufferPool { @@ -61,8 +63,12 @@ public final class BufferPool {
* @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
* {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
* {@link #allocate(int)} will throw an exception if the buffer is out of memory.
* @param metrics instance of Metrics
* @param time time instance
* @param metricGrpName logical group name for metrics
* @param metricTags additional key/val attributes for metrics
*/
public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time) {
public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion, Metrics metrics, Time time , String metricGrpName , Map<String, String> metricTags) {
this.poolableSize = poolableSize;
this.blockOnExhaustion = blockOnExhaustion;
this.lock = new ReentrantLock();
@ -73,9 +79,11 @@ public final class BufferPool { @@ -73,9 +79,11 @@ public final class BufferPool {
this.metrics = metrics;
this.time = time;
this.waitTime = this.metrics.sensor("bufferpool-wait-time");
this.waitTime.add("bufferpool-wait-ratio",
"The fraction of time an appender waits for space allocation.",
new Rate(TimeUnit.NANOSECONDS));
MetricName metricName = new MetricName("bufferpool-wait-ratio",
metricGrpName,
"The fraction of time an appender waits for space allocation.",
metricTags);
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
/**

25
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo; @@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
@ -76,6 +77,7 @@ public final class RecordAccumulator { @@ -76,6 +77,7 @@ public final class RecordAccumulator {
* memory
* @param metrics The metrics
* @param time The time instance to use
* @param metricTags additional key/value attributes of the metric
*/
public RecordAccumulator(int batchSize,
long totalSize,
@ -83,35 +85,38 @@ public final class RecordAccumulator { @@ -83,35 +85,38 @@ public final class RecordAccumulator {
long retryBackoffMs,
boolean blockOnBufferFull,
Metrics metrics,
Time time) {
Time time,
Map<String, String> metricTags) {
this.drainIndex = 0;
this.closed = false;
this.batchSize = batchSize;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time);
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags);
this.time = time;
registerMetrics(metrics);
registerMetrics(metrics, metricGrpName, metricTags);
}
private void registerMetrics(Metrics metrics) {
metrics.addMetric("waiting-threads",
"The number of user threads blocked waiting for buffer memory to enqueue their records",
private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
metrics.addMetric(metricName,
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
});
metrics.addMetric("buffer-total-bytes",
"The maximum amount of buffer memory the client can use (whether or not it is currently used).",
metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
metrics.addMetric(metricName,
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
});
metrics.addMetric("buffer-available-bytes",
"The total amount of buffer memory that is not being used (either unallocated or in the free list).",
metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
metrics.addMetric(metricName,
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.availableMemory();

81
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -16,6 +16,7 @@ import java.nio.ByteBuffer; @@ -16,6 +16,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException; @@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -82,6 +84,9 @@ public class Sender implements Runnable { @@ -82,6 +84,9 @@ public class Sender implements Runnable {
/* metrics */
private final SenderMetrics sensors;
/* param clientId of the client */
private String clientId;
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
@ -90,7 +95,8 @@ public class Sender implements Runnable { @@ -90,7 +95,8 @@ public class Sender implements Runnable {
int retries,
int requestTimeout,
Metrics metrics,
Time time) {
Time time,
String clientId) {
this.client = client;
this.accumulator = accumulator;
this.metadata = metadata;
@ -100,6 +106,7 @@ public class Sender implements Runnable { @@ -100,6 +106,7 @@ public class Sender implements Runnable {
this.acks = acks;
this.retries = retries;
this.time = time;
this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
}
@ -324,46 +331,60 @@ public class Sender implements Runnable { @@ -324,46 +331,60 @@ public class Sender implements Runnable {
public SenderMetrics(Metrics metrics) {
this.metrics = metrics;
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
String metricGrpName = "producer-metrics";
this.batchSizeSensor = metrics.sensor("batch-size");
this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max());
MetricName m = new MetricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request.", metricTags);
this.batchSizeSensor.add(m, new Avg());
m = new MetricName("batch-size-max", metricGrpName, "The max number of bytes sent per partition per-request.", metricTags);
this.batchSizeSensor.add(m, new Max());
this.compressionRateSensor = metrics.sensor("compression-rate");
this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg());
m = new MetricName("compression-rate-avg", metricGrpName, "The average compression rate of record batches.", metricTags);
this.compressionRateSensor.add(m, new Avg());
this.queueTimeSensor = metrics.sensor("queue-time");
this.queueTimeSensor.add("record-queue-time-avg",
"The average time in ms record batches spent in the record accumulator.",
new Avg());
this.queueTimeSensor.add("record-queue-time-max",
"The maximum time in ms record batches spent in the record accumulator.",
new Max());
m = new MetricName("record-queue-time-avg", metricGrpName, "The average time in ms record batches spent in the record accumulator.", metricTags);
this.queueTimeSensor.add(m, new Avg());
m = new MetricName("record-queue-time-max", metricGrpName, "The maximum time in ms record batches spent in the record accumulator.", metricTags);
this.queueTimeSensor.add(m, new Max());
this.requestTimeSensor = metrics.sensor("request-time");
this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
m = new MetricName("request-latency-avg", metricGrpName, "The average request latency in ms", metricTags);
this.requestTimeSensor.add(m, new Avg());
m = new MetricName("request-latency-max", metricGrpName, "The maximum request latency in ms", metricTags);
this.requestTimeSensor.add(m, new Max());
this.recordsPerRequestSensor = metrics.sensor("records-per-request");
this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg());
m = new MetricName("record-send-rate", metricGrpName, "The average number of records sent per second.", metricTags);
this.recordsPerRequestSensor.add(m, new Rate());
m = new MetricName("records-per-request-avg", metricGrpName, "The average number of records per request.", metricTags);
this.recordsPerRequestSensor.add(m, new Avg());
this.retrySensor = metrics.sensor("record-retries");
this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
m = new MetricName("record-retry-rate", metricGrpName, "The average per-second number of retried record sends", metricTags);
this.retrySensor.add(m, new Rate());
this.errorSensor = metrics.sensor("errors");
this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags);
this.errorSensor.add(m, new Rate());
this.maxRecordSizeSensor = metrics.sensor("record-size-max");
this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
this.maxRecordSizeSensor.add("record-size-avg", "The average record size", new Avg());
m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags);
this.maxRecordSizeSensor.add(m, new Max());
m = new MetricName("record-size-avg", metricGrpName, "The average record size", metricTags);
this.maxRecordSizeSensor.add(m, new Avg());
this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
m = new MetricName("requests-in-flight", metricGrpName, "The current number of in-flight requests awaiting a response.", metricTags);
this.metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) {
return client.inFlightRequestCount();
}
});
metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
m = new MetricName("metadata-age", metricGrpName, "The age in seconds of the current producer metadata being used.", metricTags);
metrics.addMetric(m, new Measurable() {
public double measure(MetricConfig config, long now) {
return (now - metadata.lastUpdate()) / 1000.0;
}
@ -376,24 +397,34 @@ public class Sender implements Runnable { @@ -376,24 +397,34 @@ public class Sender implements Runnable {
String topicRecordsCountName = "topic." + topic + ".records-per-batch";
Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
if (topicRecordCount == null) {
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
metricTags.put("topic", topic);
String metricGrpName = "producer-topic-metrics";
topicRecordCount = this.metrics.sensor(topicRecordsCountName);
topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate());
MetricName m = new MetricName("record-send-rate", metricGrpName , metricTags);
topicRecordCount.add(m, new Rate());
String topicByteRateName = "topic." + topic + ".bytes";
Sensor topicByteRate = this.metrics.sensor(topicByteRateName);
topicByteRate.add("topic." + topic + ".byte-rate", new Rate());
m = new MetricName("byte-rate", metricGrpName , metricTags);
topicByteRate.add(m, new Rate());
String topicCompressionRateName = "topic." + topic + ".compression-rate";
Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName);
topicCompressionRate.add("topic." + topic + ".compression-rate", new Avg());
m = new MetricName("compression-rate", metricGrpName , metricTags);
topicCompressionRate.add(m, new Avg());
String topicRetryName = "topic." + topic + ".record-retries";
Sensor topicRetrySensor = this.metrics.sensor(topicRetryName);
topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
m = new MetricName("record-retry-rate", metricGrpName , metricTags);
topicRetrySensor.add(m, new Rate());
String topicErrorName = "topic." + topic + ".record-errors";
Sensor topicErrorSensor = this.metrics.sensor(topicErrorName);
topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
m = new MetricName("record-error-rate", metricGrpName , metricTags);
topicErrorSensor.add(m, new Rate());
}
}

9
clients/src/main/java/org/apache/kafka/common/Metric.java

@ -22,14 +22,9 @@ package org.apache.kafka.common; @@ -22,14 +22,9 @@ package org.apache.kafka.common;
public interface Metric {
/**
* A unique name for this metric
* A name for this metric
*/
public String name();
/**
* A description of what is measured...this will be "" if no description was given
*/
public String description();
public MetricName metricName();
/**
* The value of the metric

179
clients/src/main/java/org/apache/kafka/common/MetricName.java

@ -0,0 +1,179 @@ @@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Utils;
/**
* The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
* <p/>
* This class captures the following parameters
* <pre>
* <b>name</b> The name of the metric
* <b>group</b> logical group name of the metrics to which this metric belongs.
* <b>description</b> A human-readable description to include in the metric. This is optional.
* <b>tags</b> additional key/value attributes of the metric. This is optional.
* </pre>
* group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
*
* Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
*
* Usage looks something like this:
* <pre>
* // set up metrics:
* Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
* Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
* Map<String, String> metricTags = new LinkedHashMap<String, String>();
* metricTags.put("client-id", "producer-1");
* metricTags.put("topic", "topic");
* MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;, "average message size", metricTags);
* sensor.add(metricName, new Avg());
* metricName = new MetricName(&quot;message-size-max&quot;, &quot;producer-metrics&quot;,metricTags);
* sensor.add(metricName, new Max());
*
* // as messages are sent we record the sizes
* sensor.record(messageSize);
* </pre>
*/
public final class MetricName {
private final String name;
private final String group;
private final String description;
private Map<String, String> tags;
private int hash = 0;
/**
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
* @param tags additional key/value attributes of the metric
*/
public MetricName(String name, String group, String description, Map<String, String> tags) {
this.name = Utils.notNull(name);
this.group = Utils.notNull(group);
this.description = Utils.notNull(description);
this.tags = Utils.notNull(tags);
}
/**
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
* @param keyValue additional key/value attributes of the metric (must come in pairs)
*/
public MetricName(String name, String group, String description, String... keyValue) {
this(name, group, description, getTags(keyValue));
}
private static Map<String, String> getTags(String... keyValue) {
if ((keyValue.length % 2) != 0)
throw new IllegalArgumentException("keyValue needs to be specified in paris");
Map<String, String> tags = new HashMap<String, String>();
for (int i=0; i<(keyValue.length / 2); i++)
tags.put(keyValue[i], keyValue[i+1]);
return tags;
}
/**
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param tags key/value attributes of the metric
*/
public MetricName(String name, String group, Map<String, String> tags) {
this(name, group, "", tags);
}
/**
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
* @param description A human-readable description to include in the metric
*/
public MetricName(String name, String group, String description) {
this(name, group, description, new HashMap<String, String>());
}
/**
* @param name The name of the metric
* @param group logical group name of the metrics to which this metric belongs
*/
public MetricName(String name, String group) {
this(name, group, "", new HashMap<String, String>());
}
public String name() {
return this.name;
}
public String group() {
return this.group;
}
public Map<String, String> tags() {
return this.tags;
}
public String description() {
return this.description;
}
@Override
public int hashCode() {
if (hash != 0)
return hash;
final int prime = 31;
int result = 1;
result = prime * result + ((group == null) ? 0 : group.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((tags == null) ? 0 : tags.hashCode());
this.hash = result;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MetricName other = (MetricName) obj;
if (group == null) {
if (other.group != null)
return false;
} else if (!group.equals(other.group))
return false;
if (name == null) {
if (other.name != null)
return false;
} else if (!name.equals(other.name))
return false;
if (tags == null) {
if (other.tags != null)
return false;
} else if (!tags.equals(other.tags))
return false;
return true;
}
@Override
public String toString() {
return "MetricName [name=" + name + ", group=" + group + ", description="
+ description + ", tags=" + tags + "]";
}
}

14
clients/src/main/java/org/apache/kafka/common/metrics/CompoundStat.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import java.util.List;
/**
@ -28,25 +30,19 @@ public interface CompoundStat extends Stat { @@ -28,25 +30,19 @@ public interface CompoundStat extends Stat {
public static class NamedMeasurable {
private final String name;
private final String description;
private final MetricName name;
private final Measurable stat;
public NamedMeasurable(String name, String description, Measurable stat) {
public NamedMeasurable(MetricName name, Measurable stat) {
super();
this.name = name;
this.description = description;
this.stat = stat;
}
public String name() {
public MetricName name() {
return name;
}
public String description() {
return description;
}
public Measurable stat() {
return stat;
}

60
clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java

@ -32,6 +32,7 @@ import javax.management.ObjectName; @@ -32,6 +32,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,18 +81,39 @@ public class JmxReporter implements MetricsReporter { @@ -80,18 +81,39 @@ public class JmxReporter implements MetricsReporter {
private KafkaMbean addAttribute(KafkaMetric metric) {
try {
String[] names = split(prefix + metric.name());
String qualifiedName = names[0] + "." + names[1];
if (!this.mbeans.containsKey(qualifiedName))
mbeans.put(qualifiedName, new KafkaMbean(names[0], names[1]));
KafkaMbean mbean = this.mbeans.get(qualifiedName);
mbean.setAttribute(names[2], metric);
MetricName metricName = metric.metricName();
String mBeanName = getMBeanName(metricName);
if (!this.mbeans.containsKey(mBeanName))
mbeans.put(mBeanName, new KafkaMbean(mBeanName));
KafkaMbean mbean = this.mbeans.get(mBeanName);
mbean.setAttribute(metricName.name() , metric);
return mbean;
} catch (JMException e) {
throw new KafkaException("Error creating mbean attribute " + metric.name(), e);
throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
}
}
/**
* @param metricName
* @return standard JMX MBean name in the following format
* domainName:type=metricType,key1=val1,key2=val2
*/
private String getMBeanName(MetricName metricName) {
StringBuilder mBeanName = new StringBuilder();
mBeanName.append(prefix);
mBeanName.append(":type=");
mBeanName.append(metricName.group());
for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
if(entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
continue;
mBeanName.append(",");
mBeanName.append(entry.getKey());
mBeanName.append("=");
mBeanName.append(entry.getValue());
}
return mBeanName.toString();
}
public void close() {
synchronized (lock) {
for (KafkaMbean mbean : this.mbeans.values())
@ -118,29 +140,13 @@ public class JmxReporter implements MetricsReporter { @@ -118,29 +140,13 @@ public class JmxReporter implements MetricsReporter {
}
}
private String[] split(String name) {
int attributeStart = name.lastIndexOf('.');
if (attributeStart < 0)
throw new IllegalArgumentException("No MBean name in metric name: " + name);
String attributeName = name.substring(attributeStart + 1, name.length());
String remainder = name.substring(0, attributeStart);
int beanStart = remainder.lastIndexOf('.');
if (beanStart < 0)
return new String[] { "", remainder, attributeName };
String packageName = remainder.substring(0, beanStart);
String beanName = remainder.substring(beanStart + 1, remainder.length());
return new String[] { packageName, beanName, attributeName };
}
private static class KafkaMbean implements DynamicMBean {
private final String beanName;
private final ObjectName objectName;
private final Map<String, KafkaMetric> metrics;
public KafkaMbean(String packageName, String beanName) throws MalformedObjectNameException {
this.beanName = beanName;
public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
this.metrics = new HashMap<String, KafkaMetric>();
this.objectName = new ObjectName(packageName + ":type=" + beanName);
this.objectName = new ObjectName(mbeanName);
}
public ObjectName name() {
@ -179,10 +185,10 @@ public class JmxReporter implements MetricsReporter { @@ -179,10 +185,10 @@ public class JmxReporter implements MetricsReporter {
for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
String attribute = entry.getKey();
KafkaMetric metric = entry.getValue();
attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.description(), true, false, false);
attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.metricName().description(), true, false, false);
i += 1;
}
return new MBeanInfo(beanName, "", attrs, null, null, null);
return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
}
@Override

18
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java

@ -17,21 +17,20 @@ @@ -17,21 +17,20 @@
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Time;
public final class KafkaMetric implements Metric {
private final String name;
private final String description;
private MetricName metricName;
private final Object lock;
private final Time time;
private final Measurable measurable;
private MetricConfig config;
KafkaMetric(Object lock, String name, String description, Measurable measurable, MetricConfig config, Time time) {
KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
super();
this.name = name;
this.description = description;
this.metricName = metricName;
this.lock = lock;
this.measurable = measurable;
this.config = config;
@ -43,13 +42,8 @@ public final class KafkaMetric implements Metric { @@ -43,13 +42,8 @@ public final class KafkaMetric implements Metric {
}
@Override
public String name() {
return this.name;
}
@Override
public String description() {
return this.description;
public MetricName metricName() {
return this.metricName;
}
@Override

58
clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java

@ -17,6 +17,7 @@ import java.util.List; @@ -17,6 +17,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@ -36,8 +37,10 @@ import org.apache.kafka.common.utils.Utils; @@ -36,8 +37,10 @@ import org.apache.kafka.common.utils.Utils;
* // set up metrics:
* Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
* Sensor sensor = metrics.sensor(&quot;message-sizes&quot;);
* sensor.add(&quot;kafka.producer.message-sizes.avg&quot;, new Avg());
* sensor.add(&quot;kafka.producer.message-sizes.max&quot;, new Max());
* MetricName metricName = new MetricName(&quot;message-size-avg&quot;, &quot;producer-metrics&quot;);
* sensor.add(metricName, new Avg());
* metricName = new MetricName(&quot;message-size-max&quot;, &quot;producer-metrics&quot;);
* sensor.add(metricName, new Max());
*
* // as messages are sent we record the sizes
* sensor.record(messageSize);
@ -46,7 +49,7 @@ import org.apache.kafka.common.utils.Utils; @@ -46,7 +49,7 @@ import org.apache.kafka.common.utils.Utils;
public class Metrics {
private final MetricConfig config;
private final ConcurrentMap<String, KafkaMetric> metrics;
private final ConcurrentMap<MetricName, KafkaMetric> metrics;
private final ConcurrentMap<String, Sensor> sensors;
private final List<MetricsReporter> reporters;
private final Time time;
@ -83,7 +86,7 @@ public class Metrics { @@ -83,7 +86,7 @@ public class Metrics {
public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
this.config = defaultConfig;
this.sensors = new CopyOnWriteMap<String, Sensor>();
this.metrics = new CopyOnWriteMap<String, KafkaMetric>();
this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>();
this.reporters = Utils.notNull(reporters);
this.time = time;
for (MetricsReporter reporter : reporters)
@ -139,47 +142,23 @@ public class Metrics { @@ -139,47 +142,23 @@ public class Metrics {
/**
* Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
* @param name The name of the metric
* @param metricName The name of the metric
* @param measurable The measurable that will be measured by this metric
*/
public void addMetric(String name, Measurable measurable) {
addMetric(name, "", measurable);
public void addMetric(MetricName metricName, Measurable measurable) {
addMetric(metricName, null, measurable);
}
/**
* Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
* @param name The name of the metric
* @param description A human-readable description to include in the metric
* @param measurable The measurable that will be measured by this metric
*/
public void addMetric(String name, String description, Measurable measurable) {
addMetric(name, description, null, measurable);
}
/**
* Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
* @param name The name of the metric
* @param config The configuration to use when measuring this measurable
* @param measurable The measurable that will be measured by this metric
*/
public void addMetric(String name, MetricConfig config, Measurable measurable) {
addMetric(name, "", config, measurable);
}
/**
* Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor.
* This is a way to expose existing values as metrics.
* @param name The name of the metric
* @param description A human-readable description to include in the metric
* @param metricName The name of the metric
* @param config The configuration to use when measuring this measurable
* @param measurable The measurable that will be measured by this metric
*/
public synchronized void addMetric(String name, String description, MetricConfig config, Measurable measurable) {
public synchronized void addMetric(MetricName metricName, MetricConfig config, Measurable measurable) {
KafkaMetric m = new KafkaMetric(new Object(),
Utils.notNull(name),
Utils.notNull(description),
Utils.notNull(metricName),
Utils.notNull(measurable),
config == null ? this.config : config,
time);
@ -195,17 +174,18 @@ public class Metrics { @@ -195,17 +174,18 @@ public class Metrics {
}
synchronized void registerMetric(KafkaMetric metric) {
if (this.metrics.containsKey(metric.name()))
throw new IllegalArgumentException("A metric named '" + metric.name() + "' already exists, can't register another one.");
this.metrics.put(metric.name(), metric);
MetricName metricName = metric.metricName();
if (this.metrics.containsKey(metricName))
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters)
reporter.metricChange(metric);
}
/**
* Get all the metrics currently maintained indexed by metric name
* Get all the metrics currently maintained indexed by metricName
*/
public Map<String, KafkaMetric> metrics() {
public Map<MetricName, KafkaMetric> metrics() {
return this.metrics;
}

51
clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java

@ -18,6 +18,7 @@ import java.util.HashSet; @@ -18,6 +18,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -112,7 +113,7 @@ public final class Sensor { @@ -112,7 +113,7 @@ public final class Sensor {
Quota quota = config.quota();
if (quota != null) {
if (!quota.acceptable(metric.value(timeMs)))
throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound());
}
}
}
@ -134,55 +135,33 @@ public final class Sensor { @@ -134,55 +135,33 @@ public final class Sensor {
public synchronized void add(CompoundStat stat, MetricConfig config) {
this.stats.add(Utils.notNull(stat));
for (NamedMeasurable m : stat.stats()) {
KafkaMetric metric = new KafkaMetric(this, m.name(), m.description(), m.stat(), config == null ? this.config : config, time);
KafkaMetric metric = new KafkaMetric(this, m.name(), m.stat(), config == null ? this.config : config, time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
}
}
/**
* Add a metric with default configuration and no description. Equivalent to
* {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, null)}
*
*/
public void add(String name, MeasurableStat stat) {
add(name, stat, null);
}
/**
* Add a metric with default configuration. Equivalent to
* {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, description, stat, null)}
*
*/
public void add(String name, String description, MeasurableStat stat) {
add(name, description, stat, null);
}
/**
* Add a metric to this sensor with no description. Equivalent to
* {@link Sensor#add(String, String, MeasurableStat, MetricConfig) add(name, "", stat, config)}
* @param name
* @param stat
* @param config
* Register a metric with this sensor
* @param metricName The name of the metric
* @param stat The statistic to keep
*/
public void add(String name, MeasurableStat stat, MetricConfig config) {
add(name, "", stat, config);
public void add(MetricName metricName, MeasurableStat stat) {
add(metricName, stat, null);
}
/**
* Register a metric with this sensor
* @param name The name of the metric
* @param description A description used when reporting the value
* @param metricName The name of the metric
* @param stat The statistic to keep
* @param config A special configuration for this metric. If null use the sensor default configuration.
*/
public synchronized void add(String name, String description, MeasurableStat stat, MetricConfig config) {
KafkaMetric metric = new KafkaMetric(this,
Utils.notNull(name),
Utils.notNull(description),
Utils.notNull(stat),
config == null ? this.config : config,
time);
public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
KafkaMetric metric = new KafkaMetric(new Object(),
Utils.notNull(metricName),
Utils.notNull(stat),
config == null ? this.config : config,
time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
this.stats.add(stat);

18
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentile.java

@ -16,31 +16,23 @@ @@ -16,31 +16,23 @@
*/
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.MetricName;
public class Percentile {
private final String name;
private final String description;
private final MetricName name;
private final double percentile;
public Percentile(String name, double percentile) {
this(name, "", percentile);
}
public Percentile(String name, String description, double percentile) {
public Percentile(MetricName name, double percentile) {
super();
this.name = name;
this.description = description;
this.percentile = percentile;
}
public String name() {
public MetricName name() {
return this.name;
}
public String description() {
return this.description;
}
public double percentile() {
return this.percentile;
}

2
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java

@ -59,7 +59,7 @@ public class Percentiles extends SampledStat implements CompoundStat { @@ -59,7 +59,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
List<NamedMeasurable> ms = new ArrayList<NamedMeasurable>(this.percentiles.length);
for (Percentile percentile : this.percentiles) {
final double pct = percentile.percentile();
ms.add(new NamedMeasurable(percentile.name(), percentile.description(), new Measurable() {
ms.add(new NamedMeasurable(percentile.name(), new Measurable() {
public double measure(MetricConfig config, long now) {
return value(config, now, pct / 100.0);
}

93
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@ -23,6 +23,7 @@ import java.nio.channels.UnresolvedAddressException; @@ -23,6 +23,7 @@ import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit; @@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -81,17 +83,21 @@ public class Selector implements Selectable { @@ -81,17 +83,21 @@ public class Selector implements Selectable {
private final List<Integer> connected;
private final Time time;
private final SelectorMetrics sensors;
private final String metricGrpPrefix;
private final Map<String, String> metricTags;
/**
* Create a new selector
*/
public Selector(Metrics metrics, Time time) {
public Selector(Metrics metrics, Time time , String metricGrpPrefix , Map<String, String> metricTags) {
try {
this.selector = java.nio.channels.Selector.open();
} catch (IOException e) {
throw new KafkaException(e);
}
this.time = time;
this.metricGrpPrefix = metricGrpPrefix;
this.metricTags = metricTags;
this.keys = new HashMap<Integer, SelectionKey>();
this.completedSends = new ArrayList<NetworkSend>();
this.completedReceives = new ArrayList<NetworkReceive>();
@ -410,42 +416,52 @@ public class Selector implements Selectable { @@ -410,42 +416,52 @@ public class Selector implements Selectable {
public SelectorMetrics(Metrics metrics) {
this.metrics = metrics;
String metricGrpName = metricGrpPrefix + "-metrics";
this.connectionClosed = this.metrics.sensor("connections-closed");
this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate());
MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags);
this.connectionClosed.add(metricName, new Rate());
this.connectionCreated = this.metrics.sensor("connections-created");
this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate());
metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags);
this.connectionCreated.add(metricName, new Rate());
this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
bytesTransferred.add("network-io-rate",
"The average number of network operations (reads or writes) on all connections per second.",
new Rate(new Count()));
metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags);
bytesTransferred.add(metricName, new Rate(new Count()));
this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate());
this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count()));
this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags);
this.bytesSent.add(metricName, new Rate());
metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags);
this.bytesSent.add(metricName, new Rate(new Count()));
metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", metricTags);
this.bytesSent.add(metricName, new Avg());
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags);
this.bytesSent.add(metricName, new Max());
this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate());
this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count()));
metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags);
this.bytesReceived.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags);
this.bytesReceived.add(metricName, new Rate(new Count()));
this.selectTime = this.metrics.sensor("select-time");
this.selectTime.add("select-rate",
"Number of times the I/O layer checked for new I/O to perform per second",
new Rate(new Count()));
this.selectTime.add("io-wait-time-ns-avg",
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.",
new Avg());
this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags);
this.selectTime.add(metricName, new Rate(new Count()));
metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg());
metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags);
this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
this.ioTime = this.metrics.sensor("io-time");
this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg());
this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags);
this.ioTime.add(metricName, new Avg());
metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags);
this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags);
this.metrics.addMetric(metricName, new Measurable() {
public double measure(MetricConfig config, long now) {
return keys.size();
}
@ -459,25 +475,34 @@ public class Selector implements Selectable { @@ -459,25 +475,34 @@ public class Selector implements Selectable {
String nodeRequestName = "node-" + node + ".bytes-sent";
Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
if (nodeRequest == null) {
String metricGrpName = metricGrpPrefix + "-node-metrics";
Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
tags.put("node-id", "node-"+node);
nodeRequest = this.metrics.sensor(nodeRequestName);
nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate());
nodeRequest.add("node-" + node + ".request-rate",
"The average number of requests sent per second.",
new Rate(new Count()));
nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
nodeRequest.add(metricName, new Rate());
metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
nodeRequest.add(metricName, new Rate(new Count()));
metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
nodeRequest.add(metricName, new Avg());
metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
nodeRequest.add(metricName, new Max());
String nodeResponseName = "node-" + node + ".bytes-received";
Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate());
nodeResponse.add("node-" + node + ".response-rate",
"The average number of responses received per second.",
new Rate(new Count()));
metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
nodeResponse.add(metricName, new Rate());
metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
nodeResponse.add(metricName, new Rate(new Count()));
String nodeTimeName = "node-" + node + ".latency";
Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg());
nodeRequestTime.add("node-" + node + ".request-latency-max", new Max());
metricName = new MetricName("request-latency-avg", metricGrpName, tags);
nodeRequestTime.add(metricName, new Avg());
metricName = new MetricName("request-latency-max", metricGrpName, tags);
nodeRequestTime.add(metricName, new Max());
}
}
}

14
clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java

@ -24,7 +24,9 @@ import org.junit.Test; @@ -24,7 +24,9 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -33,6 +35,8 @@ import static org.junit.Assert.*; @@ -33,6 +35,8 @@ import static org.junit.Assert.*;
public class BufferPoolTest {
private MockTime time = new MockTime();
private Metrics metrics = new Metrics(time);
String metricGroup = "TestMetrics";
Map<String, String> metricTags = new LinkedHashMap<String, String>();
/**
* Test the simple non-blocking allocation paths
@ -41,7 +45,7 @@ public class BufferPoolTest { @@ -41,7 +45,7 @@ public class BufferPoolTest {
public void testSimple() throws Exception {
int totalMemory = 64 * 1024;
int size = 1024;
BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time);
BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags);
ByteBuffer buffer = pool.allocate(size);
assertEquals("Buffer size should equal requested size.", size, buffer.limit());
assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
@ -68,7 +72,7 @@ public class BufferPoolTest { @@ -68,7 +72,7 @@ public class BufferPoolTest {
*/
@Test(expected = IllegalArgumentException.class)
public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
BufferPool pool = new BufferPool(1024, 512, true, metrics, time);
BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags);
ByteBuffer buffer = pool.allocate(1024);
assertEquals(1024, buffer.limit());
pool.deallocate(buffer);
@ -77,7 +81,7 @@ public class BufferPoolTest { @@ -77,7 +81,7 @@ public class BufferPoolTest {
@Test
public void testNonblockingMode() throws Exception {
BufferPool pool = new BufferPool(2, 1, false, metrics, time);
BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags);
pool.allocate(1);
try {
pool.allocate(2);
@ -92,7 +96,7 @@ public class BufferPoolTest { @@ -92,7 +96,7 @@ public class BufferPoolTest {
*/
@Test
public void testDelayedAllocation() throws Exception {
BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time);
BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags);
ByteBuffer buffer = pool.allocate(1024);
CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
@ -141,7 +145,7 @@ public class BufferPoolTest { @@ -141,7 +145,7 @@ public class BufferPoolTest {
final int iterations = 50000;
final int poolableSize = 1024;
final int totalMemory = numThreads / 2 * poolableSize;
final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time);
final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags);
List<StressTestThread> threads = new ArrayList<StressTestThread>();
for (int i = 0; i < numThreads; i++)
threads.add(new StressTestThread(pool, iterations));

16
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java

@ -22,7 +22,9 @@ import java.util.ArrayList; @@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@ -59,11 +61,13 @@ public class RecordAccumulatorTest { @@ -59,11 +61,13 @@ public class RecordAccumulatorTest {
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
private Metrics metrics = new Metrics(time);
String metricGroup = "TestMetrics";
Map<String, String> metricTags = new LinkedHashMap<String, String>();
@Test
public void testFull() throws Exception {
long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) {
accum.append(tp1, key, value, CompressionType.NONE, null);
@ -86,7 +90,7 @@ public class RecordAccumulatorTest { @@ -86,7 +90,7 @@ public class RecordAccumulatorTest {
@Test
public void testAppendLarge() throws Exception {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
}
@ -94,7 +98,7 @@ public class RecordAccumulatorTest { @@ -94,7 +98,7 @@ public class RecordAccumulatorTest {
@Test
public void testLinger() throws Exception {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
accum.append(tp1, key, value, CompressionType.NONE, null);
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
@ -111,7 +115,7 @@ public class RecordAccumulatorTest { @@ -111,7 +115,7 @@ public class RecordAccumulatorTest {
@Test
public void testPartialDrain() throws Exception {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
int appends = 1024 / msgSize + 1;
List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
@ -129,7 +133,7 @@ public class RecordAccumulatorTest { @@ -129,7 +133,7 @@ public class RecordAccumulatorTest {
final int numThreads = 5;
final int msgs = 10000;
final int numParts = 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < numThreads; i++) {
threads.add(new Thread() {
@ -169,7 +173,7 @@ public class RecordAccumulatorTest { @@ -169,7 +173,7 @@ public class RecordAccumulatorTest {
public void testNextReadyCheckDelay() throws Exception {
// Next check time will use lingerMs since this test won't trigger any retries/backoff
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
// Just short of going over the limit so we trigger linger time
int appends = 1024 / msgSize;

11
clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java

@ -16,6 +16,8 @@ import static org.junit.Assert.assertEquals; @@ -16,6 +16,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -50,7 +52,8 @@ public class SenderTest { @@ -50,7 +52,8 @@ public class SenderTest {
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = new Metrics(time);
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
Map<String, String> metricTags = new LinkedHashMap<String, String>();
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
private Sender sender = new Sender(client,
metadata,
this.accumulator,
@ -59,7 +62,8 @@ public class SenderTest { @@ -59,7 +62,8 @@ public class SenderTest {
MAX_RETRIES,
REQUEST_TIMEOUT_MS,
metrics,
time);
time,
"clientId");
@Before
public void setup() {
@ -93,7 +97,8 @@ public class SenderTest { @@ -93,7 +97,8 @@ public class SenderTest {
maxRetries,
REQUEST_TIMEOUT_MS,
new Metrics(),
time);
time,
"clientId");
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
sender.run(time.milliseconds()); // connect

13
clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java

@ -16,10 +16,7 @@ @@ -16,10 +16,7 @@
*/
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Total;
import org.junit.Test;
@ -31,10 +28,10 @@ public class JmxReporterTest { @@ -31,10 +28,10 @@ public class JmxReporterTest {
Metrics metrics = new Metrics();
metrics.addReporter(new JmxReporter());
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add("pack.bean1.avg", new Avg());
sensor.add("pack.bean2.total", new Total());
sensor.add(new MetricName("pack.bean1.avg", "grp1"), new Avg());
sensor.add(new MetricName("pack.bean2.total", "grp2"), new Total());
Sensor sensor2 = metrics.sensor("kafka.blah");
sensor2.add("pack.bean1.some", new Total());
sensor2.add("pack.bean2.some", new Total());
sensor2.add(new MetricName("pack.bean1.some", "grp1"), new Total());
sensor2.add(new MetricName("pack.bean2.some", "grp1"), new Total());
}
}

87
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java

@ -16,9 +16,12 @@ import static org.junit.Assert.assertEquals; @@ -16,9 +16,12 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
@ -38,22 +41,40 @@ public class MetricsTest { @@ -38,22 +41,40 @@ public class MetricsTest {
MockTime time = new MockTime();
Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
@Test
public void testMetricName() {
MetricName n1 = new MetricName("name", "group", "description", "key1", "value1");
Map<String, String> tags = new HashMap<String, String>();
tags.put("key1", "value1");
MetricName n2 = new MetricName("name", "group", "description", tags);
assertEquals("metric names created in two different ways should be equal", n1, n2);
try {
new MetricName("name", "group", "description", "key1");
fail("Creating MetricName with an old number of keyValue should fail");
} catch (IllegalArgumentException e) {
// this is expected
}
}
@Test
public void testSimpleStats() throws Exception {
ConstantMeasurable measurable = new ConstantMeasurable();
metrics.addMetric("direct.measurable", measurable);
metrics.addMetric(new MetricName("direct.measurable", "grp1", "The fraction of time an appender waits for space allocation."), measurable);
Sensor s = metrics.sensor("test.sensor");
s.add("test.avg", new Avg());
s.add("test.max", new Max());
s.add("test.min", new Min());
s.add("test.rate", new Rate(TimeUnit.SECONDS));
s.add("test.occurences", new Rate(TimeUnit.SECONDS, new Count()));
s.add("test.count", new Count());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, new Percentile("test.median", 50.0), new Percentile("test.perc99_9",
99.9)));
s.add(new MetricName("test.avg", "grp1"), new Avg());
s.add(new MetricName("test.max", "grp1"), new Max());
s.add(new MetricName("test.min", "grp1"), new Min());
s.add(new MetricName("test.rate", "grp1"), new Rate(TimeUnit.SECONDS));
s.add(new MetricName("test.occurences", "grp1"), new Rate(TimeUnit.SECONDS, new Count()));
s.add(new MetricName("test.count", "grp1"), new Count());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(new MetricName("test.median", "grp1"), 50.0),
new Percentile(new MetricName("test.perc99_9", "grp1"),99.9)));
Sensor s2 = metrics.sensor("test.sensor2");
s2.add("s2.total", new Total());
s2.add(new MetricName("s2.total", "grp1"), new Total());
s2.record(5.0);
for (int i = 0; i < 10; i++)
@ -62,27 +83,27 @@ public class MetricsTest { @@ -62,27 +83,27 @@ public class MetricsTest {
// pretend 2 seconds passed...
time.sleep(2000);
assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get("s2.total").value(), EPS);
assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get("test.avg").value(), EPS);
assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get("test.max").value(), EPS);
assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get("test.min").value(), EPS);
assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get("test.rate").value(), EPS);
assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get("test.occurences").value(), EPS);
assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get("test.count").value(), EPS);
assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS);
assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS);
assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS);
assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS);
assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS);
assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS);
assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS);
}
@Test
public void testHierarchicalSensors() {
Sensor parent1 = metrics.sensor("test.parent1");
parent1.add("test.parent1.count", new Count());
parent1.add(new MetricName("test.parent1.count", "grp1"), new Count());
Sensor parent2 = metrics.sensor("test.parent2");
parent2.add("test.parent2.count", new Count());
parent2.add(new MetricName("test.parent2.count", "grp1"), new Count());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
child1.add("test.child1.count", new Count());
child1.add(new MetricName("test.child1.count", "grp1"), new Count());
Sensor child2 = metrics.sensor("test.child2", parent1);
child2.add("test.child2.count", new Count());
child2.add(new MetricName("test.child2.count", "grp1"), new Count());
Sensor grandchild = metrics.sensor("test.grandchild", child1);
grandchild.add("test.grandchild.count", new Count());
grandchild.add(new MetricName("test.grandchild.count", "grp1"), new Count());
/* increment each sensor one time */
parent1.record();
@ -150,15 +171,15 @@ public class MetricsTest { @@ -150,15 +171,15 @@ public class MetricsTest {
@Test(expected = IllegalArgumentException.class)
public void testDuplicateMetricName() {
metrics.sensor("test").add("test", new Avg());
metrics.sensor("test2").add("test", new Total());
metrics.sensor("test").add(new MetricName("test", "grp1"), new Avg());
metrics.sensor("test2").add(new MetricName("test", "grp1"), new Total());
}
@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
sensor.add("test1.total", new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
sensor.add("test2.total", new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
sensor.record(5.0);
try {
sensor.record(1.0);
@ -166,7 +187,7 @@ public class MetricsTest { @@ -166,7 +187,7 @@ public class MetricsTest {
} catch (QuotaViolationException e) {
// this is good
}
assertEquals(6.0, metrics.metrics().get("test1.total").value(), EPS);
assertEquals(6.0, metrics.metrics().get(new MetricName("test1.total", "grp1")).value(), EPS);
sensor.record(-6.0);
try {
sensor.record(-1.0);
@ -183,15 +204,15 @@ public class MetricsTest { @@ -183,15 +204,15 @@ public class MetricsTest {
0.0,
100.0,
BucketSizing.CONSTANT,
new Percentile("test.p25", 25),
new Percentile("test.p50", 50),
new Percentile("test.p75", 75));
new Percentile(new MetricName("test.p25", "grp1"), 25),
new Percentile(new MetricName("test.p50", "grp1"), 50),
new Percentile(new MetricName("test.p75", "grp1"), 75));
MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
Sensor sensor = metrics.sensor("test", config);
sensor.add(percs);
Metric p25 = this.metrics.metrics().get("test.p25");
Metric p50 = this.metrics.metrics().get("test.p50");
Metric p75 = this.metrics.metrics().get("test.p75");
Metric p25 = this.metrics.metrics().get(new MetricName("test.p25", "grp1"));
Metric p50 = this.metrics.metrics().get(new MetricName("test.p50", "grp1"));
Metric p75 = this.metrics.metrics().get(new MetricName("test.p75", "grp1"));
// record two windows worth of sequential values
for (int i = 0; i < buckets; i++)

3
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java

@ -25,6 +25,7 @@ import java.net.Socket; @@ -25,6 +25,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
@ -50,7 +51,7 @@ public class SelectorTest { @@ -50,7 +51,7 @@ public class SelectorTest {
public void setup() throws Exception {
this.server = new EchoServer();
this.server.start();
this.selector = new Selector(new Metrics(), new MockTime());
this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
}
@After

11
clients/src/test/java/org/apache/kafka/test/MetricsBench.java

@ -14,6 +14,7 @@ package org.apache.kafka.test; @@ -14,6 +14,7 @@ package org.apache.kafka.test;
import java.util.Arrays;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -31,15 +32,15 @@ public class MetricsBench { @@ -31,15 +32,15 @@ public class MetricsBench {
Sensor parent = metrics.sensor("parent");
Sensor child = metrics.sensor("child", parent);
for (Sensor sensor : Arrays.asList(parent, child)) {
sensor.add(sensor.name() + ".avg", new Avg());
sensor.add(sensor.name() + ".count", new Count());
sensor.add(sensor.name() + ".max", new Max());
sensor.add(new MetricName(sensor.name() + ".avg", "grp1"), new Avg());
sensor.add(new MetricName(sensor.name() + ".count", "grp1"), new Count());
sensor.add(new MetricName(sensor.name() + ".max", "grp1"), new Max());
sensor.add(new Percentiles(1024,
0.0,
iters,
BucketSizing.CONSTANT,
new Percentile(sensor.name() + ".median", 50.0),
new Percentile(sensor.name() + ".p_99", 99.0)));
new Percentile(new MetricName(sensor.name() + ".median", "grp1"), 50.0),
new Percentile(new MetricName(sensor.name() + ".p_99", "grp1"), 99.0)));
}
long start = System.nanoTime();
for (int i = 0; i < iters; i++)

Loading…
Cancel
Save