Browse Source

MINOR: Fix sensor retrieval in stand0by task's constructor (#7632)

We should not use StreamsMetricsImpl. threadLevelSensor directly which would only retrieve the sensor but would not add any metrics to the sensor. Generally speaking we should always use the corresponding-level Metrics class (e.g. ThreadMetrics) to get the sensors which are populated with metrics.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Matthias J. Sax <matthias@confluent.io>
pull/7634/head
Bruno Cadonna 5 years ago committed by Guozhang Wang
parent
commit
96d95e947a
  1. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java

@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsMetrics; @@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import java.util.ArrayList;
import java.util.Collections;
@ -68,8 +69,7 @@ public class StandbyTask extends AbstractTask { @@ -68,8 +69,7 @@ public class StandbyTask extends AbstractTask {
final StateDirectory stateDirectory) {
super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
closeTaskSensor = metrics
.threadLevelSensor(Thread.currentThread().getName(), "task-closed", Sensor.RecordingLevel.INFO);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics);
processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
final Set<String> changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values());

Loading…
Cancel
Save