Browse Source

KAFKA-8696: clean up Sum/Count/Total metrics (#7057)

* Clean up one redundant and one misplaced metric
* Clarify the relationship among these metrics to avoid future confusion

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
pull/7117/head
John Roesler 5 years ago committed by Guozhang Wang
parent
commit
a8aedc85eb
  1. 4
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
  2. 4
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  3. 2
      clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
  4. 30
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
  5. 22
      clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
  6. 50
      clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
  7. 24
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
  8. 27
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
  9. 30
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
  10. 36
      clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
  11. 35
      clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
  12. 48
      clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
  13. 18
      clients/src/main/java/org/apache/kafka/common/network/Selector.java
  14. 14
      clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
  15. 8
      clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
  16. 64
      clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
  17. 4
      clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
  18. 2
      clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
  19. 4
      clients/src/test/java/org/apache/kafka/test/MetricsBench.java
  20. 14
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  21. 10
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
  22. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
  23. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  24. 16
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
  25. 5
      core/src/main/scala/kafka/network/SocketServer.scala
  26. 4
      core/src/main/scala/kafka/server/ClientQuotaManager.scala
  27. 8
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
  28. 8
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
  29. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
  30. 4
      streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
  31. 4
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
  32. 8
      streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig; @@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements Closeable { @@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
protected Meter createMeter(Metrics metrics, String groupName, String baseName, String descriptiveName) {
return new Meter(new Count(),
return new Meter(new WindowedCount(),
metrics.metricName(baseName + "-rate", groupName,
String.format("The number of %s per second", descriptiveName)),
metrics.metricName(baseName + "-total", groupName,

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge; @@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@ -1657,7 +1657,7 @@ public class Fetcher<K, V> implements Closeable { @@ -1657,7 +1657,7 @@ public class Fetcher<K, V> implements Closeable {
this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg());
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max());
this.fetchLatency.add(new Meter(new Count(), metrics.metricInstance(metricsRegistry.fetchRequestRate),
this.fetchLatency.add(new Meter(new WindowedCount(), metrics.metricInstance(metricsRegistry.fetchRequestRate),
metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
this.recordsFetchLag = metrics.sensor("records-lag");

2
clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java

@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics; @@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics;
/**
* A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
* This is the interface used for most of the simple statistics such as {@link org.apache.kafka.common.metrics.stats.Avg},
* {@link org.apache.kafka.common.metrics.stats.Max}, {@link org.apache.kafka.common.metrics.stats.Count}, etc.
* {@link org.apache.kafka.common.metrics.stats.Max}, {@link org.apache.kafka.common.metrics.stats.CumulativeCount}, etc.
*/
public interface MeasurableStat extends Stat, Measurable {

30
clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java

@ -16,30 +16,14 @@ @@ -16,30 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
import java.util.List;
import org.apache.kafka.common.metrics.MetricConfig;
/**
* A {@link SampledStat} that maintains a simple count of what it has seen.
* This is a special kind of {@link WindowedSum} that always records a value of {@code 1} instead of the provided value.
*
* See also {@link CumulativeCount} for a non-sampled version of this metric.
*
 * @deprecated since 2.4. Use {@link WindowedCount} instead
*/
public class Count extends SampledStat {
public Count() {
super(0);
}
@Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
sample.value += 1.0;
}
@Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
double total = 0.0;
for (Sample sample : samples)
total += sample.value;
return total;
}
@Deprecated
public class Count extends WindowedCount {
}

22
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java → clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java

@ -14,25 +14,21 @@ @@ -14,25 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.metrics;
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
/**
* A non-SampledStat version of Count for measuring -total metrics in streams
* A non-sampled version of {@link WindowedCount} maintained over all time.
*
* This is a special kind of {@link CumulativeSum} that always records {@code 1} instead of the provided value.
* In other words, it counts the number of
* {@link CumulativeCount#record(MetricConfig, double, long)} invocations,
* instead of summing the recorded values.
*/
public class CumulativeCount implements MeasurableStat {
private double count = 0.0;
public class CumulativeCount extends CumulativeSum {
@Override
public void record(final MetricConfig config, final double value, final long timeMs) {
count += 1;
}
@Override
public double measure(final MetricConfig config, final long now) {
return count;
super.record(config, 1, timeMs);
}
}

50
clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
/**
 * A non-sampled total accumulated over the metric's entire lifetime.
 * This is the un-windowed counterpart of {@link WindowedSum}.
 *
 * See also {@link CumulativeCount} if each recording should simply add 1.
 */
public class CumulativeSum implements MeasurableStat {

    // Running total of every value ever recorded; never reset.
    private double total;

    public CumulativeSum() {
        this(0.0);
    }

    public CumulativeSum(double value) {
        this.total = value;
    }

    @Override
    public void record(MetricConfig config, double value, long now) {
        this.total += value;
    }

    @Override
    public double measure(MetricConfig config, long now) {
        return this.total;
    }
}

24
clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java

@ -23,48 +23,46 @@ import java.util.concurrent.TimeUnit; @@ -23,48 +23,46 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Rate.SampledTotal;
/**
* A compound stat that includes a rate metric and a cumulative total metric.
*/
public class Meter implements CompoundStat {
private final MetricName rateMetricName;
private final MetricName totalMetricName;
private final Rate rate;
private final Total total;
private final CumulativeSum total;
/**
* Construct a Meter with seconds as time unit and {@link SampledTotal} stats for Rate
* Construct a Meter with seconds as time unit
*/
public Meter(MetricName rateMetricName, MetricName totalMetricName) {
this(TimeUnit.SECONDS, new SampledTotal(), rateMetricName, totalMetricName);
this(TimeUnit.SECONDS, new WindowedSum(), rateMetricName, totalMetricName);
}
/**
* Construct a Meter with provided time unit and {@link SampledTotal} stats for Rate
* Construct a Meter with provided time unit
*/
public Meter(TimeUnit unit, MetricName rateMetricName, MetricName totalMetricName) {
this(unit, new SampledTotal(), rateMetricName, totalMetricName);
this(unit, new WindowedSum(), rateMetricName, totalMetricName);
}
/**
* Construct a Meter with seconds as time unit and provided {@link SampledStat} stats for Rate
* Construct a Meter with seconds as time unit
*/
public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
this(TimeUnit.SECONDS, rateStat, rateMetricName, totalMetricName);
}
/**
* Construct a Meter with provided time unit and provided {@link SampledStat} stats for Rate
* Construct a Meter with provided time unit
*/
public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof Count)) {
throw new IllegalArgumentException("Meter is supported only for SampledTotal and Count");
if (!(rateStat instanceof WindowedSum)) {
throw new IllegalArgumentException("Meter is supported only for WindowedCount or WindowedSum.");
}
this.total = new Total();
this.total = new CumulativeSum();
this.rate = new Rate(unit, rateStat);
this.rateMetricName = rateMetricName;
this.totalMetricName = totalMetricName;
@ -81,7 +79,7 @@ public class Meter implements CompoundStat { @@ -81,7 +79,7 @@ public class Meter implements CompoundStat {
public void record(MetricConfig config, double value, long timeMs) {
rate.record(config, value, timeMs);
// Total metrics with Count stat should record 1.0 (as recorded in the count)
double totalValue = (rate.stat instanceof Count) ? 1.0 : value;
double totalValue = (rate.stat instanceof WindowedCount) ? 1.0 : value;
total.record(config, totalValue, timeMs);
}
}

27
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.metrics.stats;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
@ -40,7 +39,7 @@ public class Rate implements MeasurableStat { @@ -40,7 +39,7 @@ public class Rate implements MeasurableStat {
}
public Rate(TimeUnit unit) {
this(unit, new SampledTotal());
this(unit, new WindowedSum());
}
public Rate(SampledStat stat) {
@ -115,24 +114,10 @@ public class Rate implements MeasurableStat { @@ -115,24 +114,10 @@ public class Rate implements MeasurableStat {
}
}
public static class SampledTotal extends SampledStat {
public SampledTotal() {
super(0.0d);
}
@Override
protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
sample.value += value;
}
@Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
double total = 0.0;
for (Sample sample : samples)
total += sample.value;
return total;
}
/**
 * @deprecated since 2.4. Use {@link WindowedSum} instead.
*/
@Deprecated
public static class SampledTotal extends WindowedSum {
}
}

30
clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java

@ -16,30 +16,14 @@ @@ -16,30 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
import java.util.List;
import org.apache.kafka.common.metrics.MetricConfig;
/**
* A {@link SampledStat} that maintains the sum of what it has seen.
* This is a sampled version of {@link CumulativeSum}.
*
* See also {@link WindowedCount} if you want to increment the value by 1 on each recording.
*
 * @deprecated since 2.4. Use {@link WindowedSum} instead
*/
public class Sum extends SampledStat {
public Sum() {
super(0);
}
@Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
sample.value += value;
}
@Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
double total = 0.0;
for (Sample sample : samples)
total += sample.value;
return total;
}
@Deprecated
public class Sum extends WindowedSum {
}

36
clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java

@ -16,32 +16,14 @@ @@ -16,32 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
/**
* An un-windowed cumulative total maintained over all time.
 * A non-sampled cumulative total maintained over all time.
* This is a non-sampled version of {@link WindowedSum}.
*
* See also {@link CumulativeCount} if you just want to increment the value by 1 on each recording.
*
 * @deprecated since 2.4. Use {@link CumulativeSum} instead.
*/
public class Total implements MeasurableStat {
private double total;
public Total() {
this.total = 0.0;
}
public Total(double value) {
this.total = value;
}
@Override
public void record(MetricConfig config, double value, long now) {
this.total += value;
}
@Override
public double measure(MetricConfig config, long now) {
return this.total;
}
}
@Deprecated
public class Total extends CumulativeSum {
}

35
clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java

@ -0,0 +1,35 @@ @@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.metrics.MetricConfig;
/**
 * A {@link SampledStat} that counts how many times it has been recorded.
 *
 * This is a specialization of {@link WindowedSum} in which every
 * {@link WindowedCount#record(MetricConfig, double, long)} invocation
 * contributes exactly {@code 1.0}, regardless of the value passed in —
 * i.e. it counts invocations rather than summing recorded values.
 *
 * See also {@link CumulativeCount} for the non-sampled equivalent.
 */
public class WindowedCount extends WindowedSum {
    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        // Deliberately discard the supplied value: each recording counts as one.
        super.update(sample, config, 1.0, now);
    }
}

48
clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java

@ -0,0 +1,48 @@ @@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;
import org.apache.kafka.common.metrics.MetricConfig;
import java.util.List;
/**
 * A {@link SampledStat} whose measurement is the sum of all values recorded
 * within the currently retained samples.
 *
 * This is the windowed counterpart of {@link CumulativeSum}.
 *
 * See also {@link WindowedCount}, which adds exactly 1 per recording instead
 * of the recorded value.
 */
public class WindowedSum extends SampledStat {

    public WindowedSum() {
        super(0);
    }

    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        sample.value += value;
    }

    @Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        // Sum the per-sample running totals across the whole window.
        return samples.stream()
                      .mapToDouble(sample -> sample.value)
                      .sum();
    }
}

18
clients/src/main/java/org/apache/kafka/common/network/Selector.java

@ -23,11 +23,11 @@ import org.apache.kafka.common.memory.MemoryPool; @@ -23,11 +23,11 @@ import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -1084,7 +1084,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -1084,7 +1084,7 @@ public class Selector implements Selectable, AutoCloseable {
"successful-authentication-no-reauth-total", metricGrpName,
"The total number of connections with successful authentication where the client does not support re-authentication",
metricTags);
this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName, new Total());
this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName, new CumulativeSum());
this.failedAuthentication = sensor("failed-authentication:" + tagsSuffix);
this.failedAuthentication.add(createMeter(metrics, metricGrpName, metricTags,
@ -1105,13 +1105,13 @@ public class Selector implements Selectable, AutoCloseable { @@ -1105,13 +1105,13 @@ public class Selector implements Selectable, AutoCloseable {
this.reauthenticationLatency.add(reauthenticationLatencyAvgMetricName, new Avg());
this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix);
bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
bytesTransferred.add(createMeter(metrics, metricGrpName, metricTags, new WindowedCount(),
"network-io", "network operations (reads or writes) on all connections"));
this.bytesSent = sensor("bytes-sent:" + tagsSuffix, bytesTransferred);
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
"outgoing-byte", "outgoing bytes sent to all servers"));
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new Count(),
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags, new WindowedCount(),
"request", "requests sent"));
MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", metricTags);
this.bytesSent.add(metricName, new Avg());
@ -1122,11 +1122,11 @@ public class Selector implements Selectable, AutoCloseable { @@ -1122,11 +1122,11 @@ public class Selector implements Selectable, AutoCloseable {
this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
"incoming-byte", "bytes read off all sockets"));
this.bytesReceived.add(createMeter(metrics, metricGrpName, metricTags,
new Count(), "response", "responses received"));
new WindowedCount(), "response", "responses received"));
this.selectTime = sensor("select-time:" + tagsSuffix);
this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
new Count(), "select", "times the I/O layer checked for new I/O to perform"));
new WindowedCount(), "select", "times the I/O layer checked for new I/O to perform"));
metricName = metrics.metricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg());
this.selectTime.add(createIOThreadRatioMeter(metrics, metricGrpName, metricTags, "io-wait", "waiting"));
@ -1187,7 +1187,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -1187,7 +1187,7 @@ public class Selector implements Selectable, AutoCloseable {
nodeRequest = sensor(nodeRequestName);
nodeRequest.add(createMeter(metrics, metricGrpName, tags, "outgoing-byte", "outgoing bytes"));
nodeRequest.add(createMeter(metrics, metricGrpName, tags, new Count(), "request", "requests sent"));
nodeRequest.add(createMeter(metrics, metricGrpName, tags, new WindowedCount(), "request", "requests sent"));
MetricName metricName = metrics.metricName("request-size-avg", metricGrpName, "The average size of requests sent.", tags);
nodeRequest.add(metricName, new Avg());
metricName = metrics.metricName("request-size-max", metricGrpName, "The maximum size of any request sent.", tags);
@ -1196,7 +1196,7 @@ public class Selector implements Selectable, AutoCloseable { @@ -1196,7 +1196,7 @@ public class Selector implements Selectable, AutoCloseable {
String nodeResponseName = "node-" + connectionId + ".bytes-received";
Sensor nodeResponse = sensor(nodeResponseName);
nodeResponse.add(createMeter(metrics, metricGrpName, tags, "incoming-byte", "incoming bytes"));
nodeResponse.add(createMeter(metrics, metricGrpName, tags, new Count(), "response", "responses received"));
nodeResponse.add(createMeter(metrics, metricGrpName, tags, new WindowedCount(), "response", "responses received"));
String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = sensor(nodeTimeName);

14
clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java

@ -18,7 +18,7 @@ package org.apache.kafka.common.metrics; @@ -18,7 +18,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.junit.Test;
import javax.management.MBeanServer;
@ -43,7 +43,7 @@ public class JmxReporterTest { @@ -43,7 +43,7 @@ public class JmxReporterTest {
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new CumulativeSum());
assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg"));
@ -79,11 +79,11 @@ public class JmxReporterTest { @@ -79,11 +79,11 @@ public class JmxReporterTest {
metrics.addReporter(new JmxReporter());
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new Total());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new Total());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new Total());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new Total());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo%"), new Total());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new CumulativeSum());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new CumulativeSum());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new CumulativeSum());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new CumulativeSum());
sensor.add(metrics.metricName("name", "group", "desc", "id", "foo%"), new CumulativeSum());
assertTrue(server.isRegistered(new ObjectName(":type=group,id=\"foo\\*\"")));
assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=\"foo\\*\""), "name"));

