Browse Source

KAFKA-9557: correct thread process-rate sensor to measure throughput (#8112)

Correct the thread-level process-rate and process-total sensors so that they report record throughput and the total number of records processed, respectively.

Reviewers: Guozhang Wang <guozhang@confluent.io>
pull/8120/head
John Roesler 5 years ago committed by GitHub
parent
commit
8d0b069b0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
  2. 44
      streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
  3. 77
      streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java

16
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

@ -502,7 +502,8 @@ public class StreamThread extends Thread {
private final Sensor commitSensor; private final Sensor commitSensor;
private final Sensor pollSensor; private final Sensor pollSensor;
private final Sensor punctuateSensor; private final Sensor punctuateSensor;
private final Sensor processSensor; private final Sensor processLatencySensor;
private final Sensor processRateSensor;
private long now; private long now;
private long lastPollMs; private long lastPollMs;
@ -637,7 +638,8 @@ public class StreamThread extends Thread {
this.streamsMetrics = streamsMetrics; this.streamsMetrics = streamsMetrics;
this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics); this.commitSensor = ThreadMetrics.commitSensor(threadId, streamsMetrics);
this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics); this.pollSensor = ThreadMetrics.pollSensor(threadId, streamsMetrics);
this.processSensor = ThreadMetrics.processSensor(threadId, streamsMetrics); this.processLatencySensor = ThreadMetrics.processLatencySensor(threadId, streamsMetrics);
this.processRateSensor = ThreadMetrics.processRateSensor(threadId, streamsMetrics);
this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics); this.punctuateSensor = ThreadMetrics.punctuateSensor(threadId, streamsMetrics);
// The following sensors are created here but their references are not stored in this object, since within // The following sensors are created here but their references are not stored in this object, since within
@ -851,11 +853,19 @@ public class StreamThread extends Thread {
do { do {
for (int i = 0; i < numIterations; i++) { for (int i = 0; i < numIterations; i++) {
advanceNowAndComputeLatency();
processed = taskManager.process(now); processed = taskManager.process(now);
if (processed > 0) { if (processed > 0) {
// It makes no difference to the outcome of these metrics when we record "0",
// so we can just avoid the method call when we didn't process anything.
processRateSensor.record(processed, now);
// This metric is scaled to represent the _average_ processing time of _each_
// task. Note: as defined, this is hard to interpret, but changing it to simply
// report the overall time spent processing all tasks would require a KIP.
final long processLatency = advanceNowAndComputeLatency(); final long processLatency = advanceNowAndComputeLatency();
processSensor.record(processLatency / (double) processed, now); processLatencySensor.record(processLatency / (double) processed, now);
// commit any tasks that have requested a commit // commit any tasks that have requested a commit
final int committed = taskManager.maybeCommitActiveTasksPerUserRequested(); final int committed = taskManager.maybeCommitActiveTasksPerUserRequested();

44
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java

@ -22,15 +22,17 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.V
import java.util.Map; import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP_0100_TO_24; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP_0100_TO_24;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
public class ThreadMetrics { public class ThreadMetrics {
private ThreadMetrics() {} private ThreadMetrics() {}
@ -146,18 +148,40 @@ public class ThreadMetrics {
); );
} }
public static Sensor processSensor(final String threadId, public static Sensor processLatencySensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) { final StreamsMetricsImpl streamsMetrics) {
return invocationRateAndCountAndAvgAndMaxLatencySensor( final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
threadId, PROCESS + LATENCY_SUFFIX,
RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addAvgAndMaxToSensor(
sensor,
threadLevelGroup,
tagMap,
PROCESS + LATENCY_SUFFIX,
PROCESS_AVG_LATENCY_DESCRIPTION,
PROCESS_MAX_LATENCY_DESCRIPTION
);
return sensor;
}
public static Sensor processRateSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId,
PROCESS + RATE_SUFFIX,
RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addRateOfSumAndSumMetricsToSensor(
sensor,
threadLevelGroup,
tagMap,
PROCESS, PROCESS,
PROCESS_RATE_DESCRIPTION, PROCESS_RATE_DESCRIPTION,
PROCESS_TOTAL_DESCRIPTION, PROCESS_TOTAL_DESCRIPTION
PROCESS_AVG_LATENCY_DESCRIPTION,
PROCESS_MAX_LATENCY_DESCRIPTION,
Sensor.RecordingLevel.INFO,
streamsMetrics
); );
return sensor;
} }
public static Sensor punctuateSensor(final String threadId, public static Sensor punctuateSensor(final String threadId,

77
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.streams.processor.internals.metrics; package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@ -35,6 +36,8 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -75,7 +78,6 @@ public class ThreadMetricsTest {
@Before @Before
public void setUp() { public void setUp() {
expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes(); expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes();
mockStatic(StreamsMetricsImpl.class);
} }
@Test @Test
@ -85,6 +87,7 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of newly created tasks"; final String rateDescription = "The average per-second number of newly created tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -108,6 +111,7 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of closed tasks"; final String rateDescription = "The average per-second number of closed tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -134,6 +138,7 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum commit latency"; final String maxLatencyDescription = "The maximum commit latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -167,6 +172,7 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum poll latency"; final String maxLatencyDescription = "The maximum poll latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -192,34 +198,52 @@ public class ThreadMetricsTest {
} }
@Test @Test
public void shouldGetProcessSensor() { public void shouldGetProcessLatencySensor() {
final String operation = "process"; expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO))
final String operationLatency = operation + StreamsMetricsImpl.LATENCY_SUFFIX; .andReturn(expectedSensor);
final String totalDescription = "The total number of calls to process";
final String rateDescription = "The average per-second number of calls to process";
final String avgLatencyDescription = "The average process latency";
final String maxLatencyDescription = "The maximum process latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( expect(expectedSensor.add(eq(new MetricName(
expectedSensor, "process-latency-avg",
threadLevelGroup, threadLevelGroup,
tagMap, "The average execution time in ms for processing, across all running tasks of this thread.",
operation, tagMap
rateDescription, )), anyObject())).andReturn(true);
totalDescription
); expect(expectedSensor.add(eq(new MetricName(
StreamsMetricsImpl.addAvgAndMaxToSensor( "process-latency-max",
expectedSensor,
threadLevelGroup, threadLevelGroup,
tagMap, "The maximum execution time in ms for processing across all running tasks of this thread.",
operationLatency, tagMap
avgLatencyDescription, )), anyObject())).andReturn(true);
maxLatencyDescription replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);
);
replay(StreamsMetricsImpl.class, streamsMetrics);
final Sensor sensor = ThreadMetrics.processSensor(THREAD_ID, streamsMetrics); final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
verify(StreamsMetricsImpl.class, streamsMetrics);
assertThat(sensor, is(expectedSensor));
}
@Test
public void shouldGetProcessRateSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO))
.andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName(
"process-rate",
threadLevelGroup,
"The average per-second number of calls to process",
tagMap
)), anyObject())).andReturn(true);
expect(expectedSensor.add(eq(new MetricName(
"process-total",
threadLevelGroup,
"The total number of calls to process",
tagMap
)), anyObject())).andReturn(true);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);
final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
verify(StreamsMetricsImpl.class, streamsMetrics); verify(StreamsMetricsImpl.class, streamsMetrics);
assertThat(sensor, is(expectedSensor)); assertThat(sensor, is(expectedSensor));
@ -235,6 +259,7 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum punctuate latency"; final String maxLatencyDescription = "The maximum punctuate latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -266,6 +291,7 @@ public class ThreadMetricsTest {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)) expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO))
.andReturn(expectedSensor); .andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -296,6 +322,7 @@ public class ThreadMetricsTest {
"The maximum commit latency over all tasks assigned to one stream thread"; "The maximum commit latency over all tasks assigned to one stream thread";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor);
expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap); expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
TASK_LEVEL_GROUP, TASK_LEVEL_GROUP,

Loading…
Cancel
Save