diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index a99f8cc1963..776dde70816 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -673,7 +673,8 @@ public class KafkaStreams { throw new StreamsException(fatal); } - final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + final MetricConfig metricConfig = new MetricConfig() + .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index af3c7dbc304..1c348975f2a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -298,7 +298,7 @@ public class GlobalStreamThread extends Thread { log.error("Failed to close state maintainer due to the following error:", e); } - streamsMetrics.removeOwnedSensors(); + streamsMetrics.removeAllThreadLevelSensors(); setState(DEAD); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d9515b15ecd..b9753249eae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -22,9 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; @@ -42,6 +47,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static java.lang.String.format; import static java.util.Collections.singleton; @@ -72,16 +78,57 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator protected static final class TaskMetrics { final StreamsMetricsImpl metrics; final Sensor taskCommitTimeSensor; + private final String taskName; TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) { - final String name = id.toString(); + taskName = id.toString(); this.metrics = metrics; - taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG); + final String group = "stream-task-metrics"; + + // first add the global operation metrics if not yet, with the global tags only + final Map allTagMap = metrics.tagMap("task-id", "all"); + final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG); + parent.add( + new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap), + new Avg() + ); + parent.add( + new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap), + new Max() + ); + parent.add( + new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap), + new Rate(TimeUnit.SECONDS, new Count()) + ); + parent.add( + new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap), + new Count() + ); + + // add the operation metrics with additional tags + final Map tagMap = metrics.tagMap("task-id", taskName); + taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent); + taskCommitTimeSensor.add( + new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap), + new Avg() + ); + taskCommitTimeSensor.add( + new MetricName("commit-latency-max", group, "The max latency of commit operation.", tagMap), + new Max() + ); + taskCommitTimeSensor.add( + new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap), + new Rate(TimeUnit.SECONDS, new Count()) + ); + taskCommitTimeSensor.add( + new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap), + new Count() + ); } void removeAllSensors() { - metrics.removeSensor(taskCommitTimeSensor); + metrics.removeAllTaskLevelSensors(taskName); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 39727be5a3e..e4ad1385e5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -52,10 +52,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Deque; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,7 +62,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName; public class StreamThread extends Thread { @@ -509,58 +506,41 @@ public class StreamThread extends Thread { private final Sensor taskCreatedSensor; private final Sensor tasksClosedSensor; - private final Deque ownedSensors = new LinkedList<>(); - StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) { super(metrics, threadName); - final String groupName = "stream-metrics"; - - commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO); - commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg()); - commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max()); - commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); - commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count()); - ownedSensors.push(commitTimeSensor.name()); - - pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO); - pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg()); - pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max()); - pollTimeSensor.add(metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); - pollTimeSensor.add(metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags()), new Count()); - ownedSensors.push(pollTimeSensor.name()); - - processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO); - processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg()); - processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max()); - processTimeSensor.add(metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); - processTimeSensor.add(metrics.metricName("process-total", groupName, "The total number of process calls", tags()), new Count()); - ownedSensors.push(processTimeSensor.name()); - - punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO); - punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg()); - punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max()); - punctuateTimeSensor.add(metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); - punctuateTimeSensor.add(metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags()), new Count()); - ownedSensors.push(punctuateTimeSensor.name()); - - taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO); + final String group = "stream-metrics"; + + commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO); + commitTimeSensor.add(metrics.metricName("commit-latency-avg", group, "The average commit time in ms", tags()), new Avg()); + commitTimeSensor.add(metrics.metricName("commit-latency-max", group, "The maximum commit time in ms", tags()), new Max()); + commitTimeSensor.add(metrics.metricName("commit-rate", group, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); + commitTimeSensor.add(metrics.metricName("commit-total", group, "The total number of commit calls", tags()), new Count()); + + pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO); + pollTimeSensor.add(metrics.metricName("poll-latency-avg", group, "The average poll time in ms", tags()), new Avg()); + pollTimeSensor.add(metrics.metricName("poll-latency-max", group, "The maximum poll time in ms", tags()), new Max()); + pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); + pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tags()), new Count()); + + processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO); + processTimeSensor.add(metrics.metricName("process-latency-avg", group, "The average process time in ms", tags()), new Avg()); + processTimeSensor.add(metrics.metricName("process-latency-max", group, "The maximum process time in ms", tags()), new Max()); + processTimeSensor.add(metrics.metricName("process-rate", group, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); + processTimeSensor.add(metrics.metricName("process-total", group, "The total number of process calls", tags()), new Count()); + + punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO); + punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The average punctuate time in ms", tags()), new Avg()); + punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The maximum punctuate time in ms", tags()), new Max()); + punctuateTimeSensor.add(metrics.metricName("punctuate-rate", group, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count())); + punctuateTimeSensor.add(metrics.metricName("punctuate-total", group, "The total number of punctuate calls", tags()), new Count()); + + taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO); taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count())); taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total()); - ownedSensors.push(taskCreatedSensor.name()); - - tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO); - tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count())); - tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total()); - ownedSensors.push(tasksClosedSensor.name()); - } - public void removeOwnedSensors() { - synchronized (ownedSensors) { - super.removeOwnedSensors(); - while (!ownedSensors.isEmpty()) { - registry().removeSensor(ownedSensors.pop()); - } - } + tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO); + tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count())); + tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tags()), new Total()); } } @@ -1165,7 +1145,7 @@ public class StreamThread extends Thread { } catch (final Throwable e) { log.error("Failed to close restore consumer due to the following error:", e); } - streamsMetrics.removeOwnedSensors(); + streamsMetrics.removeAllThreadLevelSensors(); setState(State.DEAD); log.info("Shutdown complete"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java deleted file mode 100644 index cfe206c7d9c..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals.metrics; - -import java.util.LinkedHashMap; -import java.util.Map; - -public final class StreamsMetricsConventions { - private StreamsMetricsConventions() { - } - - public static String threadLevelSensorName(final String threadName, final String sensorName) { - return "thread." + threadName + "." + sensorName; - } - - static Map threadLevelTags(final String threadName, final Map tags) { - if (tags.containsKey("client-id")) { - return tags; - } else { - final LinkedHashMap newTags = new LinkedHashMap<>(tags); - newTags.put("client-id", threadName); - return newTags; - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 76a1d2b1dab..02512656bd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -31,35 +31,125 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName; - public class StreamsMetricsImpl implements StreamsMetrics { private final Metrics metrics; private final Map tags; private final Map parentSensors; - private final Deque ownedSensors = new LinkedList<>(); private final Sensor skippedRecordsSensor; + private final String threadName; + + private final Deque threadLevelSensors = new LinkedList<>(); + private final Map> taskLevelSensors = new HashMap<>(); + private final Map> cacheLevelSensors = new HashMap<>(); public StreamsMetricsImpl(final Metrics metrics, final String threadName) { Objects.requireNonNull(metrics, "Metrics cannot be null"); + this.threadName = threadName; this.metrics = metrics; - this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.emptyMap()); + + + final HashMap tags = new LinkedHashMap<>(); + tags.put("client-id", threadName); + this.tags = Collections.unmodifiableMap(tags); + this.parentSensors = new HashMap<>(); - skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO); - skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count())); - skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total()); - ownedSensors.push(skippedRecordsSensor.name()); + final String group = "stream-metrics"; + skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO); + skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", group, "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count())); + skippedRecordsSensor.add(metrics.metricName("skipped-records-total", group, "The total number of skipped records", tags), new Total()); + } + + public final Sensor threadLevelSensor(final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { + synchronized (threadLevelSensors) { + final String fullSensorName = threadName + "." + sensorName; + final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); + threadLevelSensors.push(fullSensorName); + + return sensor; + } + } + + public final void removeAllThreadLevelSensors() { + synchronized (threadLevelSensors) { + while (!threadLevelSensors.isEmpty()) { + metrics.removeSensor(threadLevelSensors.pop()); + } + } + } + + public final Sensor taskLevelSensor(final String taskName, + final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { + final String key = threadName + "." + taskName; + synchronized (taskLevelSensors) { + if (!taskLevelSensors.containsKey(key)) { + taskLevelSensors.put(key, new LinkedList()); + } + + final String fullSensorName = key + "." + sensorName; + + final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); + + taskLevelSensors.get(key).push(fullSensorName); + + return sensor; + } } - public final Metrics registry() { - return metrics; + public final void removeAllTaskLevelSensors(final String taskName) { + final String key = threadName + "." + taskName; + synchronized (taskLevelSensors) { + if (taskLevelSensors.containsKey(key)) { + while (!taskLevelSensors.get(key).isEmpty()) { + metrics.removeSensor(taskLevelSensors.get(key).pop()); + } + taskLevelSensors.remove(key); + } + } + } + + public final Sensor cacheLevelSensor(final String taskName, + final String cacheName, + final String sensorName, + final Sensor.RecordingLevel recordingLevel, + final Sensor... parents) { + final String key = threadName + "." + taskName + "." + cacheName; + synchronized (cacheLevelSensors) { + if (!cacheLevelSensors.containsKey(key)) { + cacheLevelSensors.put(key, new LinkedList()); + } + + final String fullSensorName = key + "." + sensorName; + + final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); + + cacheLevelSensors.get(key).push(fullSensorName); + + return sensor; + } + } + + public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) { + final String key = threadName + "." + taskName + "." + cacheName; + synchronized (cacheLevelSensors) { + if (cacheLevelSensors.containsKey(key)) { + while (!cacheLevelSensors.get(key).isEmpty()) { + metrics.removeSensor(cacheLevelSensors.get(key).pop()); + } + cacheLevelSensors.remove(key); + } + } } protected final Map tags() { @@ -236,11 +326,4 @@ public class StreamsMetricsImpl implements StreamsMetrics { } } - public void removeOwnedSensors() { - synchronized (ownedSensors) { - while (!ownedSensors.isEmpty()) { - metrics.removeSensor(ownedSensors.pop()); - } - } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index de62a2d3696..d058c9cee4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Min; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +53,7 @@ class NamedCache { private long numOverwrites = 0; private long numFlushes = 0; - NamedCache(final String name, final StreamsMetrics metrics) { + NamedCache(final String name, final StreamsMetricsImpl metrics) { this.name = name; this.namedCacheMetrics = new NamedCacheMetrics(metrics, name); } @@ -355,45 +355,66 @@ class NamedCache { private static class NamedCacheMetrics { private final StreamsMetricsImpl metrics; - private final String groupName; - private final Map metricTags; - private final Map allMetricTags; + private final Sensor hitRatioSensor; + private final String taskName; + private final String cacheName; - private NamedCacheMetrics(final StreamsMetrics metrics, final String name) { - final String scope = "record-cache"; - final String opName = "hitRatio"; - final String tagKey = scope + "-id"; - final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name); - this.groupName = "stream-" + scope + "-metrics"; - this.metrics = (StreamsMetricsImpl) metrics; - this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all", - "task-id", ThreadCache.taskIDfromCacheName(name)); - this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue, - "task-id", ThreadCache.taskIDfromCacheName(name)); + private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) { + taskName = ThreadCache.taskIDfromCacheName(cacheName); + this.cacheName = cacheName; + this.metrics = metrics; + final String group = "stream-record-cache-metrics"; // add parent - final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG); - parent.add(this.metrics.registry().metricName(opName + "-avg", groupName, - "The average cache hit ratio.", allMetricTags), new Avg()); - parent.add(this.metrics.registry().metricName(opName + "-min", groupName, - "The minimum cache hit ratio.", allMetricTags), new Min()); - parent.add(this.metrics.registry().metricName(opName + "-max", groupName, - "The maximum cache hit ratio.", allMetricTags), new Max()); + final Map allMetricTags = metrics.tagMap( + "record-cache-id", "all", + "task-id", taskName + ); + final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG); + taskLevelHitRatioSensor.add( + new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags), + new Avg() + ); + taskLevelHitRatioSensor.add( + new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags), + new Min() + ); + taskLevelHitRatioSensor.add( + new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags), + new Max() + ); // add child - hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName, - "The average cache hit ratio.", metricTags), new Avg()); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName, - "The minimum cache hit ratio.", metricTags), new Min()); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName, - "The maximum cache hit ratio.", metricTags), new Max()); + final Map metricTags = metrics.tagMap( + "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName), + "task-id", taskName + ); + + hitRatioSensor = metrics.cacheLevelSensor( + taskName, + cacheName, + "hitRatio", + Sensor.RecordingLevel.DEBUG, + taskLevelHitRatioSensor + ); + hitRatioSensor.add( + new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags), + new Avg() + ); + hitRatioSensor.add( + new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags), + new Min() + ); + hitRatioSensor.add( + new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags), + new Max() + ); } private void removeAllSensors() { - metrics.removeSensor(hitRatioSensor); + metrics.removeAllCacheLevelSensors(taskName, cacheName); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index b1fd1989168..b947664fe29 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -19,8 +19,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.internals.RecordContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; import java.util.Collections; @@ -39,7 +39,7 @@ import java.util.NoSuchElementException; public class ThreadCache { private final Logger log; private final long maxCacheSizeBytes; - private final StreamsMetrics metrics; + private final StreamsMetricsImpl metrics; private final Map caches = new HashMap<>(); // internal stats @@ -52,7 +52,7 @@ public class ThreadCache { void apply(final List dirty); } - public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) { + public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetricsImpl metrics) { this.maxCacheSizeBytes = maxCacheSizeBytes; this.metrics = metrics; this.log = logContext.logger(getClass()); @@ -91,7 +91,7 @@ public class ThreadCache { * @return */ public static String taskIDfromCacheName(final String cacheName) { - String[] tokens = cacheName.split("-"); + String[] tokens = cacheName.split("-", 2); return tokens[0]; } @@ -101,7 +101,7 @@ public class ThreadCache { * @return */ public static String underlyingStoreNamefromCacheName(final String cacheName) { - String[] tokens = cacheName.split("-"); + String[] tokens = cacheName.split("-", 2); return tokens[1]; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 301d448ae65..8cb2eaebe1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -121,7 +121,6 @@ public class KStreamSessionWindowAggregateProcessorTest { @After public void closeStore() { - context.close(); sessionStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index 1409d683e89..a7a26107bf2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -148,7 +148,6 @@ public class ProcessorNodeTest { "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags))); - context.close(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 0013167fb24..753d26bd86e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -55,11 +54,6 @@ public class SinkNodeTest { sink.init(context); } - @After - public void after() { - context.close(); - } - @Test @SuppressWarnings("unchecked") public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 28e0b46b2a6..598e47ee13e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -232,7 +232,6 @@ public class StreamTaskTest { public void testMetrics() { task = createStatelessTask(createConfig(false)); - assertNotNull(metrics.getSensor("commit")); assertNotNull(getMetric("%s-latency-avg", "The average latency of %s operation.", task.id().toString())); assertNotNull(getMetric("%s-latency-max", "The max latency of %s operation.", task.id().toString())); assertNotNull(getMetric("%s-rate", "The average number of occurrence of %s operation per second.", task.id().toString())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 36a1bce8cf0..3ae7acbc25d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -243,17 +243,8 @@ public class StreamThreadTest { public void testMetricsCreatedAtStartup() { final StreamThread thread = createStreamThread(clientId, config, false); final String defaultGroupName = "stream-metrics"; - final String defaultPrefix = "thread." + thread.getName(); final Map defaultTags = Collections.singletonMap("client-id", thread.getName()); - assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency")); - assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency")); - assertNotNull(metrics.getSensor(defaultPrefix + ".process-latency")); - assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-latency")); - assertNotNull(metrics.getSensor(defaultPrefix + ".task-created")); - assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed")); - assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records")); - assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags))); assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags))); assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags))); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 51c782ad9d5..c4536dfd2a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -63,7 +63,6 @@ public abstract class AbstractKeyValueStoreTest { @After public void after() { store.close(); - context.close(); driver.clear(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 8705326b782..3e0241e3043 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -82,7 +82,6 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @After public void after() { super.after(); - context.close(); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index a9a66e9a7df..b77f4e979d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -82,7 +82,6 @@ public class CachingSessionStoreTest { @After public void close() { - context.close(); cachingStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index c25655b6b2e..a87b2e41890 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -87,7 +87,6 @@ public class CachingWindowStoreTest { @After public void closeStore() { - context.close(); cachingStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 7342c93abcc..5bb0de7a0eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -76,7 +76,6 @@ public class ChangeLoggingKeyValueBytesStoreTest { @After public void after() { - context.close(); store.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index eab523e398d..19bd523a8d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.easymock.EasyMock; -import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -83,11 +82,6 @@ public class MeteredWindowStoreTest { ); } - @After - public void after() { - context.close(); - } - @Test public void shouldRecordRestoreLatencyOnInit() { innerStoreMock.init(context, store); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 6b410dc73dd..9ae0feb1850 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -16,12 +16,11 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.junit.Before; import org.junit.Test; @@ -44,13 +43,14 @@ import static org.junit.Assert.assertSame; public class NamedCacheTest { private NamedCache cache; - private MockStreamsMetrics streamMetrics; + private StreamsMetricsImpl metrics; private final String taskIDString = "0.0"; private final String underlyingStoreName = "storeName"; + @Before public void setUp() { - streamMetrics = new MockStreamsMetrics(new Metrics()); - cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics); + metrics = new MockStreamsMetrics(new Metrics()); + cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics); } @Test @@ -83,18 +83,15 @@ public class NamedCacheTest { metricTags.put("task-id", taskIDString); metricTags.put("client-id", "test"); - assertNotNull(streamMetrics.registry().getSensor("hitRatio")); - final Map metrics1 = streamMetrics.registry().metrics(); - getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags); // test "all" metricTags.put("record-cache-id", "all"); - final Map metrics = streamMetrics.registry().metrics(); - getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags); - getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags); + getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java index 098c3262e34..b25b8cbb00a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java @@ -53,7 +53,6 @@ public class RocksDBKeyValueStoreSupplierTest { @After public void close() { - context.close(); store.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index 388a2fc47ba..bd2fa9110a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -82,7 +82,6 @@ public class RocksDBSegmentedBytesStoreTest { @After public void close() { - context.close(); bytesStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java index 272e0b0f5e2..c50dfba2942 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java @@ -68,7 +68,6 @@ public class RocksDBSessionStoreSupplierTest { @After public void close() { - context.close(); store.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 64953153045..bcb411b0709 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -69,7 +69,6 @@ public class RocksDBSessionStoreTest { @After public void close() { - context.close(); sessionStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index a09d87dbad3..b7a9d375d9c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -79,7 +79,6 @@ public class RocksDBStoreTest { @After public void tearDown() { rocksDBStore.close(); - context.close(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java index a6ccfdf9337..7409a13c154 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -53,7 +53,6 @@ public class RocksDBWindowStoreSupplierTest { @After public void close() { - context.close(); if (store != null) { store.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index bf556adebfa..92edbd8d31c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -126,7 +126,6 @@ public class RocksDBWindowStoreTest { @After public void closeStore() { - context.close(); if (windowStore != null) { windowStore.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index d61218eb9b5..7a7b266537b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -76,7 +76,6 @@ public class SegmentIteratorTest { } segmentOne.close(); segmentTwo.close(); - context.close(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index ec59a008112..bfa317ddc51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -65,7 +65,6 @@ public class SegmentsTest { @After public void close() { - context.close(); segments.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 21b5c5c06cd..6bacd910cce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; -import org.junit.After; import org.junit.Test; import java.util.HashMap; @@ -68,11 +67,6 @@ public class StoreChangeLoggerTest { private final StoreChangeLogger changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class)); - @After - public void after() { - context.close(); - } - @Test public void testAddRemove() { context.setTime(1); diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 57e3efb1b16..5e619102ede 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -51,7 +51,6 @@ import java.util.Map; public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier { private final File stateDir; - private final Metrics metrics; private final RecordCollector.Supplier recordCollectorSupplier; private final Map storeMap = new LinkedHashMap<>(); private final Map restoreFuncs = new HashMap<>(); @@ -135,7 +134,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple this.stateDir = stateDir; this.keySerde = keySerde; this.valSerde = valSerde; - this.metrics = metrics.registry(); this.recordCollectorSupplier = collectorSupplier; } @@ -306,10 +304,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple restoreListener.onRestoreEnd(null, storeName, 0L); } - public void close() { - metrics.close(); - } - private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) { if (restoreCallback instanceof StateRestoreListener) { return (StateRestoreListener) restoreCallback; diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 7313414981b..3daf0511867 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -147,7 +147,12 @@ public class KStreamTestDriver extends ExternalResource { private void initTopology(final ProcessorTopology topology, final List stores) { for (final StateStore store : stores) { - store.init(context, store); + try { + store.init(context, store); + } catch (final RuntimeException e) { + new RuntimeException("Fatal exception initializing store.", e).printStackTrace(); + throw e; + } } for (final ProcessorNode node : topology.processors()) { @@ -230,7 +235,6 @@ public class KStreamTestDriver extends ExternalResource { } closeState(); - context.close(); } public Set allProcessorNames() {