8
clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java

@ -17,8 +17,8 @@ @@ -17,8 +17,8 @@
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -51,9 +51,9 @@ public class KafkaMbeanTest { @@ -51,9 +51,9 @@ public class KafkaMbeanTest {
metrics.addReporter(new JmxReporter());
sensor = metrics.sensor("kafka.requests");
countMetricName = metrics.metricName("pack.bean1.count", "grp1");
sensor.add(countMetricName, new Count());
sensor.add(countMetricName, new WindowedCount());
sumMetricName = metrics.metricName("pack.bean1.sum", "grp1");
sensor.add(sumMetricName, new Sum());
sensor.add(sumMetricName, new WindowedSum());
}
@After

64
clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java

@ -44,7 +44,7 @@ import java.util.function.Function; @@ -44,7 +44,7 @@ import java.util.function.Function;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
@ -52,9 +52,9 @@ import org.apache.kafka.common.metrics.stats.Percentile; @@ -52,9 +52,9 @@ import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
@ -119,15 +119,15 @@ public class MetricsTest { @@ -119,15 +119,15 @@ public class MetricsTest {
s.add(metrics.metricName("test.min", "grp1"), new Min());
s.add(new Meter(TimeUnit.SECONDS, metrics.metricName("test.rate", "grp1"),
metrics.metricName("test.total", "grp1")));
s.add(new Meter(TimeUnit.SECONDS, new Count(), metrics.metricName("test.occurences", "grp1"),
s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(), metrics.metricName("test.occurences", "grp1"),
metrics.metricName("test.occurences.total", "grp1")));
s.add(metrics.metricName("test.count", "grp1"), new Count());
s.add(metrics.metricName("test.count", "grp1"), new WindowedCount());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(metrics.metricName("test.median", "grp1"), 50.0),
new Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
Sensor s2 = metrics.sensor("test.sensor2");
s2.add(metrics.metricName("s2.total", "grp1"), new Total());
s2.add(metrics.metricName("s2.total", "grp1"), new CumulativeSum());
s2.record(5.0);
int sum = 0;
@ -162,15 +162,15 @@ public class MetricsTest { @@ -162,15 +162,15 @@ public class MetricsTest {
@Test
public void testHierarchicalSensors() {
Sensor parent1 = metrics.sensor("test.parent1");
parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
parent1.add(metrics.metricName("test.parent1.count", "grp1"), new WindowedCount());
Sensor parent2 = metrics.sensor("test.parent2");
parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
parent2.add(metrics.metricName("test.parent2.count", "grp1"), new WindowedCount());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
child1.add(metrics.metricName("test.child1.count", "grp1"), new WindowedCount());
Sensor child2 = metrics.sensor("test.child2", parent1);
child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
child2.add(metrics.metricName("test.child2.count", "grp1"), new WindowedCount());
Sensor grandchild = metrics.sensor("test.grandchild", child1);
grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new Count());
grandchild.add(metrics.metricName("test.grandchild.count", "grp1"), new WindowedCount());
/* increment each sensor one time */
parent1.record();
@ -222,15 +222,15 @@ public class MetricsTest { @@ -222,15 +222,15 @@ public class MetricsTest {
public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1");
parent1.add(metrics.metricName("test.parent1.count", "grp1"), new Count());
parent1.add(metrics.metricName("test.parent1.count", "grp1"), new WindowedCount());
Sensor parent2 = metrics.sensor("test.parent2");
parent2.add(metrics.metricName("test.parent2.count", "grp1"), new Count());
parent2.add(metrics.metricName("test.parent2.count", "grp1"), new WindowedCount());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
child1.add(metrics.metricName("test.child1.count", "grp1"), new Count());
child1.add(metrics.metricName("test.child1.count", "grp1"), new WindowedCount());
Sensor child2 = metrics.sensor("test.child2", parent2);
child2.add(metrics.metricName("test.child2.count", "grp1"), new Count());
child2.add(metrics.metricName("test.child2.count", "grp1"), new WindowedCount());
Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new Count());
grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new WindowedCount());
Sensor sensor = metrics.getSensor("test.parent1");
assertNotNull(sensor);
@ -268,10 +268,10 @@ public class MetricsTest { @@ -268,10 +268,10 @@ public class MetricsTest {
@Test
public void testRemoveInactiveMetrics() {
Sensor s1 = metrics.sensor("test.s1", null, 1);
s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
s1.add(metrics.metricName("test.s1.count", "grp1"), new WindowedCount());
Sensor s2 = metrics.sensor("test.s2", null, 3);
s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
s2.add(metrics.metricName("test.s2.count", "grp1"), new WindowedCount());
Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
purger.run();
@ -309,7 +309,7 @@ public class MetricsTest { @@ -309,7 +309,7 @@ public class MetricsTest {
// After purging, it should be possible to recreate a metric
s1 = metrics.sensor("test.s1", null, 1);
s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
s1.add(metrics.metricName("test.s1.count", "grp1"), new WindowedCount());
assertNotNull("Sensor test.s1 must be present", metrics.getSensor("test.s1"));
assertNotNull("MetricName test.s1.count must be present",
metrics.metrics().get(metrics.metricName("test.s1.count", "grp1")));
@ -318,8 +318,8 @@ public class MetricsTest { @@ -318,8 +318,8 @@ public class MetricsTest {
@Test
public void testRemoveMetric() {
int size = metrics.metrics().size();
metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
metrics.addMetric(metrics.metricName("test1", "grp1"), new WindowedCount());
metrics.addMetric(metrics.metricName("test2", "grp1"), new WindowedCount());
assertNotNull(metrics.removeMetric(metrics.metricName("test1", "grp1")));
assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
@ -333,7 +333,7 @@ public class MetricsTest { @@ -333,7 +333,7 @@ public class MetricsTest {
@Test
public void testEventWindowing() {
Count count = new Count();
WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
count.record(config, 1.0, time.milliseconds());
count.record(config, 1.0, time.milliseconds());
@ -344,7 +344,7 @@ public class MetricsTest { @@ -344,7 +344,7 @@ public class MetricsTest {
@Test
public void testTimeWindowing() {
Count count = new Count();
WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
count.record(config, 1.0, time.milliseconds());
time.sleep(1);
@ -397,8 +397,8 @@ public class MetricsTest { @@ -397,8 +397,8 @@ public class MetricsTest {
*/
@Test
public void testSampledStatReturnsInitialValueWhenNoValuesExist() {
Count count = new Count();
Rate.SampledTotal sampledTotal = new Rate.SampledTotal();
WindowedCount count = new WindowedCount();
WindowedSum sampledTotal = new WindowedSum();
long windowMs = 100;
int samples = 2;
MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
@ -415,14 +415,14 @@ public class MetricsTest { @@ -415,14 +415,14 @@ public class MetricsTest {
@Test(expected = IllegalArgumentException.class)
public void testDuplicateMetricName() {
metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());
metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new Total());
metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new CumulativeSum());
}
@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.add(metrics.metricName("test1.total", "grp1"), new CumulativeSum(), new MetricConfig().quota(Quota.upperBound(5.0)));
sensor.add(metrics.metricName("test2.total", "grp1"), new CumulativeSum(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.record(5.0);
try {
sensor.record(1.0);
@ -503,7 +503,7 @@ public class MetricsTest { @@ -503,7 +503,7 @@ public class MetricsTest {
MetricName countRateMetricName = metrics.metricName("test.count.rate", "grp1");
MetricName countTotalMetricName = metrics.metricName("test.count.total", "grp1");
s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName, countTotalMetricName));
s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(), countRateMetricName, countTotalMetricName));
KafkaMetric totalMetric = metrics.metrics().get(totalMetricName);
KafkaMetric countTotalMetric = metrics.metrics().get(countTotalMetricName);
@ -825,10 +825,10 @@ public class MetricsTest { @@ -825,10 +825,10 @@ public class MetricsTest {
sensor.add(metrics.metricName("test.metric.avg", "avg", tags), new Avg());
break;
case TOTAL:
sensor.add(metrics.metricName("test.metric.total", "total", tags), new Total());
sensor.add(metrics.metricName("test.metric.total", "total", tags), new CumulativeSum());
break;
case COUNT:
sensor.add(metrics.metricName("test.metric.count", "count", tags), new Count());
sensor.add(metrics.metricName("test.metric.count", "count", tags), new WindowedCount());
break;
case MAX:
sensor.add(metrics.metricName("test.metric.max", "max", tags), new Max());
@ -843,7 +843,7 @@ public class MetricsTest { @@ -843,7 +843,7 @@ public class MetricsTest {
sensor.add(metrics.metricName("test.metric.simpleRate", "simpleRate", tags), new SimpleRate());
break;
case SUM:
sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new Sum());
sensor.add(metrics.metricName("test.metric.sum", "sum", tags), new WindowedSum());
break;
case VALUE:
sensor.add(metrics.metricName("test.metric.value", "value", tags), new Value());

4
clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java

@ -20,7 +20,7 @@ import org.apache.kafka.common.MetricName; @@ -20,7 +20,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@ -128,7 +128,7 @@ public class SensorTest { @@ -128,7 +128,7 @@ public class SensorTest {
}
// note that adding a different metric with the same name is also a no-op
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));
assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new WindowedSum()));
// so after all this, we still just have the original metric registered
assertEquals(1, sensor.metrics().size());

2
clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java

@ -44,7 +44,7 @@ public class MeterTest { @@ -44,7 +44,7 @@ public class MeterTest {
assertEquals(rateMetricName, rate.name());
assertEquals(totalMetricName, total.name());
Rate rateStat = (Rate) rate.stat();
Total totalStat = (Total) total.stat();
CumulativeSum totalStat = (CumulativeSum) total.stat();
MetricConfig config = new MetricConfig();
double nextValue = 0.0;

4
clients/src/test/java/org/apache/kafka/test/MetricsBench.java

@ -21,11 +21,11 @@ import java.util.Arrays; @@ -21,11 +21,11 @@ import java.util.Arrays;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.WindowedCount;
public class MetricsBench {
@ -37,7 +37,7 @@ public class MetricsBench { @@ -37,7 +37,7 @@ public class MetricsBench {
Sensor child = metrics.sensor("child", parent);
for (Sensor sensor : Arrays.asList(parent, child)) {
sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"), new Avg());
sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new Count());
sensor.add(metrics.metricName(sensor.name() + ".count", "grp1"), new WindowedCount());
sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"), new Max());
sensor.add(new Percentiles(1024,
0.0,

14
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@ -24,8 +24,8 @@ import org.apache.kafka.common.MetricName; @@ -24,8 +24,8 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@ -863,13 +863,13 @@ public class Worker { @@ -863,13 +863,13 @@ public class Worker {
connectorStartupResults.add(connectorStartupResultFrequencies);
connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new Total());
connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new Total());
connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new Total());
connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
@ -878,13 +878,13 @@ public class Worker { @@ -878,13 +878,13 @@ public class Worker {
taskStartupResults.add(taskStartupResultFrequencies);
taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new Total());
taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new Total());
taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
taskStartupFailures = metricGroup.sensor("task-startup-failures");
taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new Total());
taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
}
void close() {

10
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java

@ -27,9 +27,9 @@ import org.apache.kafka.common.TopicPartition; @@ -27,9 +27,9 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.SchemaAndValue;
@ -705,11 +705,11 @@ class WorkerSinkTask extends WorkerTask { @@ -705,11 +705,11 @@ class WorkerSinkTask extends WorkerTask {
sinkRecordRead = metricGroup.sensor("sink-record-read");
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new CumulativeSum());
sinkRecordSend = metricGroup.sensor("sink-record-send");
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new CumulativeSum());
sinkRecordActiveCount = metricGroup.sensor("sink-record-active-count");
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
@ -724,11 +724,11 @@ class WorkerSinkTask extends WorkerTask { @@ -724,11 +724,11 @@ class WorkerSinkTask extends WorkerTask {
offsetCompletion = metricGroup.sensor("offset-commit-completion");
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new CumulativeSum());
offsetCompletionSkip = metricGroup.sensor("offset-commit-completion-skip");
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new CumulativeSum());
putBatchTime = metricGroup.sensor("put-batch-time");
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java

@ -24,9 +24,9 @@ import org.apache.kafka.common.KafkaException; @@ -24,9 +24,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
@ -591,11 +591,11 @@ class WorkerSourceTask extends WorkerTask { @@ -591,11 +591,11 @@ class WorkerSourceTask extends WorkerTask {
sourceRecordPoll = metricGroup.sensor("source-record-poll");
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new Total());
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new CumulativeSum());
sourceRecordWrite = metricGroup.sensor("source-record-write");
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new Total());
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new CumulativeSum());
pollTime = metricGroup.sensor("poll-batch-time");
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@ -22,8 +22,8 @@ import org.apache.kafka.common.config.ConfigValue; @@ -22,8 +22,8 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@ -1483,7 +1483,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -1483,7 +1483,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
});
rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count");
rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total());
rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new CumulativeSum());
rebalanceTime = metricGroup.sensor("rebalance-time");
rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());

16
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java

@ -17,7 +17,7 @@ @@ -17,7 +17,7 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
@ -62,25 +62,25 @@ public class ErrorHandlingMetrics { @@ -62,25 +62,25 @@ public class ErrorHandlingMetrics {
metricGroup.close();
recordProcessingFailures = metricGroup.sensor("total-record-failures");
recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures), new Total());
recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures), new CumulativeSum());
recordProcessingErrors = metricGroup.sensor("total-record-errors");
recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors), new Total());
recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors), new CumulativeSum());
recordsSkipped = metricGroup.sensor("total-records-skipped");
recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), new Total());
recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped), new CumulativeSum());
retries = metricGroup.sensor("total-retries");
retries.add(metricGroup.metricName(registry.retries), new Total());
retries.add(metricGroup.metricName(registry.retries), new CumulativeSum());
errorsLogged = metricGroup.sensor("total-errors-logged");
errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new Total());
errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new CumulativeSum());
dlqProduceRequests = metricGroup.sensor("deadletterqueue-produce-requests");
dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new Total());
dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new CumulativeSum());
dlqProduceFailures = metricGroup.sensor("deadletterqueue-produce-failures");
dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new Total());
dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new CumulativeSum());
metricGroup.addValueMetric(registry.lastErrorTimestamp, now -> lastErrorTime);
}

5
core/src/main/scala/kafka/network/SocketServer.scala

@ -39,8 +39,7 @@ import org.apache.kafka.common.config.ConfigException @@ -39,8 +39,7 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{KafkaException, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Meter
import org.apache.kafka.common.metrics.stats.Total
import org.apache.kafka.common.metrics.stats.{CumulativeSum, Meter}
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.protocol.ApiKeys
@ -712,7 +711,7 @@ private[kafka] class Processor(val id: Int, @@ -712,7 +711,7 @@ private[kafka] class Processor(val id: Int,
Map(NetworkProcessorMetricTag -> id.toString)
)
val expiredConnectionsKilledCount = new Total()
val expiredConnectionsKilledCount = new CumulativeSum()
private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)

4
core/src/main/scala/kafka/server/ClientQuotaManager.scala

@ -27,7 +27,7 @@ import kafka.utils.{Logging, ShutdownableThread} @@ -27,7 +27,7 @@ import kafka.utils.{Logging, ShutdownableThread}
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
@ -179,7 +179,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, @@ -179,7 +179,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
quotaType.toString,
"Tracks the size of the delay queue"), new Total())
"Tracks the size of the delay queue"), new CumulativeSum())
start() // Use start method to keep spotbugs happy
private def start() {
throttledChannelReaper.start()

8
streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java

@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals.metrics; @@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -106,7 +106,7 @@ public class Sensors { @@ -106,7 +106,7 @@ public class Sensors {
"The average number of occurrence of suppression-emit operation per second.",
tags
),
new Rate(TimeUnit.SECONDS, new Sum())
new Rate(TimeUnit.SECONDS, new WindowedSum())
);
sensor.add(
new MetricName(
@ -115,7 +115,7 @@ public class Sensors { @@ -115,7 +115,7 @@ public class Sensors {
"The total number of occurrence of suppression-emit operations.",
tags
),
new Total()
new CumulativeSum()
);
return sensor;
}

8
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@ -28,9 +28,10 @@ import org.apache.kafka.common.errors.ProducerFencedException; @@ -28,9 +28,10 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@ -43,7 +44,6 @@ import org.apache.kafka.streams.processor.PunctuationType; @@ -43,7 +44,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
@ -112,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @@ -112,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
);
taskCommitTimeSensor.add(
new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap),
new Rate(TimeUnit.SECONDS, new Count())
new Rate(TimeUnit.SECONDS, new WindowedCount())
);
taskCommitTimeSensor.add(
new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
@ -123,7 +123,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @@ -123,7 +123,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-processing", Sensor.RecordingLevel.DEBUG, parent);
taskEnforcedProcessSensor.add(
new MetricName("enforced-processing-rate", group, "The average number of occurrence of enforced-processing operation per second.", tagMap),
new Rate(TimeUnit.SECONDS, new Count())
new Rate(TimeUnit.SECONDS, new WindowedCount())
);
taskEnforcedProcessSensor.add(
new MetricName("enforced-processing-total", group, "The total number of occurrence of enforced-processing operations.", tagMap),

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java

@ -22,9 +22,10 @@ import org.apache.kafka.common.metrics.Metrics; @@ -22,9 +22,10 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.streams.StreamsMetrics;
import java.util.Arrays;
@ -445,7 +446,7 @@ public class StreamsMetricsImpl implements StreamsMetrics { @@ -445,7 +446,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
descriptionOfRate,
tags
),
new Rate(TimeUnit.SECONDS, new Count())
new Rate(TimeUnit.SECONDS, new WindowedCount())
);
}

