
KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)

- The timeout occurred because the initial rebalance was slow.
- Added code that waits until the `KafkaStreams` instance is in state RUNNING before checking the registration of metrics, and in state NOT_RUNNING before checking their deregistration.
- Removed all other wait conditions, because they are not needed once the `KafkaStreams` instance is in the right state (see the sketch below).
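For context, the core of the change is the following wait-for-state pattern, shown here as a compact sketch drawn from the diff below. The `kafkaStreams`, `builder`, and `streamsConfiguration` fields belong to the test class, and the `TestUtils`/`IntegrationTestUtils` helpers are assumed to come from the test utilities already imported by this test.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.TestUtils;

// Sketch of the wait-for-state pattern introduced by this fix; the surrounding
// test class (fields, setup, imports) is assumed.
private void startApplication() throws InterruptedException {
    kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
    kafkaStreams.start();
    final long timeout = 60000;
    // Block until the initial rebalance has finished and the instance is RUNNING,
    // so that all Streams metrics are registered before the test inspects them.
    TestUtils.waitForCondition(
        () -> kafkaStreams.state() == State.RUNNING,
        timeout,
        () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
}

private void closeApplication() throws Exception {
    kafkaStreams.close();
    kafkaStreams.cleanUp();
    IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    final long timeout = 60000;
    // Block until shutdown has completed, so that metric deregistration can be
    // verified with a plain assertion instead of another wait condition.
    TestUtils.waitForCondition(
        () -> kafkaStreams.state() == State.NOT_RUNNING,
        timeout,
        () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
}
```

Because the instance is known to be RUNNING (or NOT_RUNNING) at these points, the per-metric `waitForCondition` calls in the individual checks can be replaced with direct assertions, which is what the rest of the diff does.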

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/6928/head
cadonna authored 6 years ago; committed by Guozhang Wang
commit df9ea618a3

streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java (236 changed lines)

@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,9 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@SuppressWarnings("unchecked")
@Category({IntegrationTest.class})
public class MetricsIntegrationTest {
@@ -187,27 +191,35 @@ public class MetricsIntegrationTest {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
}
private void startApplication() {
private void startApplication() throws InterruptedException {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
final long timeout = 60000;
TestUtils.waitForCondition(
() -> kafkaStreams.state() == State.RUNNING,
timeout,
() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
}
private void closeApplication() throws Exception {
kafkaStreams.close();
kafkaStreams.cleanUp();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final long timeout = 60000;
TestUtils.waitForCondition(
() -> kafkaStreams.state() == State.NOT_RUNNING,
timeout,
() -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
}
private void checkMetricDeregistration() throws InterruptedException {
TestUtils.waitForCondition(() -> {
final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
return listMetricAfterClosingApp.size() == 0;
}, 10000, "de-registration of metrics");
private void checkMetricDeregistration() {
final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
assertThat(listMetricAfterClosingApp.size(), is(0));
}
@Test
public void testStreamMetric() throws Exception {
final StringBuilder errorMessage = new StringBuilder();
stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
@@ -222,22 +234,13 @@ public class MetricsIntegrationTest {
startApplication();
// metric level : Thread
TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 10000, () -> "testThreadMetric -> " + errorMessage.toString());
// metric level : Task
TestUtils.waitForCondition(() -> testTaskMetric(errorMessage), 10000, () -> "testTaskMetric -> " + errorMessage.toString());
// metric level : Processor
TestUtils.waitForCondition(() -> testProcessorMetric(errorMessage), 10000, () -> "testProcessorMetric -> " + errorMessage.toString());
// metric level : Store (in-memory-state, in-memory-lru-state, rocksdb-state)
TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_STATE_METRICS + " -> " + errorMessage.toString());
TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS + " -> " + errorMessage.toString());
TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_ROCKSDB_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_ROCKSDB_STATE_METRICS + " -> " + errorMessage.toString());
//metric level : Cache
TestUtils.waitForCondition(() -> testCacheMetric(errorMessage), 10000, () -> "testCacheMetric -> " + errorMessage.toString());
checkThreadLevelMetrics();
checkTaskLevelMetrics();
checkProcessorLevelMetrics();
checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_STATE_METRICS);
checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
checkKeyValueStoreMetricsByType(STREAM_STORE_ROCKSDB_STATE_METRICS);
checkCacheMetrics();
closeApplication();
@@ -247,7 +250,6 @@ public class MetricsIntegrationTest {
@Test
public void testStreamMetricOfWindowStore() throws Exception {
final StringBuilder errorMessage = new StringBuilder();
stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50)))
@@ -257,8 +259,7 @@ public class MetricsIntegrationTest {
startApplication();
// metric level : Store (window)
TestUtils.waitForCondition(() -> testStoreMetricWindow(errorMessage), 10000, () -> "testStoreMetricWindow -> " + errorMessage.toString());
checkWindowStoreMetrics();
closeApplication();
@@ -268,7 +269,6 @@ public class MetricsIntegrationTest {
@Test
public void testStreamMetricOfSessionStore() throws Exception {
final StringBuilder errorMessage = new StringBuilder();
stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50)))
@@ -278,8 +278,7 @@ public class MetricsIntegrationTest {
startApplication();
// metric level : Store (session)
TestUtils.waitForCondition(() -> testStoreMetricSession(errorMessage), 10000, () -> "testStoreMetricSession -> " + errorMessage.toString());
checkSessionStoreMetrics();
closeApplication();
@@ -287,10 +286,20 @@ public class MetricsIntegrationTest {
checkMetricDeregistration();
}
private boolean testThreadMetric(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
private void checkTaskLevelMetrics() {
final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
testMetricByName(listMetricTask, COMMIT_RATE, 5);
testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
}
private void checkThreadLevelMetrics() {
final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
@@ -313,34 +322,11 @@ public class MetricsIntegrationTest {
testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private boolean testTaskMetric(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
testMetricByName(listMetricTask, COMMIT_RATE, 5);
testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private boolean testProcessorMetric(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
private void checkProcessorLevelMetrics() {
final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
@@ -358,16 +344,58 @@ public class MetricsIntegrationTest {
testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
private void checkKeyValueStoreMetricsByType(final String storeType) {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(storeType))
.collect(Collectors.toList());
testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_RATE, 2);
testMetricByName(listMetricStore, PUT_TOTAL, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
testMetricByName(listMetricStore, GET_RATE, 2);
testMetricByName(listMetricStore, DELETE_RATE, 2);
testMetricByName(listMetricStore, DELETE_TOTAL, 2);
testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
testMetricByName(listMetricStore, ALL_RATE, 2);
testMetricByName(listMetricStore, ALL_TOTAL, 2);
testMetricByName(listMetricStore, RANGE_RATE, 2);
testMetricByName(listMetricStore, RANGE_TOTAL, 2);
testMetricByName(listMetricStore, FLUSH_RATE, 2);
testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
testMetricByName(listMetricStore, RESTORE_RATE, 2);
testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
private void checkCacheMetrics() {
final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
}
private boolean testStoreMetricWindow(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
private void checkWindowStoreMetrics() {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
.collect(Collectors.toList());
@@ -406,16 +434,9 @@ public class MetricsIntegrationTest {
testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
testMetricByName(listMetricStore, RESTORE_RATE, 2);
testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private boolean testStoreMetricSession(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
private void checkSessionStoreMetrics() {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
.collect(Collectors.toList());
@@ -454,73 +475,6 @@ public class MetricsIntegrationTest {
testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
testMetricByName(listMetricStore, RESTORE_RATE, 2);
testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private boolean testStoreMetricKeyValueByType(final String storeType, final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(storeType))
.collect(Collectors.toList());
testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
testMetricByName(listMetricStore, PUT_RATE, 2);
testMetricByName(listMetricStore, PUT_TOTAL, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
testMetricByName(listMetricStore, GET_RATE, 2);
testMetricByName(listMetricStore, DELETE_RATE, 2);
testMetricByName(listMetricStore, DELETE_TOTAL, 2);
testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
testMetricByName(listMetricStore, ALL_RATE, 2);
testMetricByName(listMetricStore, ALL_TOTAL, 2);
testMetricByName(listMetricStore, RANGE_RATE, 2);
testMetricByName(listMetricStore, RANGE_TOTAL, 2);
testMetricByName(listMetricStore, FLUSH_RATE, 2);
testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
testMetricByName(listMetricStore, RESTORE_RATE, 2);
testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private boolean testCacheMetric(final StringBuilder errorMessage) {
errorMessage.setLength(0);
try {
final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
return true;
} catch (final Throwable e) {
errorMessage.append(e.getMessage());
return false;
}
}
private void testMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
