Browse Source

KAFKA-9480: Fix bug that prevented to measure task-level process-rate (#8018)

Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
pull/8093/head
Bruno Cadonna 5 years ago committed by GitHub
parent
commit
3dfc6c15e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
  2. 35
      streams/src/test/java/org/apache/kafka/common/metrics/SensorAccessor.java
  3. 13
      streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java

9
streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java

@ -66,14 +66,19 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { @@ -66,14 +66,19 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
@SuppressWarnings("unchecked")
@Override
public void init(final InternalProcessorContext context) {
super.init(context);
this.context = context;
// It is important to first create the sensor before calling init on the
// parent object. Otherwise due to backwards compatibility an empty sensor
// without parent is created with the same name.
// Once the backwards compatibility is not needed anymore it might be possible to
// change this.
processAtSourceSensor = ProcessorNodeMetrics.processorAtSourceSensorOrForwardSensor(
Thread.currentThread().getName(),
context.taskId().toString(),
context.currentNode().name(),
context.metrics()
);
super.init(context);
this.context = context;
// if deserializers are null, get the default ones from the context
if (this.keyDeserializer == null) {

35
streams/src/test/java/org/apache/kafka/common/metrics/SensorAccessor.java

@ -0,0 +1,35 @@ @@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;
import java.util.List;
/**
* This class allows unit tests to access package-private members in class {@link Sensor}.
*/
public class SensorAccessor {
public final Sensor sensor;
public SensorAccessor(final Sensor sensor) {
this.sensor = sensor;
}
public List<Sensor> parents() {
return sensor.parents();
}
}

13
streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java

@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals; @@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.SensorAccessor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -30,11 +32,13 @@ import org.junit.Test; @@ -30,11 +32,13 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertTrue;
public class SourceNodeTest {
@ -112,6 +116,15 @@ public class SourceNodeTest { @@ -112,6 +116,15 @@ public class SourceNodeTest {
final String parentGroupName = "stream-task-metrics";
assertTrue(StreamsTestUtils.containsMetric(metrics, "process-rate", parentGroupName, metricTags));
assertTrue(StreamsTestUtils.containsMetric(metrics, "process-total", parentGroupName, metricTags));
final String sensorNamePrefix = "internal." + threadId + ".task." + context.taskId().toString();
final Sensor processSensor =
metrics.getSensor(sensorNamePrefix + ".node." + context.currentNode().name() + ".s.process");
final SensorAccessor sensorAccessor = new SensorAccessor(processSensor);
assertThat(
sensorAccessor.parents().stream().map(Sensor::name).collect(Collectors.toList()),
contains(sensorNamePrefix + ".s.process")
);
}
}
}

Loading…
Cancel
Save