@ -62,6 +62,7 @@ import java.util.Map;
import java.util.Properties ;
import java.util.Properties ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.CopyOnWriteArrayList ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicInteger ;
import java.util.regex.Pattern ;
import java.util.regex.Pattern ;
import static org.hamcrest.CoreMatchers.equalTo ;
import static org.hamcrest.CoreMatchers.equalTo ;
@ -90,11 +91,12 @@ public class RegexSourceIntegrationTest {
private static final String PARTITIONED_TOPIC_1 = "partitioned-1" ;
private static final String PARTITIONED_TOPIC_1 = "partitioned-1" ;
private static final String PARTITIONED_TOPIC_2 = "partitioned-2" ;
private static final String PARTITIONED_TOPIC_2 = "partitioned-2" ;
private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic" ;
private static final String STRING_SERDE_CLASSNAME = Serdes . String ( ) . getClass ( ) . getName ( ) ;
private static final String STRING_SERDE_CLASSNAME = Serdes . String ( ) . getClass ( ) . getName ( ) ;
private Properties streamsConfiguration ;
private Properties streamsConfiguration ;
private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated" ;
private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated" ;
private KafkaStreams streams ;
private KafkaStreams streams ;
private static volatile AtomicInteger topicSuffixGenerator = new AtomicInteger ( 0 ) ;
private String outputTopic ;
@BeforeClass
@BeforeClass
@ -107,16 +109,14 @@ public class RegexSourceIntegrationTest {
TOPIC_Y ,
TOPIC_Y ,
TOPIC_Z ,
TOPIC_Z ,
FA_TOPIC ,
FA_TOPIC ,
FOO_TOPIC ,
FOO_TOPIC ) ;
DEFAULT_OUTPUT_TOPIC ) ;
CLUSTER . createTopic ( PARTITIONED_TOPIC_1 , 2 , 1 ) ;
CLUSTER . createTopic ( PARTITIONED_TOPIC_1 , 2 , 1 ) ;
CLUSTER . createTopic ( PARTITIONED_TOPIC_2 , 2 , 1 ) ;
CLUSTER . createTopic ( PARTITIONED_TOPIC_2 , 2 , 1 ) ;
}
}
@Before
@Before
public void setUp ( ) throws Exception {
public void setUp ( ) throws InterruptedException {
CLUSTER . deleteAndRecreateTopics ( DEFAULT_OUTPUT_TOPIC ) ;
outputTopic = createTopic ( topicSuffixGenerator . incrementAndGet ( ) ) ;
final Properties properties = new Properties ( ) ;
final Properties properties = new Properties ( ) ;
properties . put ( StreamsConfig . CACHE_MAX_BYTES_BUFFERING_CONFIG , 0 ) ;
properties . put ( StreamsConfig . CACHE_MAX_BYTES_BUFFERING_CONFIG , 0 ) ;
properties . put ( StreamsConfig . COMMIT_INTERVAL_MS_CONFIG , 100 ) ;
properties . put ( StreamsConfig . COMMIT_INTERVAL_MS_CONFIG , 100 ) ;
@ -141,6 +141,7 @@ public class RegexSourceIntegrationTest {
@Test
@Test
public void testRegexMatchesTopicsAWhenCreated ( ) throws Exception {
public void testRegexMatchesTopicsAWhenCreated ( ) throws Exception {
final Serde < String > stringSerde = Serdes . String ( ) ;
final Serde < String > stringSerde = Serdes . String ( ) ;
final List < String > expectedFirstAssignment = Collections . singletonList ( "TEST-TOPIC-1" ) ;
final List < String > expectedFirstAssignment = Collections . singletonList ( "TEST-TOPIC-1" ) ;
final List < String > expectedSecondAssignment = Arrays . asList ( "TEST-TOPIC-1" , "TEST-TOPIC-2" ) ;
final List < String > expectedSecondAssignment = Arrays . asList ( "TEST-TOPIC-1" , "TEST-TOPIC-2" ) ;
@ -151,7 +152,7 @@ public class RegexSourceIntegrationTest {
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "TEST-TOPIC-\\d" ) ) ;
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "TEST-TOPIC-\\d" ) ) ;
pattern1Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern1Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
final List < String > assignedTopics = new CopyOnWriteArrayList < > ( ) ;
final List < String > assignedTopics = new CopyOnWriteArrayList < > ( ) ;
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration , new DefaultKafkaClientSupplier ( ) {
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration , new DefaultKafkaClientSupplier ( ) {
@Override
@Override
@ -175,6 +176,12 @@ public class RegexSourceIntegrationTest {
}
}
private String createTopic ( final int suffix ) throws InterruptedException {
final String outputTopic = "outputTopic_" + suffix ;
CLUSTER . createTopic ( outputTopic ) ;
return outputTopic ;
}
@Test
@Test
public void testRegexMatchesTopicsAWhenDeleted ( ) throws Exception {
public void testRegexMatchesTopicsAWhenDeleted ( ) throws Exception {
@ -188,7 +195,7 @@ public class RegexSourceIntegrationTest {
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "TEST-TOPIC-[A-Z]" ) ) ;
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "TEST-TOPIC-[A-Z]" ) ) ;
pattern1Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern1Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
final List < String > assignedTopics = new CopyOnWriteArrayList < > ( ) ;
final List < String > assignedTopics = new CopyOnWriteArrayList < > ( ) ;
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration , new DefaultKafkaClientSupplier ( ) {
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration , new DefaultKafkaClientSupplier ( ) {
@ -262,9 +269,9 @@ public class RegexSourceIntegrationTest {
final KStream < String , String > pattern2Stream = builder . stream ( Pattern . compile ( "topic-[A-D]" ) ) ;
final KStream < String , String > pattern2Stream = builder . stream ( Pattern . compile ( "topic-[A-D]" ) ) ;
final KStream < String , String > namedTopicsStream = builder . stream ( Arrays . asList ( TOPIC_Y , TOPIC_Z ) ) ;
final KStream < String , String > namedTopicsStream = builder . stream ( Arrays . asList ( TOPIC_Y , TOPIC_Z ) ) ;
pattern1Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern1Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
pattern2Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern2Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
namedTopicsStream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
namedTopicsStream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration ) ;
streams = new KafkaStreams ( builder . build ( ) , streamsConfiguration ) ;
streams . start ( ) ;
streams . start ( ) ;
@ -281,7 +288,7 @@ public class RegexSourceIntegrationTest {
final Properties consumerConfig = TestUtils . consumerConfig ( CLUSTER . bootstrapServers ( ) , StringDeserializer . class , StringDeserializer . class ) ;
final Properties consumerConfig = TestUtils . consumerConfig ( CLUSTER . bootstrapServers ( ) , StringDeserializer . class , StringDeserializer . class ) ;
final List < String > expectedReceivedValues = Arrays . asList ( topicATestMessage , topic1TestMessage , topic2TestMessage , topicCTestMessage , topicYTestMessage , topicZTestMessage ) ;
final List < String > expectedReceivedValues = Arrays . asList ( topicATestMessage , topic1TestMessage , topic2TestMessage , topicCTestMessage , topicYTestMessage , topicZTestMessage ) ;
final List < KeyValue < String , String > > receivedKeyValues = IntegrationTestUtils . waitUntilMinKeyValueRecordsReceived ( consumerConfig , DEFAULT_OUTPUT_TOPIC , 6 ) ;
final List < KeyValue < String , String > > receivedKeyValues = IntegrationTestUtils . waitUntilMinKeyValueRecordsReceived ( consumerConfig , outputTopic , 6 ) ;
final List < String > actualValues = new ArrayList < > ( 6 ) ;
final List < String > actualValues = new ArrayList < > ( 6 ) ;
for ( final KeyValue < String , String > receivedKeyValue : receivedKeyValues ) {
for ( final KeyValue < String , String > receivedKeyValue : receivedKeyValues ) {
@ -308,8 +315,8 @@ public class RegexSourceIntegrationTest {
final KStream < String , String > partitionedStreamFollower = builderFollower . stream ( Pattern . compile ( "partitioned-\\d" ) ) ;
final KStream < String , String > partitionedStreamFollower = builderFollower . stream ( Pattern . compile ( "partitioned-\\d" ) ) ;
partitionedStreamLeader . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
partitionedStreamLeader . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
partitionedStreamFollower . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
partitionedStreamFollower . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
final List < String > leaderAssignment = new ArrayList < > ( ) ;
final List < String > leaderAssignment = new ArrayList < > ( ) ;
final List < String > followerAssignment = new ArrayList < > ( ) ;
final List < String > followerAssignment = new ArrayList < > ( ) ;
@ -355,6 +362,7 @@ public class RegexSourceIntegrationTest {
@Test
@Test
public void testNoMessagesSentExceptionFromOverlappingPatterns ( ) throws Exception {
public void testNoMessagesSentExceptionFromOverlappingPatterns ( ) throws Exception {
final String fMessage = "fMessage" ;
final String fMessage = "fMessage" ;
final String fooMessage = "fooMessage" ;
final String fooMessage = "fooMessage" ;
final Serde < String > stringSerde = Serdes . String ( ) ;
final Serde < String > stringSerde = Serdes . String ( ) ;
@ -365,8 +373,8 @@ public class RegexSourceIntegrationTest {
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "foo.*" ) ) ;
final KStream < String , String > pattern1Stream = builder . stream ( Pattern . compile ( "foo.*" ) ) ;
final KStream < String , String > pattern2Stream = builder . stream ( Pattern . compile ( "f.*" ) ) ;
final KStream < String , String > pattern2Stream = builder . stream ( Pattern . compile ( "f.*" ) ) ;
pattern1Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern1Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
pattern2Stream . to ( DEFAULT_OUTPUT_TOPIC , Produced . with ( stringSerde , stringSerde ) ) ;
pattern2Stream . to ( outputTopic , Produced . with ( stringSerde , stringSerde ) ) ;
final AtomicBoolean expectError = new AtomicBoolean ( false ) ;
final AtomicBoolean expectError = new AtomicBoolean ( false ) ;
@ -385,7 +393,7 @@ public class RegexSourceIntegrationTest {
final Properties consumerConfig = TestUtils . consumerConfig ( CLUSTER . bootstrapServers ( ) , StringDeserializer . class , StringDeserializer . class ) ;
final Properties consumerConfig = TestUtils . consumerConfig ( CLUSTER . bootstrapServers ( ) , StringDeserializer . class , StringDeserializer . class ) ;
try {
try {
IntegrationTestUtils . waitUntilMinKeyValueRecordsReceived ( consumerConfig , DEFAULT_OUTPUT_TOPIC , 2 , 5000 ) ;
IntegrationTestUtils . waitUntilMinKeyValueRecordsReceived ( consumerConfig , outputTopic , 2 , 5000 ) ;
throw new IllegalStateException ( "This should not happen: an assertion error should have been thrown before this." ) ;
throw new IllegalStateException ( "This should not happen: an assertion error should have been thrown before this." ) ;
} catch ( final AssertionError e ) {
} catch ( final AssertionError e ) {
// this is fine
// this is fine