Browse Source

KAFKA-6069: Properly tag KafkaStreams metrics with the client id.

Author: Tommy Becker <tobecker@tivo.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4081 from twbecker/KAFKA-6069
pull/4081/merge
Tommy Becker 7 years ago committed by Damian Guy
parent
commit
249e398bf8
  1. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
  2. 44
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java

@ -111,7 +111,8 @@ public class StreamsKafkaClient { @@ -111,7 +111,8 @@ public class StreamsKafkaClient {
final Time time = new SystemTime();
final Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", StreamsConfig.CLIENT_ID_CONFIG);
final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
metricTags.put("client-id", clientId);
final Metadata metadata = new Metadata(streamsConfig.getLong(
StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
@ -129,7 +130,6 @@ public class StreamsKafkaClient { @@ -129,7 +130,6 @@ public class StreamsKafkaClient {
final Metrics metrics = new Metrics(metricConfig, reporters, time);
final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig);
final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
final LogContext logContext = createLogContext(clientId);
final Selector selector = new Selector(

44
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java

@ -17,11 +17,13 @@ @@ -17,11 +17,13 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@ -46,6 +48,7 @@ import java.util.Map; @@ -46,6 +48,7 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
public class StreamsKafkaClientTest {
@ -130,6 +133,17 @@ public class StreamsKafkaClientTest { @@ -130,6 +133,17 @@ public class StreamsKafkaClientTest {
verifyCorrectTopicConfigs(streamsKafkaClient, topicConfigWithNoOverrides, Collections.singletonMap("cleanup.policy", "delete"));
}
@Test
public void metricsShouldBeTaggedWithClientId() {
config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id");
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, TestMetricsReporter.class.getName());
StreamsKafkaClient.create(new StreamsConfig(config));
assertFalse(TestMetricsReporter.METRICS.isEmpty());
for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) {
assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id"));
}
}
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() {
kafkaClient.prepareResponse(null);
@ -203,4 +217,34 @@ public class StreamsKafkaClientTest { @@ -203,4 +217,34 @@ public class StreamsKafkaClientTest {
kafkaClient,
reporters);
}
public static class TestMetricsReporter implements MetricsReporter {
static final Map<MetricName, KafkaMetric> METRICS = new HashMap<>();
@Override
public void configure(final Map<String, ?> configs) { }
@Override
public void init(final List<KafkaMetric> metrics) {
for (final KafkaMetric metric : metrics) {
metricChange(metric);
}
}
@Override
public void metricChange(final KafkaMetric metric) {
METRICS.put(metric.metricName(), metric);
}
@Override
public void metricRemoval(final KafkaMetric metric) {
METRICS.remove(metric.metricName());
}
@Override
public void close() {
METRICS.clear();
}
}
}

Loading…
Cancel
Save