From 39d5534f8d1b6113d23e9cd38f40ab5495ce3286 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 27 Feb 2020 00:33:18 +0100 Subject: [PATCH] MINOR: Remove tag from metric to measure process-rate on source nodes (#8175) Reviewers: Guozhang Wang --- .../processor/internals/metrics/ProcessorNodeMetrics.java | 2 +- .../kafka/streams/processor/internals/SourceNodeTest.java | 2 +- .../processor/internals/metrics/ProcessorNodeMetricsTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java index a1c2d042964..b495f7faad3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java @@ -137,7 +137,7 @@ public class ProcessorNodeMetrics { addInvocationRateAndCountToSensor( parentSensor, TASK_LEVEL_GROUP, - streamsMetrics.nodeLevelTagMap(threadId, taskId, ROLLUP_VALUE), + streamsMetrics.taskLevelTagMap(threadId, taskId), PROCESS, PROCESS_RATE_DESCRIPTION, PROCESS_TOTAL_DESCRIPTION diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 97dec8f4311..32ba4fbac6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -112,8 +112,8 @@ public class SourceNodeTest { assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", groupName, metricTags)); // test parent sensors - metricTags.put("processor-node-id", StreamsMetricsImpl.ROLLUP_VALUE); final String parentGroupName = "stream-task-metrics"; + metricTags.remove("processor-node-id"); assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", parentGroupName, metricTags)); assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", parentGroupName, metricTags)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java index 6926ec43683..9778db8334f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java @@ -121,7 +121,7 @@ public class ProcessorNodeMetricsTest { final String descriptionOfRate = "The average number of calls to process per second"; expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, metricNamePrefix, RecordingLevel.DEBUG)) .andReturn(expectedParentSensor); - expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, StreamsMetricsImpl.ROLLUP_VALUE)) + expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)) .andReturn(parentTagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedParentSensor,