diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 14776b0d1b3..1991e152205 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -16,14 +16,10 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; @@ -38,11 +34,7 @@ import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.ClassRule; @@ -57,18 +49,16 @@ import java.util.Properties; import static java.lang.Long.MAX_VALUE; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; -import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; -import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -81,71 +71,9 @@ public class SuppressionIntegrationTest { mkProperties(mkMap()), 0L ); - private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Serde STRING_SERDE = Serdes.String(); - private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final int COMMIT_INTERVAL = 100; - private static final long TIMEOUT_MS = 30_000L; - - @Test - public void shouldSuppressIntermediateEventsWithEmitAfter() { - final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter"; - final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; - final String input = "input" + testId; - final String outputSuppressed = "output-suppressed" + testId; - final String outputRaw = "output-raw" + testId; - - cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw); - - final StreamsBuilder builder = new StreamsBuilder(); - final KTable valueCounts = buildCountsTable(input, builder); - - valueCounts - .suppress(untilTimeLimit(ofMillis(scaledTime(2L)), unbounded())) - .toStream() - .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); - - valueCounts - .toStream() - .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - - final Properties streamsConfig = getStreamsConfig(appId); - final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); - - try { - produceSynchronously( - input, - asList( - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), - new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), - // this record is just here to advance stream time and flush the other records through the buffer - new KeyValueTimestamp<>("tick", "tick", scaledTime(5L)) - ) - ); - verifyOutput( - outputRaw, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("tick", 1L, scaledTime(5L)) - ) - ); - verifyOutput( - outputSuppressed, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)) - ) - ); - } finally { - driver.close(); - cleanStateAfterTest(CLUSTER, driver); - } - } private KTable buildCountsTable(final String input, final StreamsBuilder builder) { return builder @@ -160,126 +88,6 @@ public class SuppressionIntegrationTest { .count(Materialized.>as("counts").withCachingDisabled()); } - @Test - public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() { - final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; - final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; - final String input = "input" + testId; - final String outputSuppressed = "output-suppressed" + testId; - final String outputRaw = "output-raw" + testId; - - cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw); - - final StreamsBuilder builder = new StreamsBuilder(); - final KTable valueCounts = buildCountsTable(input, builder); - - valueCounts - .suppress(untilTimeLimit(Duration.ZERO, unbounded())) - .toStream() - .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); - - valueCounts - .toStream() - .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - - final Properties streamsConfig = getStreamsConfig(appId); - final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); - - try { - produceSynchronously( - input, - asList( - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), - new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), - new KeyValueTimestamp<>("x", "x", scaledTime(4L)) - ) - ); - verifyOutput( - outputRaw, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) - ) - ); - verifyOutput( - outputSuppressed, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) - ) - ); - } finally { - driver.close(); - cleanStateAfterTest(CLUSTER, driver); - } - } - - @Test - public void shouldSuppressIntermediateEventsWithRecordLimit() { - final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit"; - final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; - final String input = "input" + testId; - final String outputSuppressed = "output-suppressed" + testId; - final String outputRaw = "output-raw" + testId; - - cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); - - final StreamsBuilder builder = new StreamsBuilder(); - final KTable valueCounts = buildCountsTable(input, builder); - - valueCounts - .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) - .toStream() - .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); - - valueCounts - .toStream() - .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - - final Properties streamsConfig = getStreamsConfig(appId); - final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); - try { - produceSynchronously( - input, - asList( - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), - new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), - new KeyValueTimestamp<>("x", "x", scaledTime(3L)) - ) - ); - verifyOutput( - outputRaw, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("x", 1L, scaledTime(3L)) - ) - ); - verifyOutput( - outputSuppressed, - asList( - // consecutive updates to v1 get suppressed into only the latter. - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)) - ) - ); - } finally { - driver.close(); - cleanStateAfterTest(CLUSTER, driver); - } - } - @Test public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException { final String testId = "-shouldShutdownWhenRecordConstraintIsViolated"; @@ -321,66 +129,6 @@ public class SuppressionIntegrationTest { } } - @Test - public void shouldSuppressIntermediateEventsWithBytesLimit() { - final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit"; - final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; - final String input = "input" + testId; - final String outputSuppressed = "output-suppressed" + testId; - final String outputRaw = "output-raw" + testId; - - cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); - - final StreamsBuilder builder = new StreamsBuilder(); - final KTable valueCounts = buildCountsTable(input, builder); - - valueCounts - // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. - .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull())) - .toStream() - .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); - - valueCounts - .toStream() - .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - - final Properties streamsConfig = getStreamsConfig(appId); - final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); - try { - produceSynchronously( - input, - asList( - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), - new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), - new KeyValueTimestamp<>("x", "x", scaledTime(3L)) - ) - ); - verifyOutput( - outputRaw, - asList( - new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), - new KeyValueTimestamp<>("x", 1L, scaledTime(3L)) - ) - ); - verifyOutput( - outputSuppressed, - asList( - // consecutive updates to v1 get suppressed into only the latter. - new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), - new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), - new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)) - ) - ); - } finally { - driver.close(); - cleanStateAfterTest(CLUSTER, driver); - } - } - @Test public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException { final String testId = "-shouldShutdownWhenBytesConstraintIsViolated"; @@ -423,73 +171,6 @@ public class SuppressionIntegrationTest { } } - @Test - public void shouldSupportFinalResultsForTimeWindows() { - final String testId = "-shouldSupportFinalResultsForTimeWindows"; - final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; - final String input = "input" + testId; - final String outputSuppressed = "output-suppressed" + testId; - final String outputRaw = "output-raw" + testId; - - cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); - - final StreamsBuilder builder = new StreamsBuilder(); - final KTable, Long> valueCounts = builder - .stream(input, - Consumed.with(STRING_SERDE, STRING_SERDE) - ) - .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE)) - .windowedBy(TimeWindows.of(ofMillis(scaledTime(2L))).grace(ofMillis(scaledTime(1L)))) - .count(Materialized.>as("counts").withCachingDisabled().withLoggingDisabled()); - - valueCounts - .suppress(untilWindowCloses(unbounded())) - .toStream() - .map((final Windowed k, final Long v) -> new KeyValue<>(k.toString(), v)) - .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); - - valueCounts - .toStream() - .map((final Windowed k, final Long v) -> new KeyValue<>(k.toString(), v)) - .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - - final Properties streamsConfig = getStreamsConfig(appId); - final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); - try { - produceSynchronously(input, asList( - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)), - new KeyValueTimestamp<>("k1", "v1", scaledTime(2L)), - new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)), - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), - new KeyValueTimestamp<>("k1", "v1", scaledTime(4L)), - // note this event is dropped since it is out of the grace period - new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)) - )); - verifyOutput( - outputRaw, - asList( - new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 1L, scaledTime(0L)), - new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 2L, scaledTime(1L)), - new KeyValueTimestamp<>(scaledWindowKey("k1", 2L, 4L), 1L, scaledTime(2L)), - new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 3L, scaledTime(1L)), - new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)), - new KeyValueTimestamp<>(scaledWindowKey("k1", 4L, 6L), 1L, scaledTime(4L)) - ) - ); - - verifyOutput( - outputSuppressed, - singletonList( - new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)) - ) - ); - } finally { - driver.close(); - cleanStateAfterTest(CLUSTER, driver); - } - } - private Properties getStreamsConfig(final String appId) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), @@ -501,10 +182,6 @@ public class SuppressionIntegrationTest { )); } - private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { - return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString(); - } - /** * scaling to ensure that there are commits in between the various test events, * just to exercise that everything works properly in the presence of commits. @@ -524,20 +201,7 @@ public class SuppressionIntegrationTest { } private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException { - waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down."); + waitForCondition(() -> !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down."); assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } - - private void verifyOutput(final String topic, final List> keyValueTimestamps) { - final Properties properties = mkProperties( - mkMap( - mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), - mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer) STRING_DESERIALIZER).getClass().getName()), - mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer) LONG_DESERIALIZER).getClass().getName()) - ) - ); - IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps); - - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 8bca79f8d83..8a2122d8e37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -68,8 +68,7 @@ import java.util.stream.Collectors; */ public class IntegrationTestUtils { - public static final long DEFAULT_TIMEOUT = 30 * 1000L; - private static final long DEFAULT_COMMIT_INTERVAL = 100L; + public static final long DEFAULT_TIMEOUT = 60 * 1000L; public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; /*