4
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

@ -33,7 +33,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; @@ -33,7 +33,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@ -215,7 +215,7 @@ public class RecordCollectorTest { @@ -215,7 +215,7 @@ public class RecordCollectorTest {
final Sensor sensor = metrics.sensor("skipped-records");
final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
sensor.add(metricName, new Sum());
sensor.add(metricName, new WindowedSum());
final RecordCollector collector = new RecordCollectorImpl(
"test",
logContext,

4
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java

@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition; @@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@ -692,7 +692,7 @@ public class StandbyTaskTest { @@ -692,7 +692,7 @@ public class StandbyTaskTest {
private MetricName setupCloseTaskMetric() {
final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
sensor.add(metricName, new Total());
sensor.add(metricName, new CumulativeSum());
return metricName;
}

8
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java

@ -32,9 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders; @@ -32,9 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@ -289,12 +289,12 @@ public class TopologyTestDriver implements Closeable { @@ -289,12 +289,12 @@ public class TopologyTestDriver implements Closeable {
threadLevelGroup,
"The average per-second number of skipped records",
streamsMetrics.tagMap()),
new Rate(TimeUnit.SECONDS, new Count()));
new Rate(TimeUnit.SECONDS, new WindowedCount()));
skippedRecordsSensor.add(new MetricName("skipped-records-total",
threadLevelGroup,
"The total number of skipped records",
streamsMetrics.tagMap()),
new Total());
new CumulativeSum());
final ThreadCache cache = new ThreadCache(
new LogContext("topology-test-driver "),
Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),

Loading…
Cancel
Save