diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index f74487b7eb3..10e06508b90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -62,6 +62,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -90,11 +91,12 @@ public class RegexSourceIntegrationTest {
     private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
     private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
 
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
     private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
     private KafkaStreams streams;
+    private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger(0);
+    private String outputTopic;
 
 
     @BeforeClass
@@ -107,16 +109,14 @@ public class RegexSourceIntegrationTest {
             TOPIC_Y,
             TOPIC_Z,
             FA_TOPIC,
-            FOO_TOPIC,
-            DEFAULT_OUTPUT_TOPIC);
+            FOO_TOPIC);
         CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
         CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
     }
 
     @Before
-    public void setUp() throws Exception {
-        CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
-
+    public void setUp() throws InterruptedException {
+        outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
         final Properties properties = new Properties();
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
@@ -141,6 +141,7 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+        final Serde<String> stringSerde = Serdes.String();
         final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
@@ -151,7 +152,7 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
@@ -175,6 +176,12 @@ public class RegexSourceIntegrationTest {
 
     }
 
+    private String createTopic(final int suffix) throws InterruptedException {
+        final String outputTopic = "outputTopic_" + suffix;
+        CLUSTER.createTopic(outputTopic);
+        return outputTopic;
+    }
+
     @Test
     public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
@@ -188,7 +195,7 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final List<String> assignedTopics = new CopyOnWriteArrayList<>();
         streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
@@ -262,9 +269,9 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
         final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        namedTopicsStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        namedTopicsStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         streams = new KafkaStreams(builder.build(), streamsConfiguration);
         streams.start();
@@ -281,7 +288,7 @@ public class RegexSourceIntegrationTest {
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 6);
         final List<String> actualValues = new ArrayList<>(6);
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
@@ -308,8 +315,8 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
 
-        partitionedStreamLeader.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        partitionedStreamFollower.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        partitionedStreamLeader.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        partitionedStreamFollower.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final List<String> leaderAssignment = new ArrayList<>();
         final List<String> followerAssignment = new ArrayList<>();
@@ -355,6 +362,7 @@ public class RegexSourceIntegrationTest {
 
     @Test
     public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
+        final String fMessage = "fMessage";
         final String fooMessage = "fooMessage";
         final Serde<String> stringSerde = Serdes.String();
 
@@ -365,8 +373,8 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));
 
-        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
-        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
+        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+        pattern2Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
 
         final AtomicBoolean expectError = new AtomicBoolean(false);
 
@@ -385,7 +393,7 @@ public class RegexSourceIntegrationTest {
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
         try {
-            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 2, 5000);
             throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
         } catch (final AssertionError e) {
             // this is fine
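For reference, the naming scheme that the new setUp()/createTopic() pair relies on is just a process-wide counter: every test gets its own output topic instead of deleting and recreating a shared DEFAULT_OUTPUT_TOPIC. A minimal, standalone sketch of that pattern (the TopicNames class and nextOutputTopic() method below are hypothetical helpers for illustration, not part of this patch or of the Kafka code base):

    import java.util.concurrent.atomic.AtomicInteger;

    // Self-contained illustration of the unique-output-topic scheme used in the diff above.
    public final class TopicNames {

        // Shared across the whole JVM, so no two callers ever receive the same suffix.
        private static final AtomicInteger TOPIC_SUFFIX_GENERATOR = new AtomicInteger(0);

        private TopicNames() { }

        // Mirrors createTopic(topicSuffixGenerator.incrementAndGet()):
        // successive calls return "outputTopic_1", "outputTopic_2", ...
        public static String nextOutputTopic() {
            return "outputTopic_" + TOPIC_SUFFIX_GENERATOR.incrementAndGet();
        }

        public static void main(final String[] args) {
            System.out.println(nextOutputTopic()); // outputTopic_1
            System.out.println(nextOutputTopic()); // outputTopic_2
        }
    }

In the actual test, the generated name is additionally passed to CLUSTER.createTopic(...) in createTopic(), so each test writes to a freshly created topic of its own.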