From 83823aedf2e2d176004402152b69bd67f97d8e12 Mon Sep 17 00:00:00 2001
From: cadonna
Date: Wed, 8 May 2019 17:26:25 +0200
Subject: [PATCH] BUGFIX: Add missing recording of close of stand-by task (#6663)

Adds recording of the close of a stand-by task to the task-closed metric.
Adds unit tests to verify the recording.

Reviewers: Guozhang Wang, John Roesler
---
 .../processor/internals/StandbyTask.java     |   4 +
 .../processor/internals/StandbyTaskTest.java | 175 ++++++++++++++++--
 2 files changed, 159 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 749b2ed6cbf..424ded4381c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
@@ -37,6 +38,7 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+    private final Sensor closeTaskSensor;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -59,6 +61,7 @@ public class StandbyTask extends AbstractTask {
                        final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
+        closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
         processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
@@ -132,6 +135,7 @@ public class StandbyTask extends AbstractTask {
 
     @Override
     public void close(final boolean clean, final boolean isZombie) {
+        closeTaskSensor.record();
         if (!taskInitialized) {
             return;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 6e7655a925f..8c8811d37d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -20,10 +20,14 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -45,6 +49,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -136,7 +141,10 @@ public class StandbyTaskTest {
     }
 
     private final MockConsumer<Integer, byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());
+    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(
+        new IntegerSerializer(),
+        new IntegerSerializer()
+    );
     private final StoreChangelogReader changelogReader = new StoreChangelogReader(
         restoreStateConsumer,
         Duration.ZERO,
@@ -147,6 +155,9 @@
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
+    private final String threadName = "threadName";
+    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), threadName);
+
     @Before
     public void setup() throws Exception {
         restoreStateConsumer.reset();
@@ -177,7 +188,14 @@
     @Test
     public void testStorePartitions() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
@@ -186,13 +204,31 @@
     @Test
     public void testUpdateNonInitializedStore() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         try {
             task.update(partition1,
-                singletonList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))
+                        singletonList(
+                            new ConsumerRecord<>(
+                                partition1.topic(),
+                                partition1.partition(),
+                                10,
+                                0L,
+                                TimestampType.CREATE_TIME,
+                                0L,
+                                0,
+                                0,
+                                recordKey,
+                                recordValue))
             );
             fail("expected an exception");
         } catch (final NullPointerException npe) {
@@ -204,15 +240,48 @@
     @Test
     public void testUpdate() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
-        task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        task = new StandbyTask(taskId,
+                               topicPartitions,
+                               topology,
+                               consumer,
+                               changelogReader,
+                               config,
+                               streamsMetrics,
+                               stateDirectory);
         task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
-        for (final ConsumerRecord<Integer, Integer> record : asList(
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, 100),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 3, 100))) {
+        for (final ConsumerRecord<Integer, Integer> record : asList(new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         10,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         1,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         20,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         2,
+                                                                                         100),
+                                                                    new ConsumerRecord<>(partition2.topic(),
+                                                                                         partition2.partition(),
+                                                                                         30,
+                                                                                         0L,
+                                                                                         TimestampType.CREATE_TIME,
+                                                                                         0L,
+                                                                                         0,
+                                                                                         0,
+                                                                                         3,
+                                                                                         100))) {
             restoreStateConsumer.bufferRecord(record);
         }
 
@@ -380,7 +449,8 @@
     }
 
     @SuppressWarnings("unchecked")
-    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName, final StandbyTask task) {
+    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(final String storeName,
+                                                                             final StandbyTask task) {
         final StandbyContextImpl context = (StandbyContextImpl) task.context();
 
         final List<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<>();
@@ -410,7 +480,7 @@
             consumer,
             changelogReader,
             createConfig(baseDir),
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -464,7 +534,9 @@
         assertEquals(emptyList(), remaining);
     }
 
-    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition, final long offset, final int key) {
+    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(final TopicPartition topicPartition,
+                                                              final long offset,
+                                                              final int key) {
         final IntegerSerializer integerSerializer = new IntegerSerializer();
         return new ConsumerRecord<>(
             topicPartition.topic(),
@@ -491,7 +563,8 @@
     @Test
     public void shouldInitializeWindowStoreWithoutException() throws IOException {
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
+        builder.stream(Collections.singleton("topic"),
+                       new ConsumedInternal<>()).groupByKey().windowedBy(TimeWindows.of(ofMillis(100))).count();
 
         initializeStandbyStores(builder);
     }
@@ -522,7 +595,8 @@
     public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -540,7 +614,7 @@
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         );
         task.initializeStateStores();
@@ -550,9 +624,11 @@
         final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1);
         task.update(
             globalTopicPartition,
-            singletonList(
-                new ConsumerRecord<>(globalTopicPartition.topic(), globalTopicPartition.partition(), 50L, serializedValue, serializedValue)
-            )
+            singletonList(new ConsumerRecord<>(globalTopicPartition.topic(),
+                                               globalTopicPartition.partition(),
+                                               50L,
+                                               serializedValue,
+                                               serializedValue))
         );
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
@@ -569,7 +645,8 @@
     public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
         consumer.assign(Collections.singletonList(globalTopicPartition));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
-        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(100L));
+        committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()),
+                             new OffsetAndMetadata(100L));
         consumer.commitSync(committedOffsets);
 
         restoreStateConsumer.updatePartitions(
@@ -586,7 +663,7 @@
             consumer,
             changelogReader,
             config,
-            null,
+            streamsMetrics,
             stateDirectory
         ) {
             @Override
@@ -610,4 +687,62 @@
         assertTrue(closedStateManager.get());
     }
 
+    private MetricName setupCloseTaskMetric() {
+        final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
+        final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
+        sensor.add(metricName, new Total());
+        return metricName;
+    }
+
+    private void verifyCloseTaskMetric(final double expected,
+                                       final StreamsMetricsImpl streamsMetrics,
+                                       final MetricName metricName) {
+        final KafkaMetric metric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
+        final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
+        assertThat(totalCloses, equalTo(expected));
+    }
+
+    @Test
+    public void shouldRecordTaskClosedMetricOnClose() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.close(clean, isZombie);
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
+
+    @Test
+    public void shouldRecordTaskClosedMetricOnCloseSuspended() throws IOException {
+        final MetricName metricName = setupCloseTaskMetric();
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            ktablePartitions,
+            ktableTopology,
+            consumer,
+            changelogReader,
+            createConfig(baseDir),
+            streamsMetrics,
+            stateDirectory
+        );
+
+        final boolean clean = true;
+        final boolean isZombie = false;
+        task.closeSuspended(clean, isZombie, new RuntimeException());
+
+        final double expectedCloseTaskMetric = 1.0;
+        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
+    }
 }
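
Note on the metric mechanics behind the fix, as a minimal sketch rather than anything in the patch itself: threadLevelSensor("task-closed", ...) returns a sensor scoped to the stream thread, and every Sensor.record() call feeds all stats registered on that sensor, so the single closeTaskSensor.record() line is enough for a Total stat to count closes. The call also sits before the early return for uninitialized tasks, so even a stand-by task that never initialized its stores is counted. The sketch below reproduces the record-then-read pattern of the new tests using only the public org.apache.kafka.common.metrics API; the class name and metric names are illustrative, not the ones Kafka Streams registers:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Total;

    public class TaskClosedMetricSketch {
        public static void main(final String[] args) {
            // Create a sensor and attach a cumulative Total stat to it.
            final Metrics metrics = new Metrics();
            final Sensor closeTaskSensor = metrics.sensor("task-closed", Sensor.RecordingLevel.INFO);
            final MetricName name =
                metrics.metricName("task-closed-total", "example-group", "total number of closed tasks");
            closeTaskSensor.add(name, new Total());

            // Each record() adds 1.0 to the Total; this mirrors the call the
            // fix places at the top of StandbyTask.close().
            closeTaskSensor.record();
            closeTaskSensor.record(); // e.g. a second task closing on the same thread

            // Read the stat back the same way verifyCloseTaskMetric() does.
            final KafkaMetric metric = metrics.metrics().get(name);
            final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
            System.out.println(totalCloses); // prints 2.0
        }
    }

The shouldRecordTaskClosedMetricOnCloseSuspended test exercises the same sensor because StandbyTask.closeSuspended() delegates to close() (that delegation is not visible in this diff), so both close paths record exactly once.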