From df9ea618a397955806fc7f1fba7003492046c77a Mon Sep 17 00:00:00 2001 From: cadonna Date: Wed, 12 Jun 2019 23:10:39 +0200 Subject: [PATCH] KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922) - Timeout occurred due to initial slow rebalancing. - Added code to wait until `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics. - I removed all other wait conditions, because they are not needed if `KafkaStreams` instance is in the right state. Reviewers: Guozhang Wang --- .../integration/MetricsIntegrationTest.java | 456 ++++++++---------- 1 file changed, 205 insertions(+), 251 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index bc4d89535e0..a3c5b8753c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -50,6 +51,9 @@ import java.util.List; import java.util.Properties; import java.util.stream.Collectors; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + @SuppressWarnings("unchecked") @Category({IntegrationTest.class}) public class MetricsIntegrationTest { @@ -187,27 +191,35 @@ public class MetricsIntegrationTest { CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4); } - private void startApplication() { + private void startApplication() throws InterruptedException { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); kafkaStreams.start(); + final long timeout = 60000; + TestUtils.waitForCondition( + () -> kafkaStreams.state() == State.RUNNING, + timeout, + () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms"); } private void closeApplication() throws Exception { kafkaStreams.close(); kafkaStreams.cleanUp(); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + final long timeout = 60000; + TestUtils.waitForCondition( + () -> kafkaStreams.state() == State.NOT_RUNNING, + timeout, + () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms"); } - private void checkMetricDeregistration() throws InterruptedException { - TestUtils.waitForCondition(() -> { - final List listMetricAfterClosingApp = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList()); - return listMetricAfterClosingApp.size() == 0; - }, 10000, "de-registration of metrics"); + private void checkMetricDeregistration() { + final List listMetricAfterClosingApp = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList()); + assertThat(listMetricAfterClosingApp.size(), is(0)); } @Test public void testStreamMetric() throws Exception { - final StringBuilder errorMessage = new StringBuilder(); stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())); stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String())); builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled()) @@ -222,22 +234,13 @@ public class MetricsIntegrationTest { startApplication(); - // metric level : Thread - TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 10000, () -> "testThreadMetric -> " + errorMessage.toString()); - - // metric level : Task - TestUtils.waitForCondition(() -> testTaskMetric(errorMessage), 10000, () -> "testTaskMetric -> " + errorMessage.toString()); - - // metric level : Processor - TestUtils.waitForCondition(() -> testProcessorMetric(errorMessage), 10000, () -> "testProcessorMetric -> " + errorMessage.toString()); - - // metric level : Store (in-memory-state, in-memory-lru-state, rocksdb-state) - TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_STATE_METRICS + " -> " + errorMessage.toString()); - TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS + " -> " + errorMessage.toString()); - TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_ROCKSDB_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_ROCKSDB_STATE_METRICS + " -> " + errorMessage.toString()); - - //metric level : Cache - TestUtils.waitForCondition(() -> testCacheMetric(errorMessage), 10000, () -> "testCacheMetric -> " + errorMessage.toString()); + checkThreadLevelMetrics(); + checkTaskLevelMetrics(); + checkProcessorLevelMetrics(); + checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_STATE_METRICS); + checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS); + checkKeyValueStoreMetricsByType(STREAM_STORE_ROCKSDB_STATE_METRICS); + checkCacheMetrics(); closeApplication(); @@ -247,7 +250,6 @@ public class MetricsIntegrationTest { @Test public void testStreamMetricOfWindowStore() throws Exception { - final StringBuilder errorMessage = new StringBuilder(); stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())); final KGroupedStream groupedStream = stream2.groupByKey(); groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50))) @@ -257,8 +259,7 @@ public class MetricsIntegrationTest { startApplication(); - // metric level : Store (window) - TestUtils.waitForCondition(() -> testStoreMetricWindow(errorMessage), 10000, () -> "testStoreMetricWindow -> " + errorMessage.toString()); + checkWindowStoreMetrics(); closeApplication(); @@ -268,7 +269,6 @@ public class MetricsIntegrationTest { @Test public void testStreamMetricOfSessionStore() throws Exception { - final StringBuilder errorMessage = new StringBuilder(); stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())); final KGroupedStream groupedStream = stream2.groupByKey(); groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50))) @@ -278,8 +278,7 @@ public class MetricsIntegrationTest { startApplication(); - // metric level : Store (session) - TestUtils.waitForCondition(() -> testStoreMetricSession(errorMessage), 10000, () -> "testStoreMetricSession -> " + errorMessage.toString()); + checkSessionStoreMetrics(); closeApplication(); @@ -287,240 +286,195 @@ public class MetricsIntegrationTest { checkMetricDeregistration(); } - private boolean testThreadMetric(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricThread = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList()); - testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1); - testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1); - testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1); - testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1); - testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1); - testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1); - testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1); - testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1); - testMetricByName(listMetricThread, COMMIT_RATE, 1); - testMetricByName(listMetricThread, COMMIT_TOTAL, 1); - testMetricByName(listMetricThread, POLL_RATE, 1); - testMetricByName(listMetricThread, POLL_TOTAL, 1); - testMetricByName(listMetricThread, PROCESS_RATE, 1); - testMetricByName(listMetricThread, PROCESS_TOTAL, 1); - testMetricByName(listMetricThread, PUNCTUATE_RATE, 1); - testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1); - testMetricByName(listMetricThread, TASK_CREATED_RATE, 1); - testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1); - testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1); - testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1); - testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1); - testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkTaskLevelMetrics() { + final List listMetricTask = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList()); + testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5); + testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5); + testMetricByName(listMetricTask, COMMIT_RATE, 5); + testMetricByName(listMetricTask, COMMIT_TOTAL, 5); + testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4); + testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4); } - private boolean testTaskMetric(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricTask = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList()); - testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5); - testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5); - testMetricByName(listMetricTask, COMMIT_RATE, 5); - testMetricByName(listMetricTask, COMMIT_TOTAL, 5); - testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4); - testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkThreadLevelMetrics() { + final List listMetricThread = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList()); + testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1); + testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1); + testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1); + testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1); + testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1); + testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1); + testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1); + testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1); + testMetricByName(listMetricThread, COMMIT_RATE, 1); + testMetricByName(listMetricThread, COMMIT_TOTAL, 1); + testMetricByName(listMetricThread, POLL_RATE, 1); + testMetricByName(listMetricThread, POLL_TOTAL, 1); + testMetricByName(listMetricThread, PROCESS_RATE, 1); + testMetricByName(listMetricThread, PROCESS_TOTAL, 1); + testMetricByName(listMetricThread, PUNCTUATE_RATE, 1); + testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1); + testMetricByName(listMetricThread, TASK_CREATED_RATE, 1); + testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1); + testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1); + testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1); + testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1); + testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1); } - private boolean testProcessorMetric(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricProcessor = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList()); - testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18); - testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18); - testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18); - testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18); - testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18); - testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18); - testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18); - testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18); - testMetricByName(listMetricProcessor, PROCESS_RATE, 18); - testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18); - testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18); - testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18); - testMetricByName(listMetricProcessor, CREATE_RATE, 18); - testMetricByName(listMetricProcessor, CREATE_TOTAL, 18); - testMetricByName(listMetricProcessor, DESTROY_RATE, 18); - testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18); - testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkProcessorLevelMetrics() { + final List listMetricProcessor = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList()); + testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18); + testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18); + testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18); + testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18); + testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18); + testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18); + testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18); + testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18); + testMetricByName(listMetricProcessor, PROCESS_RATE, 18); + testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18); + testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18); + testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18); + testMetricByName(listMetricProcessor, CREATE_RATE, 18); + testMetricByName(listMetricProcessor, CREATE_TOTAL, 18); + testMetricByName(listMetricProcessor, DESTROY_RATE, 18); + testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18); + testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18); } - private boolean testStoreMetricWindow(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() - .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS)) - .collect(Collectors.toList()); - testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); - testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0); - testMetricByName(listMetricStore, GET_LATENCY_AVG, 0); - testMetricByName(listMetricStore, GET_LATENCY_MAX, 0); - testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0); - testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0); - testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0); - testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0); - testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0); - testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0); - testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); - testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_RATE, 2); - testMetricByName(listMetricStore, PUT_TOTAL, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0); - testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0); - testMetricByName(listMetricStore, GET_RATE, 0); - testMetricByName(listMetricStore, DELETE_RATE, 0); - testMetricByName(listMetricStore, DELETE_TOTAL, 0); - testMetricByName(listMetricStore, PUT_ALL_RATE, 0); - testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0); - testMetricByName(listMetricStore, ALL_RATE, 0); - testMetricByName(listMetricStore, ALL_TOTAL, 0); - testMetricByName(listMetricStore, RANGE_RATE, 0); - testMetricByName(listMetricStore, RANGE_TOTAL, 0); - testMetricByName(listMetricStore, FLUSH_RATE, 2); - testMetricByName(listMetricStore, FLUSH_TOTAL, 2); - testMetricByName(listMetricStore, RESTORE_RATE, 2); - testMetricByName(listMetricStore, RESTORE_TOTAL, 2); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkKeyValueStoreMetricsByType(final String storeType) { + final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(storeType)) + .collect(Collectors.toList()); + testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); + testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2); + testMetricByName(listMetricStore, GET_LATENCY_AVG, 2); + testMetricByName(listMetricStore, GET_LATENCY_MAX, 2); + testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2); + testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2); + testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2); + testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2); + testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2); + testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2); + testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); + testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_RATE, 2); + testMetricByName(listMetricStore, PUT_TOTAL, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2); + testMetricByName(listMetricStore, GET_RATE, 2); + testMetricByName(listMetricStore, DELETE_RATE, 2); + testMetricByName(listMetricStore, DELETE_TOTAL, 2); + testMetricByName(listMetricStore, PUT_ALL_RATE, 2); + testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2); + testMetricByName(listMetricStore, ALL_RATE, 2); + testMetricByName(listMetricStore, ALL_TOTAL, 2); + testMetricByName(listMetricStore, RANGE_RATE, 2); + testMetricByName(listMetricStore, RANGE_TOTAL, 2); + testMetricByName(listMetricStore, FLUSH_RATE, 2); + testMetricByName(listMetricStore, FLUSH_TOTAL, 2); + testMetricByName(listMetricStore, RESTORE_RATE, 2); + testMetricByName(listMetricStore, RESTORE_TOTAL, 2); } - private boolean testStoreMetricSession(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() - .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS)) - .collect(Collectors.toList()); - testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); - testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0); - testMetricByName(listMetricStore, GET_LATENCY_AVG, 0); - testMetricByName(listMetricStore, GET_LATENCY_MAX, 0); - testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0); - testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0); - testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0); - testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0); - testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0); - testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0); - testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); - testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_RATE, 2); - testMetricByName(listMetricStore, PUT_TOTAL, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0); - testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0); - testMetricByName(listMetricStore, GET_RATE, 0); - testMetricByName(listMetricStore, DELETE_RATE, 0); - testMetricByName(listMetricStore, DELETE_TOTAL, 0); - testMetricByName(listMetricStore, PUT_ALL_RATE, 0); - testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0); - testMetricByName(listMetricStore, ALL_RATE, 0); - testMetricByName(listMetricStore, ALL_TOTAL, 0); - testMetricByName(listMetricStore, RANGE_RATE, 0); - testMetricByName(listMetricStore, RANGE_TOTAL, 0); - testMetricByName(listMetricStore, FLUSH_RATE, 2); - testMetricByName(listMetricStore, FLUSH_TOTAL, 2); - testMetricByName(listMetricStore, RESTORE_RATE, 2); - testMetricByName(listMetricStore, RESTORE_TOTAL, 2); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkCacheMetrics() { + final List listMetricCache = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList()); + testMetricByName(listMetricCache, HIT_RATIO_AVG, 6); + testMetricByName(listMetricCache, HIT_RATIO_MIN, 6); + testMetricByName(listMetricCache, HIT_RATIO_MAX, 6); } - private boolean testStoreMetricKeyValueByType(final String storeType, final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() - .filter(m -> m.metricName().group().equals(storeType)) - .collect(Collectors.toList()); - testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); - testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2); - testMetricByName(listMetricStore, GET_LATENCY_AVG, 2); - testMetricByName(listMetricStore, GET_LATENCY_MAX, 2); - testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2); - testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2); - testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2); - testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2); - testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2); - testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2); - testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2); - testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); - testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); - testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); - testMetricByName(listMetricStore, PUT_RATE, 2); - testMetricByName(listMetricStore, PUT_TOTAL, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2); - testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2); - testMetricByName(listMetricStore, GET_RATE, 2); - testMetricByName(listMetricStore, DELETE_RATE, 2); - testMetricByName(listMetricStore, DELETE_TOTAL, 2); - testMetricByName(listMetricStore, PUT_ALL_RATE, 2); - testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2); - testMetricByName(listMetricStore, ALL_RATE, 2); - testMetricByName(listMetricStore, ALL_TOTAL, 2); - testMetricByName(listMetricStore, RANGE_RATE, 2); - testMetricByName(listMetricStore, RANGE_TOTAL, 2); - testMetricByName(listMetricStore, FLUSH_RATE, 2); - testMetricByName(listMetricStore, FLUSH_TOTAL, 2); - testMetricByName(listMetricStore, RESTORE_RATE, 2); - testMetricByName(listMetricStore, RESTORE_TOTAL, 2); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkWindowStoreMetrics() { + final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS)) + .collect(Collectors.toList()); + testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); + testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0); + testMetricByName(listMetricStore, GET_LATENCY_AVG, 0); + testMetricByName(listMetricStore, GET_LATENCY_MAX, 0); + testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0); + testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0); + testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0); + testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0); + testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0); + testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0); + testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); + testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_RATE, 2); + testMetricByName(listMetricStore, PUT_TOTAL, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0); + testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0); + testMetricByName(listMetricStore, GET_RATE, 0); + testMetricByName(listMetricStore, DELETE_RATE, 0); + testMetricByName(listMetricStore, DELETE_TOTAL, 0); + testMetricByName(listMetricStore, PUT_ALL_RATE, 0); + testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0); + testMetricByName(listMetricStore, ALL_RATE, 0); + testMetricByName(listMetricStore, ALL_TOTAL, 0); + testMetricByName(listMetricStore, RANGE_RATE, 0); + testMetricByName(listMetricStore, RANGE_TOTAL, 0); + testMetricByName(listMetricStore, FLUSH_RATE, 2); + testMetricByName(listMetricStore, FLUSH_TOTAL, 2); + testMetricByName(listMetricStore, RESTORE_RATE, 2); + testMetricByName(listMetricStore, RESTORE_TOTAL, 2); } - private boolean testCacheMetric(final StringBuilder errorMessage) { - errorMessage.setLength(0); - try { - final List listMetricCache = new ArrayList(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList()); - testMetricByName(listMetricCache, HIT_RATIO_AVG, 6); - testMetricByName(listMetricCache, HIT_RATIO_MIN, 6); - testMetricByName(listMetricCache, HIT_RATIO_MAX, 6); - return true; - } catch (final Throwable e) { - errorMessage.append(e.getMessage()); - return false; - } + private void checkSessionStoreMetrics() { + final List listMetricStore = new ArrayList(kafkaStreams.metrics().values()).stream() + .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS)) + .collect(Collectors.toList()); + testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2); + testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0); + testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0); + testMetricByName(listMetricStore, GET_LATENCY_AVG, 0); + testMetricByName(listMetricStore, GET_LATENCY_MAX, 0); + testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0); + testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0); + testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0); + testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0); + testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0); + testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0); + testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0); + testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2); + testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2); + testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2); + testMetricByName(listMetricStore, PUT_RATE, 2); + testMetricByName(listMetricStore, PUT_TOTAL, 2); + testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0); + testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0); + testMetricByName(listMetricStore, GET_RATE, 0); + testMetricByName(listMetricStore, DELETE_RATE, 0); + testMetricByName(listMetricStore, DELETE_TOTAL, 0); + testMetricByName(listMetricStore, PUT_ALL_RATE, 0); + testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0); + testMetricByName(listMetricStore, ALL_RATE, 0); + testMetricByName(listMetricStore, ALL_TOTAL, 0); + testMetricByName(listMetricStore, RANGE_RATE, 0); + testMetricByName(listMetricStore, RANGE_TOTAL, 0); + testMetricByName(listMetricStore, FLUSH_RATE, 2); + testMetricByName(listMetricStore, FLUSH_TOTAL, 2); + testMetricByName(listMetricStore, RESTORE_RATE, 2); + testMetricByName(listMetricStore, RESTORE_TOTAL, 2); } private void testMetricByName(final List listMetric, final String metricName, final int numMetric) {