
MINOR: fix NamedCache metrics in Streams (#4917)

* Fixes a bug in which all NamedCache instances in a process shared
a single parent metric (a minimal sketch of the root cause follows this list).

* Also fixes a bug that computed the per-cache metric tag incorrectly
(previously undetected because of the former bug).

* Drop the StreamsMetricsConventions#xLevelSensorName convention
in favor of StreamsMetricsImpl#xLevelSensor methods, which let StreamsMetricsImpl
track thread-, task-, and cache-level sensors, so that they may be cleanly declared
from anywhere but still unloaded at the appropriate time. This was necessary
now so that the NamedCache could register a thread-level parent sensor
to be unloaded when the thread, not the cache, is closed.

* The above changes made it mostly unnecessary for StreamsMetricsImpl to
expose a reference to the underlying Metrics registry, so I did a little extra work
to remove that reference, including removing the inconsistently used and unnecessary
calls to Metrics#close() in the tests.
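
A minimal sketch of the root cause from the first bullet, assuming only the standard org.apache.kafka.common.metrics API: Metrics#sensor(String) is get-or-create, so every NamedCache in the process that asked for a parent sensor under the same unscoped name received the same Sensor instance. (The class and sensor names below are illustrative.)

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;

    public class SharedParentSketch {
        public static void main(final String[] args) {
            try (Metrics registry = new Metrics()) {
                // sensor(name) returns the existing sensor when the name is taken,
                // so an unscoped name is shared by every caller in the process.
                final Sensor first = registry.sensor("hitRatio");
                final Sensor second = registry.sensor("hitRatio");
                System.out.println(first == second); // prints true
            }
        }
    }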

The existing tests should be sufficient to verify this change.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
John Roesler authored 7 years ago; committed by Guozhang Wang
commit fc6e92260c
  1. streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (3 changed lines)
  2. streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java (2 changed lines)
  3. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (53 changed lines)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (82 changed lines)
  5. streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java (39 changed lines)
  6. streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java (117 changed lines)
  7. streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java (83 changed lines)
  8. streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java (10 changed lines)
  9. streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java (1 changed line)
  10. streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java (1 changed line)
  11. streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java (6 changed lines)
  12. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (1 changed line)
  13. streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (9 changed lines)
  14. streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java (1 changed line)
  15. streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java (1 changed line)
  16. streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java (1 changed line)
  17. streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java (1 changed line)
  18. streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java (1 changed line)
  19. streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java (6 changed lines)
  20. streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java (25 changed lines)
  21. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java (1 changed line)
  22. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java (1 changed line)
  23. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java (1 changed line)
  24. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java (1 changed line)
  25. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (1 changed line)
  26. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java (1 changed line)
  27. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (1 changed line)
  28. streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java (1 changed line)
  29. streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java (1 changed line)
  30. streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java (6 changed lines)
  31. streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java (6 changed lines)
  32. streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java (8 changed lines)

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java (3 changed lines)

@@ -673,7 +673,8 @@ public class KafkaStreams {
throw new StreamsException(fatal);
}
final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
final MetricConfig metricConfig = new MetricConfig()
.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,

streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java (2 changed lines)

@@ -298,7 +298,7 @@ public class GlobalStreamThread extends Thread {
log.error("Failed to close state maintainer due to the following error:", e);
}
streamsMetrics.removeOwnedSensors();
streamsMetrics.removeAllThreadLevelSensors();
setState(DEAD);

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (53 changed lines)

@@ -22,9 +22,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -42,6 +47,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static java.util.Collections.singleton;
@@ -72,16 +78,57 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
protected static final class TaskMetrics {
final StreamsMetricsImpl metrics;
final Sensor taskCommitTimeSensor;
private final String taskName;
TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
final String name = id.toString();
taskName = id.toString();
this.metrics = metrics;
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG);
final String group = "stream-task-metrics";
// first add the global operation metrics if not yet, with the global tags only
final Map<String, String> allTagMap = metrics.tagMap("task-id", "all");
final Sensor parent = metrics.threadLevelSensor("commit", Sensor.RecordingLevel.DEBUG);
parent.add(
new MetricName("commit-latency-avg", group, "The average latency of commit operation.", allTagMap),
new Avg()
);
parent.add(
new MetricName("commit-latency-max", group, "The max latency of commit operation.", allTagMap),
new Max()
);
parent.add(
new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", allTagMap),
new Rate(TimeUnit.SECONDS, new Count())
);
parent.add(
new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
new Count()
);
// add the operation metrics with additional tags
final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent);
taskCommitTimeSensor.add(
new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap),
new Avg()
);
taskCommitTimeSensor.add(
new MetricName("commit-latency-max", group, "The max latency of commit operation.", tagMap),
new Max()
);
taskCommitTimeSensor.add(
new MetricName("commit-rate", group, "The average number of occurrence of commit operation per second.", tagMap),
new Rate(TimeUnit.SECONDS, new Count())
);
taskCommitTimeSensor.add(
new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
new Count()
);
}
void removeAllSensors() {
metrics.removeSensor(taskCommitTimeSensor);
metrics.removeAllTaskLevelSensors(taskName);
}
}
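
The parent/child wiring above relies on Kafka's sensor hierarchy: recording on a child sensor also records on its parents, so the per-task child (tagged task-id=<id>) feeds the rollup parent (tagged task-id=all) automatically. A hedged sketch of just that mechanism, with illustrative sensor names and tags:

    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Count;

    public class RollupSketch {
        public static void main(final String[] args) {
            try (Metrics registry = new Metrics()) {
                // Parent sensor carries the "all" rollup metric.
                final Sensor parent = registry.sensor("commit");
                parent.add(new MetricName("commit-total", "example-group",
                        "Commits across all tasks.", Collections.singletonMap("task-id", "all")),
                        new Count());
                // Child sensor carries the per-task metric and names the parent.
                final Sensor child = registry.sensor("0_1.commit", parent);
                child.add(new MetricName("commit-total", "example-group",
                        "Commits for task 0_1.", Collections.singletonMap("task-id", "0_1")),
                        new Count());
                child.record(); // increments both the child and the "all" parent
            }
        }
    }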

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java (82 changed lines)

@@ -52,10 +52,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,7 +62,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
public class StreamThread extends Thread {
@@ -509,58 +506,41 @@ public class StreamThread extends Thread {
private final Sensor taskCreatedSensor;
private final Sensor tasksClosedSensor;
private final Deque<String> ownedSensors = new LinkedList<>();
StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
super(metrics, threadName);
final String groupName = "stream-metrics";
commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO);
commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
ownedSensors.push(commitTimeSensor.name());
pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO);
pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg());
pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max());
pollTimeSensor.add(metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
pollTimeSensor.add(metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags()), new Count());
ownedSensors.push(pollTimeSensor.name());
processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO);
processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg());
processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max());
processTimeSensor.add(metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
processTimeSensor.add(metrics.metricName("process-total", groupName, "The total number of process calls", tags()), new Count());
ownedSensors.push(processTimeSensor.name());
punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO);
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg());
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max());
punctuateTimeSensor.add(metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
punctuateTimeSensor.add(metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags()), new Count());
ownedSensors.push(punctuateTimeSensor.name());
taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO);
final String group = "stream-metrics";
commitTimeSensor = threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
commitTimeSensor.add(metrics.metricName("commit-latency-avg", group, "The average commit time in ms", tags()), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", group, "The maximum commit time in ms", tags()), new Max());
commitTimeSensor.add(metrics.metricName("commit-rate", group, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
commitTimeSensor.add(metrics.metricName("commit-total", group, "The total number of commit calls", tags()), new Count());
pollTimeSensor = threadLevelSensor("poll-latency", Sensor.RecordingLevel.INFO);
pollTimeSensor.add(metrics.metricName("poll-latency-avg", group, "The average poll time in ms", tags()), new Avg());
pollTimeSensor.add(metrics.metricName("poll-latency-max", group, "The maximum poll time in ms", tags()), new Max());
pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tags()), new Count());
processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
processTimeSensor.add(metrics.metricName("process-latency-avg", group, "The average process time in ms", tags()), new Avg());
processTimeSensor.add(metrics.metricName("process-latency-max", group, "The maximum process time in ms", tags()), new Max());
processTimeSensor.add(metrics.metricName("process-rate", group, "The average per-second number of process calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
processTimeSensor.add(metrics.metricName("process-total", group, "The total number of process calls", tags()), new Count());
punctuateTimeSensor = threadLevelSensor("punctuate-latency", Sensor.RecordingLevel.INFO);
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", group, "The average punctuate time in ms", tags()), new Avg());
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", group, "The maximum punctuate time in ms", tags()), new Max());
punctuateTimeSensor.add(metrics.metricName("punctuate-rate", group, "The average per-second number of punctuate calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
punctuateTimeSensor.add(metrics.metricName("punctuate-total", group, "The total number of punctuate calls", tags()), new Count());
taskCreatedSensor = threadLevelSensor("task-created", Sensor.RecordingLevel.INFO);
taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", tags()), new Total());
ownedSensors.push(taskCreatedSensor.name());
tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO);
tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total());
ownedSensors.push(tasksClosedSensor.name());
}
public void removeOwnedSensors() {
synchronized (ownedSensors) {
super.removeOwnedSensors();
while (!ownedSensors.isEmpty()) {
registry().removeSensor(ownedSensors.pop());
}
}
tasksClosedSensor = threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
tasksClosedSensor.add(metrics.metricName("task-closed-rate", group, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
tasksClosedSensor.add(metrics.metricName("task-closed-total", group, "The total number of closed tasks", tags()), new Total());
}
}
@@ -1165,7 +1145,7 @@ public class StreamThread extends Thread {
} catch (final Throwable e) {
log.error("Failed to close restore consumer due to the following error:", e);
}
streamsMetrics.removeOwnedSensors();
streamsMetrics.removeAllThreadLevelSensors();
setState(State.DEAD);
log.info("Shutdown complete");

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java (39 changed lines)

@@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.metrics;
import java.util.LinkedHashMap;
import java.util.Map;
public final class StreamsMetricsConventions {
private StreamsMetricsConventions() {
}
public static String threadLevelSensorName(final String threadName, final String sensorName) {
return "thread." + threadName + "." + sensorName;
}
static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
if (tags.containsKey("client-id")) {
return tags;
} else {
final LinkedHashMap<String, String> newTags = new LinkedHashMap<>(tags);
newTags.put("client-id", threadName);
return newTags;
}
}
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java (117 changed lines)

@@ -31,35 +31,125 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
public class StreamsMetricsImpl implements StreamsMetrics {
private final Metrics metrics;
private final Map<String, String> tags;
private final Map<Sensor, Sensor> parentSensors;
private final Deque<String> ownedSensors = new LinkedList<>();
private final Sensor skippedRecordsSensor;
private final String threadName;
private final Deque<String> threadLevelSensors = new LinkedList<>();
private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.threadName = threadName;
this.metrics = metrics;
this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.<String, String>emptyMap());
final HashMap<String, String> tags = new LinkedHashMap<>();
tags.put("client-id", threadName);
this.tags = Collections.unmodifiableMap(tags);
this.parentSensors = new HashMap<>();
skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO);
skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total());
ownedSensors.push(skippedRecordsSensor.name());
final String group = "stream-metrics";
skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", group, "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
skippedRecordsSensor.add(metrics.metricName("skipped-records-total", group, "The total number of skipped records", tags), new Total());
}
public final Sensor threadLevelSensor(final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
synchronized (threadLevelSensors) {
final String fullSensorName = threadName + "." + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
threadLevelSensors.push(fullSensorName);
return sensor;
}
}
public final void removeAllThreadLevelSensors() {
synchronized (threadLevelSensors) {
while (!threadLevelSensors.isEmpty()) {
metrics.removeSensor(threadLevelSensors.pop());
}
}
}
public final Sensor taskLevelSensor(final String taskName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = threadName + "." + taskName;
synchronized (taskLevelSensors) {
if (!taskLevelSensors.containsKey(key)) {
taskLevelSensors.put(key, new LinkedList<String>());
}
final String fullSensorName = key + "." + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
taskLevelSensors.get(key).push(fullSensorName);
return sensor;
}
}
public final Metrics registry() {
return metrics;
public final void removeAllTaskLevelSensors(final String taskName) {
final String key = threadName + "." + taskName;
synchronized (taskLevelSensors) {
if (taskLevelSensors.containsKey(key)) {
while (!taskLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(taskLevelSensors.get(key).pop());
}
taskLevelSensors.remove(key);
}
}
}
public final Sensor cacheLevelSensor(final String taskName,
final String cacheName,
final String sensorName,
final Sensor.RecordingLevel recordingLevel,
final Sensor... parents) {
final String key = threadName + "." + taskName + "." + cacheName;
synchronized (cacheLevelSensors) {
if (!cacheLevelSensors.containsKey(key)) {
cacheLevelSensors.put(key, new LinkedList<String>());
}
final String fullSensorName = key + "." + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
cacheLevelSensors.get(key).push(fullSensorName);
return sensor;
}
}
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
final String key = threadName + "." + taskName + "." + cacheName;
synchronized (cacheLevelSensors) {
if (cacheLevelSensors.containsKey(key)) {
while (!cacheLevelSensors.get(key).isEmpty()) {
metrics.removeSensor(cacheLevelSensors.get(key).pop());
}
cacheLevelSensors.remove(key);
}
}
}
protected final Map<String, String> tags() {
@@ -236,11 +326,4 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
}
public void removeOwnedSensors() {
synchronized (ownedSensors) {
while (!ownedSensors.isEmpty()) {
metrics.removeSensor(ownedSensors.pop());
}
}
}
}
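
Taken together, the new methods give each scope a simple lifecycle: components declare sensors through threadLevelSensor/taskLevelSensor/cacheLevelSensor from wherever is convenient, and the owning component bulk-removes them on close. A hedged usage sketch (parameter order as declared above; the thread and task names are illustrative):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

    public class SensorLifecycleSketch {
        public static void main(final String[] args) {
            final StreamsMetricsImpl streamsMetrics =
                    new StreamsMetricsImpl(new Metrics(), "stream-thread-1");
            // Declared from anywhere; tracked internally under the owner's key.
            final Sensor commit =
                    streamsMetrics.threadLevelSensor("commit-latency", Sensor.RecordingLevel.INFO);
            final Sensor taskCommit =
                    streamsMetrics.taskLevelSensor("0_1", "commit", Sensor.RecordingLevel.DEBUG, commit);
            // ... attach stats and record as usual ...
            streamsMetrics.removeAllTaskLevelSensors("0_1"); // when the task closes
            streamsMetrics.removeAllThreadLevelSensors();    // when the thread shuts down
        }
    }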

streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java (83 changed lines)

@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ class NamedCache {
private long numOverwrites = 0;
private long numFlushes = 0;
NamedCache(final String name, final StreamsMetrics metrics) {
NamedCache(final String name, final StreamsMetricsImpl metrics) {
this.name = name;
this.namedCacheMetrics = new NamedCacheMetrics(metrics, name);
}
@@ -355,45 +355,66 @@ class NamedCache {
private static class NamedCacheMetrics {
private final StreamsMetricsImpl metrics;
private final String groupName;
private final Map<String, String> metricTags;
private final Map<String, String> allMetricTags;
private final Sensor hitRatioSensor;
private final String taskName;
private final String cacheName;
private NamedCacheMetrics(final StreamsMetrics metrics, final String name) {
final String scope = "record-cache";
final String opName = "hitRatio";
final String tagKey = scope + "-id";
final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
this.groupName = "stream-" + scope + "-metrics";
this.metrics = (StreamsMetricsImpl) metrics;
this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all",
"task-id", ThreadCache.taskIDfromCacheName(name));
this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue,
"task-id", ThreadCache.taskIDfromCacheName(name));
private NamedCacheMetrics(final StreamsMetricsImpl metrics, final String cacheName) {
taskName = ThreadCache.taskIDfromCacheName(cacheName);
this.cacheName = cacheName;
this.metrics = metrics;
final String group = "stream-record-cache-metrics";
// add parent
final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
parent.add(this.metrics.registry().metricName(opName + "-avg", groupName,
"The average cache hit ratio.", allMetricTags), new Avg());
parent.add(this.metrics.registry().metricName(opName + "-min", groupName,
"The minimum cache hit ratio.", allMetricTags), new Min());
parent.add(this.metrics.registry().metricName(opName + "-max", groupName,
"The maximum cache hit ratio.", allMetricTags), new Max());
final Map<String, String> allMetricTags = metrics.tagMap(
"record-cache-id", "all",
"task-id", taskName
);
final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
new Avg()
);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", allMetricTags),
new Min()
);
taskLevelHitRatioSensor.add(
new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", allMetricTags),
new Max()
);
// add child
hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
"The average cache hit ratio.", metricTags), new Avg());
hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
"The minimum cache hit ratio.", metricTags), new Min());
hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
"The maximum cache hit ratio.", metricTags), new Max());
final Map<String, String> metricTags = metrics.tagMap(
"record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(cacheName),
"task-id", taskName
);
hitRatioSensor = metrics.cacheLevelSensor(
taskName,
cacheName,
"hitRatio",
Sensor.RecordingLevel.DEBUG,
taskLevelHitRatioSensor
);
hitRatioSensor.add(
new MetricName("hitRatio-avg", group, "The average cache hit ratio.", metricTags),
new Avg()
);
hitRatioSensor.add(
new MetricName("hitRatio-min", group, "The minimum cache hit ratio.", metricTags),
new Min()
);
hitRatioSensor.add(
new MetricName("hitRatio-max", group, "The maximum cache hit ratio.", metricTags),
new Max()
);
}
private void removeAllSensors() {
metrics.removeSensor(hitRatioSensor);
metrics.removeAllCacheLevelSensors(taskName, cacheName);
}
}
}
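
The practical consequence of the change above: closing one cache now removes only that cache's sensors, while the shared rollup parent stays registered for the task's remaining caches and is unloaded later by its own scope. A hedged sketch of the close sequence, using the removal methods from the diff with illustrative names:

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

    public class CacheCloseSketch {
        public static void main(final String[] args) {
            final StreamsMetricsImpl streamsMetrics =
                    new StreamsMetricsImpl(new Metrics(), "stream-thread-1");
            // Closing one cache drops only its own hitRatio child sensor.
            streamsMetrics.removeAllCacheLevelSensors("0_1", "0_1-storeA");
            // The shared parent disappears only when its owning scope closes.
            streamsMetrics.removeAllTaskLevelSensors("0_1");
        }
    }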

streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java (10 changed lines)

@@ -19,8 +19,8 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import java.util.Collections;
@@ -39,7 +39,7 @@ import java.util.NoSuchElementException;
public class ThreadCache {
private final Logger log;
private final long maxCacheSizeBytes;
private final StreamsMetrics metrics;
private final StreamsMetricsImpl metrics;
private final Map<String, NamedCache> caches = new HashMap<>();
// internal stats
@@ -52,7 +52,7 @@ public class ThreadCache {
void apply(final List<DirtyEntry> dirty);
}
public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetrics metrics) {
public ThreadCache(final LogContext logContext, long maxCacheSizeBytes, final StreamsMetricsImpl metrics) {
this.maxCacheSizeBytes = maxCacheSizeBytes;
this.metrics = metrics;
this.log = logContext.logger(getClass());
@@ -91,7 +91,7 @@ public class ThreadCache {
* @return
*/
public static String taskIDfromCacheName(final String cacheName) {
String[] tokens = cacheName.split("-");
String[] tokens = cacheName.split("-", 2);
return tokens[0];
}
@@ -101,7 +101,7 @@ public class ThreadCache {
* @return
*/
public static String underlyingStoreNamefromCacheName(final String cacheName) {
String[] tokens = cacheName.split("-");
String[] tokens = cacheName.split("-", 2);
return tokens[1];
}
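
The limit argument to split is the substance of the tag fix: task IDs never contain a dash, but store names can, so splitting on every dash truncated the store name used for the record-cache-id tag. A quick illustration with a hypothetical cache name:

    public class CacheNameSplitSketch {
        public static void main(final String[] args) {
            final String cacheName = "0_1-my-store"; // taskId "0_1", store "my-store"
            System.out.println(cacheName.split("-")[1]);    // "my"       (wrong tag)
            System.out.println(cacheName.split("-", 2)[1]); // "my-store" (correct)
        }
    }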

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java (1 changed line)

@@ -121,7 +121,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
@After
public void closeStore() {
context.close();
sessionStore.close();
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java (1 changed line)

@@ -148,7 +148,6 @@ public class ProcessorNodeTest {
"The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
context.close();
}
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java (6 changed lines)

@@ -26,7 +26,6 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,11 +54,6 @@ public class SinkNodeTest {
sink.init(context);
}
@After
public void after() {
context.close();
}
@Test
@SuppressWarnings("unchecked")
public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java (1 changed line)

@@ -232,7 +232,6 @@ public class StreamTaskTest {
public void testMetrics() {
task = createStatelessTask(createConfig(false));
assertNotNull(metrics.getSensor("commit"));
assertNotNull(getMetric("%s-latency-avg", "The average latency of %s operation.", task.id().toString()));
assertNotNull(getMetric("%s-latency-max", "The max latency of %s operation.", task.id().toString()));
assertNotNull(getMetric("%s-rate", "The average number of occurrence of %s operation per second.", task.id().toString()));

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java (9 changed lines)

@@ -243,17 +243,8 @@ public class StreamThreadTest {
public void testMetricsCreatedAtStartup() {
final StreamThread thread = createStreamThread(clientId, config, false);
final String defaultGroupName = "stream-metrics";
final String defaultPrefix = "thread." + thread.getName();
final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName());
assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency"));
assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency"));
assertNotNull(metrics.getSensor(defaultPrefix + ".process-latency"));
assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-latency"));
assertNotNull(metrics.getSensor(defaultPrefix + ".task-created"));
assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed"));
assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));

streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java (1 changed line)

@@ -63,7 +63,6 @@ public abstract class AbstractKeyValueStoreTest {
@After
public void after() {
store.close();
context.close();
driver.clear();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java (1 changed line)

@@ -82,7 +82,6 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
@After
public void after() {
super.after();
context.close();
}
@SuppressWarnings("unchecked")

streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java (1 changed line)

@@ -82,7 +82,6 @@ public class CachingSessionStoreTest {
@After
public void close() {
context.close();
cachingStore.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java (1 changed line)

@@ -87,7 +87,6 @@ public class CachingWindowStoreTest {
@After
public void closeStore() {
context.close();
cachingStore.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java (1 changed line)

@@ -76,7 +76,6 @@ public class ChangeLoggingKeyValueBytesStoreTest {
@After
public void after() {
context.close();
store.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java (6 changed lines)

@@ -35,7 +35,6 @@ import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -83,11 +82,6 @@ public class MeteredWindowStoreTest {
);
}
@After
public void after() {
context.close();
}
@Test
public void shouldRecordRestoreLatencyOnInit() {
innerStoreMock.init(context, store);

streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java (25 changed lines)

@@ -16,12 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.junit.Before;
import org.junit.Test;
@@ -44,13 +43,14 @@ import static org.junit.Assert.assertSame;
public class NamedCacheTest {
private NamedCache cache;
private MockStreamsMetrics streamMetrics;
private StreamsMetricsImpl metrics;
private final String taskIDString = "0.0";
private final String underlyingStoreName = "storeName";
@Before
public void setUp() {
streamMetrics = new MockStreamsMetrics(new Metrics());
cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics);
metrics = new MockStreamsMetrics(new Metrics());
cache = new NamedCache(taskIDString + "-" + underlyingStoreName, metrics);
}
@Test
@@ -83,18 +83,15 @@ public class NamedCacheTest {
metricTags.put("task-id", taskIDString);
metricTags.put("client-id", "test");
assertNotNull(streamMetrics.registry().getSensor("hitRatio"));
final Map<MetricName, KafkaMetric> metrics1 = streamMetrics.registry().metrics();
getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
// test "all"
metricTags.put("record-cache-id", "all");
final Map<MetricName, KafkaMetric> metrics = streamMetrics.registry().metrics();
getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-avg", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-min", "stream-record-cache-metrics", metricTags);
getMetricByNameFilterByTags(metrics.metrics(), "hitRatio-max", "stream-record-cache-metrics", metricTags);
}
@Test

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java (1 changed line)

@@ -53,7 +53,6 @@ public class RocksDBKeyValueStoreSupplierTest {
@After
public void close() {
context.close();
store.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java (1 changed line)

@@ -82,7 +82,6 @@ public class RocksDBSegmentedBytesStoreTest {
@After
public void close() {
context.close();
bytesStore.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java (1 changed line)

@@ -68,7 +68,6 @@ public class RocksDBSessionStoreSupplierTest {
@After
public void close() {
context.close();
store.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java (1 changed line)

@@ -69,7 +69,6 @@ public class RocksDBSessionStoreTest {
@After
public void close() {
context.close();
sessionStore.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (1 changed line)

@@ -79,7 +79,6 @@ public class RocksDBStoreTest {
@After
public void tearDown() {
rocksDBStore.close();
context.close();
}
@Test

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java (1 changed line)

@@ -53,7 +53,6 @@ public class RocksDBWindowStoreSupplierTest {
@After
public void close() {
context.close();
if (store != null) {
store.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java (1 changed line)

@@ -126,7 +126,6 @@ public class RocksDBWindowStoreTest {
@After
public void closeStore() {
context.close();
if (windowStore != null) {
windowStore.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java (1 changed line)

@@ -76,7 +76,6 @@ public class SegmentIteratorTest {
}
segmentOne.close();
segmentTwo.close();
context.close();
}
@Test

streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java (1 changed line)

@@ -65,7 +65,6 @@ public class SegmentsTest {
@After
public void close() {
context.close();
segments.close();
}

streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java (6 changed lines)

@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.After;
import org.junit.Test;
import java.util.HashMap;
@@ -68,11 +67,6 @@ public class StoreChangeLoggerTest {
private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
@After
public void after() {
context.close();
}
@Test
public void testAddRemove() {
context.setTime(1);

streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java (6 changed lines)

@@ -51,7 +51,6 @@ import java.util.Map;
public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
private final File stateDir;
private final Metrics metrics;
private final RecordCollector.Supplier recordCollectorSupplier;
private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
@@ -135,7 +134,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
this.stateDir = stateDir;
this.keySerde = keySerde;
this.valSerde = valSerde;
this.metrics = metrics.registry();
this.recordCollectorSupplier = collectorSupplier;
}
@@ -306,10 +304,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
restoreListener.onRestoreEnd(null, storeName, 0L);
}
public void close() {
metrics.close();
}
private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) {
if (restoreCallback instanceof StateRestoreListener) {
return (StateRestoreListener) restoreCallback;

streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java (8 changed lines)

@@ -147,7 +147,12 @@ public class KStreamTestDriver extends ExternalResource {
private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
for (final StateStore store : stores) {
store.init(context, store);
try {
store.init(context, store);
} catch (final RuntimeException e) {
new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
throw e;
}
}
for (final ProcessorNode node : topology.processors()) {
@@ -230,7 +235,6 @@ public class KStreamTestDriver extends ExternalResource {
}
closeState();
context.close();
}
public Set<String> allProcessorNames() {
