@@ -43,11 +43,6 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.internals.KeyValueStoreFacade;
 import org.apache.kafka.streams.internals.QuietStreamsConfig;
 import org.apache.kafka.streams.internals.WindowStoreFacade;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
-import org.apache.kafka.streams.processor.internals.Task;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -62,11 +57,16 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -213,9 +213,9 @@ public class TopologyTestDriver implements Closeable {
     private final MockProducer<byte[], byte[]> producer;

     private final Set<String> internalTopics = new HashSet<>();
-    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>();
-    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>();
-    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>();
+    private final Map<String, TopicPartition> partitionsByInputTopic = new HashMap<>();
+    private final Map<String, TopicPartition> globalPartitionsByInputTopic = new HashMap<>();
+    private final Map<TopicPartition, AtomicLong> offsetsByTopicOrPatternPartition = new HashMap<>();

     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>();
     private final boolean eosEnabled;
@@ -287,6 +287,7 @@ public class TopologyTestDriver implements Closeable {
                                final Properties config,
                                final long initialWallClockTimeMs) {
         final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
+        logIfTaskIdleEnabled(streamsConfig);

         mockWallClockTime = new MockTime(initialWallClockTimeMs);
         internalTopologyBuilder = builder;
@@ -334,16 +335,16 @@ public class TopologyTestDriver implements Closeable {
         for (final String topic : processorTopology.sourceTopics()) {
             final TopicPartition tp = new TopicPartition(topic, PARTITION_ID);
-            partitionsByTopic.put(topic, tp);
-            offsetsByTopicPartition.put(tp, new AtomicLong());
+            partitionsByInputTopic.put(topic, tp);
+            offsetsByTopicOrPatternPartition.put(tp, new AtomicLong());
         }
-        consumer.assign(partitionsByTopic.values());
+        consumer.assign(partitionsByInputTopic.values());

         if (globalTopology != null) {
             for (final String topicName : globalTopology.sourceTopics()) {
                 final TopicPartition partition = new TopicPartition(topicName, 0);
-                globalPartitionsByTopic.put(topicName, partition);
-                offsetsByTopicPartition.put(partition, new AtomicLong());
+                globalPartitionsByInputTopic.put(topicName, partition);
+                offsetsByTopicOrPatternPartition.put(partition, new AtomicLong());
                 consumer.updatePartitions(topicName, Collections.singletonList(
                     new PartitionInfo(topicName, 0, null, null, null)));
                 consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
@@ -381,11 +382,11 @@ public class TopologyTestDriver implements Closeable {
             globalStateTask = null;
         }

-        if (!partitionsByTopic.isEmpty()) {
+        if (!partitionsByInputTopic.isEmpty()) {
             final LogContext logContext = new LogContext("topology-test-driver ");
             final ProcessorStateManager stateManager = new ProcessorStateManager(
                 TASK_ID,
-                new HashSet<>(partitionsByTopic.values()),
+                new HashSet<>(partitionsByInputTopic.values()),
                 Task.TaskType.ACTIVE,
                 stateDirectory,
                 processorTopology.storeToChangelogTopic(),
@@ -405,7 +406,7 @@ public class TopologyTestDriver implements Closeable {
                 taskId -> producer);

             task = new StreamTask(
                 TASK_ID,
-                new HashSet<>(partitionsByTopic.values()),
+                new HashSet<>(partitionsByInputTopic.values()),
                 processorTopology,
                 consumer,
                 streamsConfig,
@@ -429,6 +430,20 @@ public class TopologyTestDriver implements Closeable {
         eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
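+
+    /**
+     * The driver processes records synchronously, so when max.task.idle.ms is
+     * set, warn test authors up front that processing may appear stalled until
+     * wall-clock time is advanced or records are enqueued on all partitions.
+     */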
+    private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) {
+        final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        if (taskIdleTime > 0) {
+            log.info("Detected {} config in use with TopologyTestDriver (set to {}ms)." +
+                         " This means you might need to use TopologyTestDriver#advanceWallClockTime()" +
+                         " or enqueue records on all partitions to allow Streams to make progress." +
+                         " TopologyTestDriver will log a message each time it cannot process enqueued" +
+                         " records due to {}.",
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
+                     taskIdleTime,
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        }
+    }
+
     /**
      * Get read-only handle on global metrics registry.
      *
@@ -456,77 +471,114 @@ public class TopologyTestDriver implements Closeable {
             consumerRecord.headers());
     }

     private void pipeRecord(final ProducerRecord<byte[], byte[]> record) {
         pipeRecord(record.topic(), record.timestamp(), record.key(), record.value(), record.headers());
     }

     private void pipeRecord(final String topicName,
-                            final Long timestamp,
+                            final long timestamp,
                             final byte[] key,
                             final byte[] value,
                             final Headers headers) {
+        final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(topicName);
+        final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(topicName);
+
-        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
-            validateSourceTopicNameRegexPattern(topicName);
+        if (inputTopicOrPatternPartition == null && globalInputTopicPartition == null) {
+            throw new IllegalArgumentException("Unknown topic: " + topicName);
         }
-        final TopicPartition topicPartition = getTopicPartition(topicName);
-        if (topicPartition != null) {
-            final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1;
-            task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>(
-                topicName,
-                topicPartition.partition(),
-                offset,
-                timestamp,
-                TimestampType.CREATE_TIME,
-                (long) ConsumerRecord.NULL_CHECKSUM,
-                key == null ? ConsumerRecord.NULL_SIZE : key.length,
-                value == null ? ConsumerRecord.NULL_SIZE : value.length,
-                key,
-                value,
-                headers)));
-            // Process the record ...
-            task.process(mockWallClockTime.milliseconds());
-            task.maybePunctuateStreamTime();
-            task.commit();
-            captureOutputRecords();
-        } else {
-            final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName);
-            if (globalTopicPartition == null) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+
+        if (inputTopicOrPatternPartition != null) {
+            enqueueTaskRecord(topicName, inputTopicOrPatternPartition, timestamp, key, value, headers);
+            completeAllProcessableWork();
+        }
+
+        if (globalInputTopicPartition != null) {
+            processGlobalRecord(globalInputTopicPartition, timestamp, key, value, headers);
+        }
+    }
+
+    private void enqueueTaskRecord(final String inputTopic,
+                                   final TopicPartition topicOrPatternPartition,
+                                   final long timestamp,
+                                   final byte[] key,
+                                   final byte[] value,
+                                   final Headers headers) {
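+        // incrementAndGet() - 1 assigns the next offset for this partition
+        // (starting at 0) while advancing the counter for the following record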
+        task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>(
+            inputTopic,
+            topicOrPatternPartition.partition(),
+            offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1,
+            timestamp,
+            TimestampType.CREATE_TIME,
+            (long) ConsumerRecord.NULL_CHECKSUM,
+            key == null ? ConsumerRecord.NULL_SIZE : key.length,
+            value == null ? ConsumerRecord.NULL_SIZE : value.length,
+            key,
+            value,
+            headers)));
+    }
+
+    private void completeAllProcessableWork() {
+        // for internally triggered processing (like wall-clock punctuations),
+        // we might have buffered some records to internal topics that need to
+        // be piped back in to kick-start the processing loop. This is idempotent
+        // and therefore harmless in the case where all we've done is enqueue an
+        // input record from the user.
+        captureOutputsAndReEnqueueInternalResults();
+        // If the topology only has global tasks, then `task` would be null.
+        // For this method, it just means there's nothing to do.
+        if (task != null) {
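+            // isProcessable() may return false while the task idles, per
+            // max.task.idle.ms, waiting for data on partitions that are still empty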
+            while (task.hasRecordsQueued() && task.isProcessable(mockWallClockTime.milliseconds())) {
+                // Process the record ...
+                task.process(mockWallClockTime.milliseconds());
+                task.maybePunctuateStreamTime();
+                task.commit();
+                captureOutputsAndReEnqueueInternalResults();
+            }
+            if (task.hasRecordsQueued()) {
+                log.info("Due to the {} configuration, there are currently some records" +
+                             " that cannot be processed. Advancing wall-clock time or" +
+                             " enqueuing records on the empty topics will allow" +
+                             " Streams to process more.",
+                         StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+            }
-            final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1;
-            globalStateTask.update(new ConsumerRecord<>(
-                globalTopicPartition.topic(),
-                globalTopicPartition.partition(),
-                offset,
-                timestamp,
-                TimestampType.CREATE_TIME,
-                (long) ConsumerRecord.NULL_CHECKSUM,
-                key == null ? ConsumerRecord.NULL_SIZE : key.length,
-                value == null ? ConsumerRecord.NULL_SIZE : value.length,
-                key,
-                value,
-                headers));
-            globalStateTask.flushState();
         }
     }
+
+    private void processGlobalRecord(final TopicPartition globalInputTopicPartition,
+                                     final long timestamp,
+                                     final byte[] key,
+                                     final byte[] value,
+                                     final Headers headers) {
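+        // global "tasks" have no record queue or idling: the update is applied
+        // and the state flushed synchronously, before this method returns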
+        globalStateTask.update(new ConsumerRecord<>(
+            globalInputTopicPartition.topic(),
+            globalInputTopicPartition.partition(),
+            offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1,
+            timestamp,
+            TimestampType.CREATE_TIME,
+            (long) ConsumerRecord.NULL_CHECKSUM,
+            key == null ? ConsumerRecord.NULL_SIZE : key.length,
+            value == null ? ConsumerRecord.NULL_SIZE : value.length,
+            key,
+            value,
+            headers));
+        globalStateTask.flushState();
+    }

     private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) {
         for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) {
             if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) {
                 throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName +
-                        " cannot contain regex pattern for input record topic: " + inputRecordTopic +
-                        " and hence cannot process the message.");
+                                                " cannot contain regex pattern for input record topic: " + inputRecordTopic +
+                                                " and hence cannot process the message.");
             }
         }
     }

-    private TopicPartition getTopicPartition(final String topicName) {
-        final TopicPartition topicPartition = partitionsByTopic.get(topicName);
+    private TopicPartition getInputTopicOrPatternPartition(final String topicName) {
+        if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) {
+            validateSourceTopicNameRegexPattern(topicName);
+        }
+        final TopicPartition topicPartition = partitionsByInputTopic.get(topicName);
         if (topicPartition == null) {
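+            // no exact-name match; fall back to treating each registered source
+            // topic name as a regex, to support pattern-subscribed sources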
-            for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) {
+            for (final Map.Entry<String, TopicPartition> entry : partitionsByInputTopic.entrySet()) {
                 if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
                     return entry.getValue();
                 }
@@ -535,7 +587,7 @@ public class TopologyTestDriver implements Closeable {
         return topicPartition;
     }

-    private void captureOutputRecords() {
+    private void captureOutputsAndReEnqueueInternalResults() {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
@@ -548,9 +600,27 @@ public class TopologyTestDriver implements Closeable {
             // Forward back into the topology if the produced record is to an internal or a source topic ...
             final String outputTopicName = record.topic();
-            if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)
-                || globalPartitionsByTopic.containsKey(outputTopicName)) {
-                pipeRecord(record);
+            final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(outputTopicName);
+            final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(outputTopicName);
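+            // checked independently: the record is re-enqueued for the task and/or
+            // applied to global state, depending on which inputs the topic matches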
+            if (inputTopicOrPatternPartition != null) {
+                enqueueTaskRecord(
+                    outputTopicName,
+                    inputTopicOrPatternPartition,
+                    record.timestamp(),
+                    record.key(),
+                    record.value(),
+                    record.headers());
+            }
+
+            if (globalInputTopicPartition != null) {
+                processGlobalRecord(
+                    globalInputTopicPartition,
+                    record.timestamp(),
+                    record.key(),
+                    record.value(),
+                    record.headers());
             }
         }
@@ -597,7 +667,7 @@ public class TopologyTestDriver implements Closeable {
             task.maybePunctuateSystemTime();
             task.commit();
         }
-        captureOutputRecords();
+        completeAllProcessableWork();
     }

     /**
@@ -847,23 +917,23 @@ public class TopologyTestDriver implements Closeable {
     private void throwIfBuiltInStore(final StateStore stateStore) {
         if (stateStore instanceof TimestampedKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
+                                                   + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`");
         }
         if (stateStore instanceof ReadOnlyKeyValueStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a key-value store and should be accessed via `getKeyValueStore()`");
+                                                   + " is a key-value store and should be accessed via `getKeyValueStore()`");
         }
         if (stateStore instanceof TimestampedWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
+                                                   + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`");
         }
         if (stateStore instanceof ReadOnlyWindowStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a window store and should be accessed via `getWindowStore()`");
+                                                   + " is a window store and should be accessed via `getWindowStore()`");
         }
         if (stateStore instanceof ReadOnlySessionStore) {
             throw new IllegalArgumentException("Store " + stateStore.name()
-                + " is a session store and should be accessed via `getSessionStore()`");
+                                                   + " is a session store and should be accessed via `getSessionStore()`");
         }
     }
@@ -1009,7 +1079,12 @@ public class TopologyTestDriver implements Closeable {
                 // ignore
             }
         }
-        captureOutputRecords();
+        completeAllProcessableWork();
+        if (task != null && task.hasRecordsQueued()) {
+            log.warn("Found some records that cannot be processed due to the" +
+                         " {} configuration during TopologyTestDriver#close().",
+                     StreamsConfig.MAX_TASK_IDLE_MS_CONFIG);
+        }
         if (!eosEnabled) {
             producer.close();
         }