From 8d0b069b0fbda0826ad447accbf93a19f6e813fe Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 14 Feb 2020 15:54:39 -0600 Subject: [PATCH] KAFKA-9557: correct thread process-rate sensor to measure throughput (#8112) Correct the process-rate (and total) sensor to measure throughput (and total record processing count). Reviewers: Guozhang Wang --- .../processor/internals/StreamThread.java | 16 +++- .../internals/metrics/ThreadMetrics.java | 44 ++++++++--- .../internals/metrics/ThreadMetricsTest.java | 77 +++++++++++++------ 3 files changed, 99 insertions(+), 38 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7efbbc759ba..94e270b5aa1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -502,7 +502,8 @@ public class StreamThread extends Thread { private final Sensor commitSensor; private final Sensor pollSensor; private final Sensor punctuateSensor; - private final Sensor processSensor; + private final Sensor processLatencySensor; + private final Sensor processRateSensor; private long now; private long lastPollMs; @@ -637,7 +638,8 @@ public class StreamThread extends Thread { this.streamsMetrics = streamsMetrics; this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics); this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics); - this.processSensor = ThreadMetrics.processSensor(threadId, streamsMetrics); + this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics); + this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics); this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics); // The following sensors are created here but their references are not stored in this object, since within @@ -851,11 +853,19 @@ public class StreamThread extends Thread { do { for (int i = 0; i < numIterations; i++) { + advanceNowAndComputeLatency(); processed = taskManager.process(now); if (processed > 0) { + // It makes no difference to the outcome of these metrics when we record "0", + // so we can just avoid the method call when we didn't process anything. + processRateSensor.record(processed, now); + + // This metric is scaled to represent the _average_ processing time of _each_ + // task. Note, it's hard to interpret this as defined, but we would need a KIP + // to change it to simply report the overall time spent processing all tasks. final long processLatency = advanceNowAndComputeLatency(); - processSensor.record(processLatency / (double) processed, now); + processLatencySensor.record(processLatency / (double) processed, now); // commit any tasks that have requested a commit final int committed = taskManager.maybeCommitActiveTasksPerUserRequested(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index 393d5e73386..be813bed2de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -22,15 +22,17 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.V import java.util.Map; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP_0100_TO_24; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor; public class ThreadMetrics { private ThreadMetrics() {} @@ -146,18 +148,40 @@ public class ThreadMetrics { ); } - public static Sensor processSensor(final String threadId, - final StreamsMetricsImpl streamsMetrics) { - return invocationRateAndCountAndAvgAndMaxLatencySensor( - threadId, + public static Sensor processLatencySensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, + PROCESS + LATENCY_SUFFIX, + RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); + final String threadLevelGroup = threadLevelGroup(streamsMetrics); + addAvgAndMaxToSensor( + sensor, + threadLevelGroup, + tagMap, + PROCESS + LATENCY_SUFFIX, + PROCESS_AVG_LATENCY_DESCRIPTION, + PROCESS_MAX_LATENCY_DESCRIPTION + ); + return sensor; + } + + public static Sensor processRateSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, + PROCESS + RATE_SUFFIX, + RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); + final String threadLevelGroup = threadLevelGroup(streamsMetrics); + addRateOfSumAndSumMetricsToSensor( + sensor, + threadLevelGroup, + tagMap, PROCESS, PROCESS_RATE_DESCRIPTION, - PROCESS_TOTAL_DESCRIPTION, - PROCESS_AVG_LATENCY_DESCRIPTION, - PROCESS_MAX_LATENCY_DESCRIPTION, - Sensor.RecordingLevel.INFO, - streamsMetrics + PROCESS_TOTAL_DESCRIPTION ); + return sensor; } public static Sensor punctuateSensor(final String threadId, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index 3f9678da45c..41796522633 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals.metrics; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; @@ -35,6 +36,8 @@ import java.util.Collections; import java.util.Map; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.mock; import static org.hamcrest.CoreMatchers.is; @@ -75,7 +78,6 @@ public class ThreadMetricsTest { @Before public void setUp() { expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes(); - mockStatic(StreamsMetricsImpl.class); } @Test @@ -85,6 +87,7 @@ public class ThreadMetricsTest { final String rateDescription = "The average per-second number of newly created tasks"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -108,6 +111,7 @@ public class ThreadMetricsTest { final String rateDescription = "The average per-second number of closed tasks"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -134,6 +138,7 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum commit latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -167,6 +172,7 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum poll latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -192,34 +198,52 @@ public class ThreadMetricsTest { } @Test - public void shouldGetProcessSensor() { - final String operation = "process"; - final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; - final String totalDescription = "The total number of calls to process"; - final String rateDescription = "The average per-second number of calls to process"; - final String avgLatencyDescription = "The average process latency"; - final String maxLatencyDescription = "The maximum process latency"; - expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + public void shouldGetProcessLatencySensor() { + expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO)) + .andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); - StreamsMetricsImpl.addInvocationRateAndCountToSensor( - expectedSensor, + expect(expectedSensor.add(eq(new MetricName( + "process-latency-avg", threadLevelGroup, - tagMap, - operation, - rateDescription, - totalDescription - ); - StreamsMetricsImpl.addAvgAndMaxToSensor( - expectedSensor, + "The average execution time in ms for processing, across all running tasks of this thread.", + tagMap + )), anyObject())).andReturn(true); + + expect(expectedSensor.add(eq(new MetricName( + "process-latency-max", threadLevelGroup, - tagMap, - operationLatency, - avgLatencyDescription, - maxLatencyDescription - ); - replay(StreamsMetricsImpl.class, streamsMetrics); + "The maximum execution time in ms for processing across all running tasks of this thread.", + tagMap + )), anyObject())).andReturn(true); + replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); - final Sensor sensor = ThreadMetrics.processSensor(THREAD_ID, streamsMetrics); + final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + + @Test + public void shouldGetProcessRateSensor() { + expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO)) + .andReturn(expectedSensor); + expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + expect(expectedSensor.add(eq(new MetricName( + "process-rate", + threadLevelGroup, + "The average per-second number of calls to process", + tagMap + )), anyObject())).andReturn(true); + + expect(expectedSensor.add(eq(new MetricName( + "process-total", + threadLevelGroup, + "The total number of calls to process", + tagMap + )), anyObject())).andReturn(true); + replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); + + final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); verify(StreamsMetricsImpl.class, streamsMetrics); assertThat(sensor, is(expectedSensor)); @@ -235,6 +259,7 @@ public class ThreadMetricsTest { final String maxLatencyDescription = "The maximum punctuate latency"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -266,6 +291,7 @@ public class ThreadMetricsTest { expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)) .andReturn(expectedSensor); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, threadLevelGroup, @@ -296,6 +322,7 @@ public class ThreadMetricsTest { "The maximum commit latency over all tasks assigned to one stream thread"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor); expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap); + mockStatic(StreamsMetricsImpl.class); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, TASK_LEVEL_GROUP,