|
|
|
@ -31,7 +31,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
@@ -31,7 +31,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
|
|
import org.apache.kafka.common.utils.SystemTime; |
|
|
|
|
import org.apache.kafka.streams.errors.TopologyException; |
|
|
|
|
import org.apache.kafka.streams.kstream.Consumed; |
|
|
|
|
import org.apache.kafka.streams.kstream.KTable; |
|
|
|
|
import org.apache.kafka.streams.kstream.Materialized; |
|
|
|
|
import org.apache.kafka.streams.kstream.Named; |
|
|
|
|
import org.apache.kafka.streams.processor.AbstractProcessor; |
|
|
|
|
import org.apache.kafka.streams.processor.Processor; |
|
|
|
|
import org.apache.kafka.streams.processor.ProcessorContext; |
|
|
|
@ -73,10 +75,9 @@ import java.util.regex.Pattern;
@@ -73,10 +75,9 @@ import java.util.regex.Pattern;
|
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkEntry; |
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkMap; |
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkProperties; |
|
|
|
|
import static org.hamcrest.CoreMatchers.endsWith; |
|
|
|
|
import static org.apache.kafka.common.utils.Utils.mkSet; |
|
|
|
|
import static org.hamcrest.CoreMatchers.equalTo; |
|
|
|
|
import static org.hamcrest.CoreMatchers.hasItem; |
|
|
|
|
import static org.hamcrest.CoreMatchers.hasItems; |
|
|
|
|
import static org.hamcrest.CoreMatchers.is; |
|
|
|
|
import static org.hamcrest.CoreMatchers.notNullValue; |
|
|
|
|
import static org.hamcrest.MatcherAssert.assertThat; |
|
|
|
@ -391,15 +392,18 @@ public class TopologyTestDriverTest {
@@ -391,15 +392,18 @@ public class TopologyTestDriverTest {
|
|
|
|
|
return topology; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private Topology setupTopologyWithInternalTopic() { |
|
|
|
|
private Topology setupTopologyWithInternalTopic(final String firstTableName, |
|
|
|
|
final String secondTableName, |
|
|
|
|
final String joinName) { |
|
|
|
|
final StreamsBuilder builder = new StreamsBuilder(); |
|
|
|
|
|
|
|
|
|
builder.stream(SOURCE_TOPIC_1) |
|
|
|
|
final KTable<Object, Long> t1 = builder.stream(SOURCE_TOPIC_1) |
|
|
|
|
.selectKey((k, v) -> v) |
|
|
|
|
.groupByKey() |
|
|
|
|
.count() |
|
|
|
|
.toStream() |
|
|
|
|
.to(SINK_TOPIC_1); |
|
|
|
|
.count(Materialized.as(firstTableName)); |
|
|
|
|
|
|
|
|
|
builder.table(SOURCE_TOPIC_2, Materialized.as(secondTableName)) |
|
|
|
|
.join(t1, v -> v, (v1, v2) -> v2, Named.as(joinName)); |
|
|
|
|
|
|
|
|
|
return builder.build(config); |
|
|
|
|
} |
|
|
|
@ -467,23 +471,62 @@ public class TopologyTestDriverTest {
@@ -467,23 +471,62 @@ public class TopologyTestDriverTest {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldGetSinkTopicNames() { |
|
|
|
|
public void shouldCaptureSinkTopicNamesIfWrittenInto() { |
|
|
|
|
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config); |
|
|
|
|
|
|
|
|
|
pipeRecord(SOURCE_TOPIC_1, testRecord1); |
|
|
|
|
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet())); |
|
|
|
|
|
|
|
|
|
pipeRecord(SOURCE_TOPIC_1, testRecord1); |
|
|
|
|
assertThat(testDriver.producedTopicNames(), hasItem(SINK_TOPIC_1)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldGetInternalTopicNames() { |
|
|
|
|
testDriver = new TopologyTestDriver(setupTopologyWithInternalTopic(), config); |
|
|
|
|
public void shouldCaptureInternalTopicNamesIfWrittenInto() { |
|
|
|
|
testDriver = new TopologyTestDriver( |
|
|
|
|
setupTopologyWithInternalTopic("table1", "table2", "join"), |
|
|
|
|
config |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet())); |
|
|
|
|
|
|
|
|
|
pipeRecord(SOURCE_TOPIC_1, testRecord1); |
|
|
|
|
assertThat( |
|
|
|
|
testDriver.producedTopicNames(), |
|
|
|
|
equalTo(mkSet( |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-table1-repartition", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-table1-changelog" |
|
|
|
|
)) |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
pipeRecord(SOURCE_TOPIC_2, testRecord1); |
|
|
|
|
assertThat( |
|
|
|
|
testDriver.producedTopicNames(), |
|
|
|
|
equalTo(mkSet( |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-table1-repartition", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-table1-changelog", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-table2-changelog", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-join-subscription-registration-topic", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-join-subscription-store-changelog", |
|
|
|
|
config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-join-subscription-response-topic" |
|
|
|
|
)) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void shouldCaptureGlobalTopicNameIfWrittenInto() { |
|
|
|
|
final StreamsBuilder builder = new StreamsBuilder(); |
|
|
|
|
builder.globalTable(SOURCE_TOPIC_1, Materialized.as("globalTable")); |
|
|
|
|
builder.stream(SOURCE_TOPIC_2).to(SOURCE_TOPIC_1); |
|
|
|
|
|
|
|
|
|
testDriver = new TopologyTestDriver(builder.build(), config); |
|
|
|
|
|
|
|
|
|
assertThat(testDriver.producedTopicNames(), hasItems( |
|
|
|
|
endsWith("-changelog"), endsWith("-repartition") |
|
|
|
|
)); |
|
|
|
|
assertThat(testDriver.producedTopicNames(), is(Collections.emptySet())); |
|
|
|
|
|
|
|
|
|
pipeRecord(SOURCE_TOPIC_2, testRecord1); |
|
|
|
|
assertThat( |
|
|
|
|
testDriver.producedTopicNames(), |
|
|
|
|
equalTo(Collections.singleton(SOURCE_TOPIC_1)) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|