diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 65c51fc5f28..f23bf0b83d0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; +import org.apache.kafka.test.TestUtils; import org.junit.Test; import java.time.Duration; @@ -70,6 +71,11 @@ public class SuppressScenarioTest { private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); private static final Serde STRING_SERDE = Serdes.String(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + private final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); @Test public void shouldImmediatelyEmitEventsWithZeroEmitAfter() { @@ -97,10 +103,6 @@ public class SuppressScenarioTest { final Topology topology = builder.build(); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); @@ -178,10 +180,6 @@ public class SuppressScenarioTest { .toStream() .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -251,10 +249,6 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -318,10 +312,6 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -381,10 +371,6 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -435,10 +421,6 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -494,10 +476,6 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final Properties config = Utils.mkProperties(Utils.mkMap( - Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), - Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") - )); final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { // first window