|
|
|
@ -38,7 +38,6 @@ import java.util.TreeMap;
@@ -38,7 +38,6 @@ import java.util.TreeMap;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor; |
|
|
|
|
import java.util.concurrent.ThreadFactory; |
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
|
|
import static java.util.Collections.emptyList; |
|
|
|
@ -137,29 +136,20 @@ public class Metrics implements Closeable {
@@ -137,29 +136,20 @@ public class Metrics implements Closeable {
|
|
|
|
|
this.reporters = Objects.requireNonNull(reporters); |
|
|
|
|
this.time = time; |
|
|
|
|
for (MetricsReporter reporter : reporters) |
|
|
|
|
reporter.init(new ArrayList<KafkaMetric>()); |
|
|
|
|
reporter.init(new ArrayList<>()); |
|
|
|
|
|
|
|
|
|
// Create the ThreadPoolExecutor only if expiration of Sensors is enabled.
|
|
|
|
|
if (enableExpiration) { |
|
|
|
|
this.metricsScheduler = new ScheduledThreadPoolExecutor(1); |
|
|
|
|
// Creating a daemon thread to not block shutdown
|
|
|
|
|
this.metricsScheduler.setThreadFactory(new ThreadFactory() { |
|
|
|
|
public Thread newThread(Runnable runnable) { |
|
|
|
|
return KafkaThread.daemon("SensorExpiryThread", runnable); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
this.metricsScheduler.setThreadFactory(runnable -> KafkaThread.daemon("SensorExpiryThread", runnable)); |
|
|
|
|
this.metricsScheduler.scheduleAtFixedRate(new ExpireSensorTask(), 30, 30, TimeUnit.SECONDS); |
|
|
|
|
} else { |
|
|
|
|
this.metricsScheduler = null; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
addMetric(metricName("count", "kafka-metrics-count", "total number of registered metrics"), |
|
|
|
|
new Measurable() { |
|
|
|
|
@Override |
|
|
|
|
public double measure(MetricConfig config, long now) { |
|
|
|
|
return metrics.size(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
(config, now) -> metrics.size()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -186,7 +176,7 @@ public class Metrics implements Closeable {
@@ -186,7 +176,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
* @param description A human-readable description to include in the metric |
|
|
|
|
*/ |
|
|
|
|
public MetricName metricName(String name, String group, String description) { |
|
|
|
|
return metricName(name, group, description, new HashMap<String, String>()); |
|
|
|
|
return metricName(name, group, description, new HashMap<>()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -196,7 +186,7 @@ public class Metrics implements Closeable {
@@ -196,7 +186,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
* @param group logical group name of the metrics to which this metric belongs |
|
|
|
|
*/ |
|
|
|
|
public MetricName metricName(String name, String group) { |
|
|
|
|
return metricName(name, group, "", new HashMap<String, String>()); |
|
|
|
|
return metricName(name, group, "", new HashMap<>()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -227,7 +217,7 @@ public class Metrics implements Closeable {
@@ -227,7 +217,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
private static Map<String, String> getTags(String... keyValue) { |
|
|
|
|
if ((keyValue.length % 2) != 0) |
|
|
|
|
throw new IllegalArgumentException("keyValue needs to be specified in pairs"); |
|
|
|
|
Map<String, String> tags = new LinkedHashMap<String, String>(); |
|
|
|
|
Map<String, String> tags = new LinkedHashMap<>(); |
|
|
|
|
|
|
|
|
|
for (int i = 0; i < keyValue.length; i += 2) |
|
|
|
|
tags.put(keyValue[i], keyValue[i + 1]); |
|
|
|
@ -245,7 +235,7 @@ public class Metrics implements Closeable {
@@ -245,7 +235,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
* @return the string containing the HTML table; never null |
|
|
|
|
*/ |
|
|
|
|
public static String toHtmlTable(String domain, Iterable<MetricNameTemplate> allMetrics) { |
|
|
|
|
Map<String, Map<String, String>> beansAndAttributes = new TreeMap<String, Map<String, String>>(); |
|
|
|
|
Map<String, Map<String, String>> beansAndAttributes = new TreeMap<>(); |
|
|
|
|
|
|
|
|
|
try (Metrics metrics = new Metrics()) { |
|
|
|
|
for (MetricNameTemplate template : allMetrics) { |
|
|
|
@ -257,7 +247,7 @@ public class Metrics implements Closeable {
@@ -257,7 +247,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
MetricName metricName = metrics.metricName(template.name(), template.group(), template.description(), tags); |
|
|
|
|
String mBeanName = JmxReporter.getMBeanName(domain, metricName); |
|
|
|
|
if (!beansAndAttributes.containsKey(mBeanName)) { |
|
|
|
|
beansAndAttributes.put(mBeanName, new TreeMap<String, String>()); |
|
|
|
|
beansAndAttributes.put(mBeanName, new TreeMap<>()); |
|
|
|
|
} |
|
|
|
|
Map<String, String> attrAndDesc = beansAndAttributes.get(mBeanName); |
|
|
|
|
if (!attrAndDesc.containsKey(template.name())) { |
|
|
|
@ -405,11 +395,7 @@ public class Metrics implements Closeable {
@@ -405,11 +395,7 @@ public class Metrics implements Closeable {
|
|
|
|
|
this.sensors.put(name, s); |
|
|
|
|
if (parents != null) { |
|
|
|
|
for (Sensor parent : parents) { |
|
|
|
|
List<Sensor> children = childrenSensors.get(parent); |
|
|
|
|
if (children == null) { |
|
|
|
|
children = new ArrayList<>(); |
|
|
|
|
childrenSensors.put(parent, children); |
|
|
|
|
} |
|
|
|
|
List<Sensor> children = childrenSensors.computeIfAbsent(parent, k -> new ArrayList<>()); |
|
|
|
|
children.add(s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|