diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8cd6e5b4ead..f025b00e039 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -444,7 +444,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, * An active task is processable if its buffer contains data for all of its input * source topic partitions, or if it is enforced to be processable */ - private boolean isProcessable(final long wallClockTime) { + public boolean isProcessable(final long wallClockTime) { if (partitionGroup.allPartitionsBuffered()) { idleStartTime = RecordQueue.UNKNOWN; return true; @@ -907,6 +907,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } + public boolean hasRecordsQueued() { + return numBuffered() > 0; + } + // below are visible for testing only RecordCollector recordCollector() { return recordCollector; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 9cf0e1272e7..52e5aa25c3d 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -43,11 +43,6 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.KeyValueStoreFacade; import org.apache.kafka.streams.internals.QuietStreamsConfig; import org.apache.kafka.streams.internals.WindowStoreFacade; -import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; -import org.apache.kafka.streams.processor.internals.Task; -import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -62,11 +57,16 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -213,9 +213,9 @@ public class TopologyTestDriver implements Closeable { private final MockProducer producer; private final Set internalTopics = new HashSet<>(); - private final Map partitionsByTopic = new HashMap<>(); - private final Map globalPartitionsByTopic = new HashMap<>(); - private final Map offsetsByTopicPartition = new HashMap<>(); + private final Map partitionsByInputTopic = new HashMap<>(); + private final Map globalPartitionsByInputTopic = new HashMap<>(); + private final Map offsetsByTopicOrPatternPartition = new HashMap<>(); private final Map>> outputRecordsByTopic = new HashMap<>(); private final boolean eosEnabled; @@ -287,6 +287,7 @@ public class TopologyTestDriver implements Closeable { final Properties config, final long initialWallClockTimeMs) { final StreamsConfig streamsConfig = new QuietStreamsConfig(config); + logIfTaskIdleEnabled(streamsConfig); mockWallClockTime = new MockTime(initialWallClockTimeMs); internalTopologyBuilder = builder; @@ -334,16 +335,16 @@ public class TopologyTestDriver implements Closeable { for (final String topic : processorTopology.sourceTopics()) { final TopicPartition tp = new TopicPartition(topic, PARTITION_ID); - partitionsByTopic.put(topic, tp); - offsetsByTopicPartition.put(tp, new AtomicLong()); + partitionsByInputTopic.put(topic, tp); + offsetsByTopicOrPatternPartition.put(tp, new AtomicLong()); } - consumer.assign(partitionsByTopic.values()); + consumer.assign(partitionsByInputTopic.values()); if (globalTopology != null) { for (final String topicName : globalTopology.sourceTopics()) { final TopicPartition partition = new TopicPartition(topicName, 0); - globalPartitionsByTopic.put(topicName, partition); - offsetsByTopicPartition.put(partition, new AtomicLong()); + globalPartitionsByInputTopic.put(topicName, partition); + offsetsByTopicOrPatternPartition.put(partition, new AtomicLong()); consumer.updatePartitions(topicName, Collections.singletonList( new PartitionInfo(topicName, 0, null, null, null))); consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); @@ -381,11 +382,11 @@ public class TopologyTestDriver implements Closeable { globalStateTask = null; } - if (!partitionsByTopic.isEmpty()) { + if (!partitionsByInputTopic.isEmpty()) { final LogContext logContext = new LogContext("topology-test-driver "); final ProcessorStateManager stateManager = new ProcessorStateManager( TASK_ID, - new HashSet<>(partitionsByTopic.values()), + new HashSet<>(partitionsByInputTopic.values()), Task.TaskType.ACTIVE, stateDirectory, processorTopology.storeToChangelogTopic(), @@ -405,7 +406,7 @@ public class TopologyTestDriver implements Closeable { taskId -> producer); task = new StreamTask( TASK_ID, - new HashSet<>(partitionsByTopic.values()), + new HashSet<>(partitionsByInputTopic.values()), processorTopology, consumer, streamsConfig, @@ -429,6 +430,20 @@ public class TopologyTestDriver implements Closeable { eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); } + private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) { + final Long taskIdleTime = streamsConfig.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); + if (taskIdleTime > 0) { + log.info("Detected {} config in use with TopologyTestDriver (set to {}ms)." + + " This means you might need to use TopologyTestDriver#advanceWallClockTime()" + + " or enqueue records on all partitions to allow Steams to make progress." + + " TopologyTestDriver will log a message each time it cannot process enqueued" + + " records due to {}.", + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, + taskIdleTime, + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); + } + } + /** * Get read-only handle on global metrics registry. * @@ -456,77 +471,114 @@ public class TopologyTestDriver implements Closeable { consumerRecord.headers()); } - private void pipeRecord(final ProducerRecord record) { - pipeRecord(record.topic(), record.timestamp(), record.key(), record.value(), record.headers()); - } - private void pipeRecord(final String topicName, - final Long timestamp, + final long timestamp, final byte[] key, final byte[] value, final Headers headers) { + final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(topicName); + final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(topicName); - if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) { - validateSourceTopicNameRegexPattern(topicName); + if (inputTopicOrPatternPartition == null && globalInputTopicPartition == null) { + throw new IllegalArgumentException("Unknown topic: " + topicName); } - final TopicPartition topicPartition = getTopicPartition(topicName); - if (topicPartition != null) { - final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1; - task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>( - topicName, - topicPartition.partition(), - offset, - timestamp, - TimestampType.CREATE_TIME, - (long) ConsumerRecord.NULL_CHECKSUM, - key == null ? ConsumerRecord.NULL_SIZE : key.length, - value == null ? ConsumerRecord.NULL_SIZE : value.length, - key, - value, - headers))); - - // Process the record ... - task.process(mockWallClockTime.milliseconds()); - task.maybePunctuateStreamTime(); - task.commit(); - captureOutputRecords(); - } else { - final TopicPartition globalTopicPartition = globalPartitionsByTopic.get(topicName); - if (globalTopicPartition == null) { - throw new IllegalArgumentException("Unknown topic: " + topicName); + + if (inputTopicOrPatternPartition != null) { + enqueueTaskRecord(topicName, inputTopicOrPatternPartition, timestamp, key, value, headers); + completeAllProcessableWork(); + } + + if (globalInputTopicPartition != null) { + processGlobalRecord(globalInputTopicPartition, timestamp, key, value, headers); + } + } + + private void enqueueTaskRecord(final String inputTopic, + final TopicPartition topicOrPatternPartition, + final long timestamp, + final byte[] key, + final byte[] value, + final Headers headers) { + task.addRecords(topicOrPatternPartition, Collections.singleton(new ConsumerRecord<>( + inputTopic, + topicOrPatternPartition.partition(), + offsetsByTopicOrPatternPartition.get(topicOrPatternPartition).incrementAndGet() - 1, + timestamp, + TimestampType.CREATE_TIME, + (long) ConsumerRecord.NULL_CHECKSUM, + key == null ? ConsumerRecord.NULL_SIZE : key.length, + value == null ? ConsumerRecord.NULL_SIZE : value.length, + key, + value, + headers))); + } + + private void completeAllProcessableWork() { + // for internally triggered processing (like wall-clock punctuations), + // we might have buffered some records to internal topics that need to + // be piped back in to kick-start the processing loop. This is idempotent + // and therefore harmless in the case where all we've done is enqueued an + // input record from the user. + captureOutputsAndReEnqueueInternalResults(); + + // If the topology only has global tasks, then `task` would be null. + // For this method, it just means there's nothing to do. + if (task != null) { + while (task.hasRecordsQueued() && task.isProcessable(mockWallClockTime.milliseconds())) { + // Process the record ... + task.process(mockWallClockTime.milliseconds()); + task.maybePunctuateStreamTime(); + task.commit(); + captureOutputsAndReEnqueueInternalResults(); + } + if (task.hasRecordsQueued()) { + log.info("Due to the {} configuration, there are currently some records" + + " that cannot be processed. Advancing wall-clock time or" + + " enqueuing records on the empty topics will allow" + + " Streams to process more.", + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); } - final long offset = offsetsByTopicPartition.get(globalTopicPartition).incrementAndGet() - 1; - globalStateTask.update(new ConsumerRecord<>( - globalTopicPartition.topic(), - globalTopicPartition.partition(), - offset, - timestamp, - TimestampType.CREATE_TIME, - (long) ConsumerRecord.NULL_CHECKSUM, - key == null ? ConsumerRecord.NULL_SIZE : key.length, - value == null ? ConsumerRecord.NULL_SIZE : value.length, - key, - value, - headers)); - globalStateTask.flushState(); } } + private void processGlobalRecord(final TopicPartition globalInputTopicPartition, + final long timestamp, + final byte[] key, + final byte[] value, + final Headers headers) { + globalStateTask.update(new ConsumerRecord<>( + globalInputTopicPartition.topic(), + globalInputTopicPartition.partition(), + offsetsByTopicOrPatternPartition.get(globalInputTopicPartition).incrementAndGet() - 1, + timestamp, + TimestampType.CREATE_TIME, + (long) ConsumerRecord.NULL_CHECKSUM, + key == null ? ConsumerRecord.NULL_SIZE : key.length, + value == null ? ConsumerRecord.NULL_SIZE : value.length, + key, + value, + headers)); + globalStateTask.flushState(); + } private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) { for (final String sourceTopicName : internalTopologyBuilder.sourceTopicNames()) { if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) { throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + - " cannot contain regex pattern for input record topic: " + inputRecordTopic + - " and hence cannot process the message."); + " cannot contain regex pattern for input record topic: " + inputRecordTopic + + " and hence cannot process the message."); } } } - private TopicPartition getTopicPartition(final String topicName) { - final TopicPartition topicPartition = partitionsByTopic.get(topicName); + private TopicPartition getInputTopicOrPatternPartition(final String topicName) { + if (!internalTopologyBuilder.sourceTopicNames().isEmpty()) { + validateSourceTopicNameRegexPattern(topicName); + } + + final TopicPartition topicPartition = partitionsByInputTopic.get(topicName); if (topicPartition == null) { - for (final Map.Entry entry : partitionsByTopic.entrySet()) { + for (final Map.Entry entry : partitionsByInputTopic.entrySet()) { if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) { return entry.getValue(); } @@ -535,7 +587,7 @@ public class TopologyTestDriver implements Closeable { return topicPartition; } - private void captureOutputRecords() { + private void captureOutputsAndReEnqueueInternalResults() { // Capture all the records sent to the producer ... final List> output = producer.history(); producer.clear(); @@ -548,9 +600,27 @@ public class TopologyTestDriver implements Closeable { // Forward back into the topology if the produced record is to an internal or a source topic ... final String outputTopicName = record.topic(); - if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName) - || globalPartitionsByTopic.containsKey(outputTopicName)) { - pipeRecord(record); + + final TopicPartition inputTopicOrPatternPartition = getInputTopicOrPatternPartition(outputTopicName); + final TopicPartition globalInputTopicPartition = globalPartitionsByInputTopic.get(outputTopicName); + + if (inputTopicOrPatternPartition != null) { + enqueueTaskRecord( + outputTopicName, + inputTopicOrPatternPartition, + record.timestamp(), + record.key(), + record.value(), + record.headers()); + } + + if (globalInputTopicPartition != null) { + processGlobalRecord( + globalInputTopicPartition, + record.timestamp(), + record.key(), + record.value(), + record.headers()); } } } @@ -597,7 +667,7 @@ public class TopologyTestDriver implements Closeable { task.maybePunctuateSystemTime(); task.commit(); } - captureOutputRecords(); + completeAllProcessableWork(); } /** @@ -847,23 +917,23 @@ public class TopologyTestDriver implements Closeable { private void throwIfBuiltInStore(final StateStore stateStore) { if (stateStore instanceof TimestampedKeyValueStore) { throw new IllegalArgumentException("Store " + stateStore.name() - + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"); + + " is a timestamped key-value store and should be accessed via `getTimestampedKeyValueStore()`"); } if (stateStore instanceof ReadOnlyKeyValueStore) { throw new IllegalArgumentException("Store " + stateStore.name() - + " is a key-value store and should be accessed via `getKeyValueStore()`"); + + " is a key-value store and should be accessed via `getKeyValueStore()`"); } if (stateStore instanceof TimestampedWindowStore) { throw new IllegalArgumentException("Store " + stateStore.name() - + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"); + + " is a timestamped window store and should be accessed via `getTimestampedWindowStore()`"); } if (stateStore instanceof ReadOnlyWindowStore) { throw new IllegalArgumentException("Store " + stateStore.name() - + " is a window store and should be accessed via `getWindowStore()`"); + + " is a window store and should be accessed via `getWindowStore()`"); } if (stateStore instanceof ReadOnlySessionStore) { throw new IllegalArgumentException("Store " + stateStore.name() - + " is a session store and should be accessed via `getSessionStore()`"); + + " is a session store and should be accessed via `getSessionStore()`"); } } @@ -1009,7 +1079,12 @@ public class TopologyTestDriver implements Closeable { // ignore } } - captureOutputRecords(); + completeAllProcessableWork(); + if (task != null && task.hasRecordsQueued()) { + log.warn("Found some records that cannot be processed due to the" + + " {} configuration during TopologyTestDriver#close().", + StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); + } if (!eosEnabled) { producer.close(); } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index d7ac6b46557..60ab516f6f7 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -73,6 +75,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -145,7 +149,7 @@ public class TopologyTestDriverTest { private final String topic; private final Headers headers; - Record(final ConsumerRecord consumerRecord, + Record(final ConsumerRecord consumerRecord, final long newOffset) { key = consumerRecord.key(); value = consumerRecord.value(); @@ -156,7 +160,7 @@ public class TopologyTestDriverTest { } Record(final String newTopic, - final TestRecord consumerRecord, + final TestRecord consumerRecord, final long newOffset) { key = consumerRecord.key(); value = consumerRecord.value(); @@ -231,7 +235,7 @@ public class TopologyTestDriverTest { } } - private final static class MockProcessor implements Processor { + private final static class MockProcessor implements Processor { private final Collection punctuations; private ProcessorContext context; @@ -266,7 +270,7 @@ public class TopologyTestDriverTest { private final List mockProcessors = new ArrayList<>(); - private final class MockProcessorSupplier implements ProcessorSupplier { + private final class MockProcessorSupplier implements ProcessorSupplier { private final Collection punctuations; private MockProcessorSupplier() { @@ -278,7 +282,7 @@ public class TopologyTestDriverTest { } @Override - public Processor get() { + public Processor get() { final MockProcessor mockProcessor = new MockProcessor(punctuations); mockProcessors.add(mockProcessor); return mockProcessor; @@ -452,7 +456,7 @@ public class TopologyTestDriverTest { testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config); pipeRecord(SOURCE_TOPIC_1, testRecord1); - final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); + final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); assertEquals(key1, outputRecord.key()); assertEquals(value1, outputRecord.value()); @@ -705,7 +709,7 @@ public class TopologyTestDriverTest { pipeRecord(SOURCE_TOPIC_1, testRecord1); - ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); + ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); assertEquals(key1, outputRecord.key()); assertEquals(value1, outputRecord.value()); assertEquals(SINK_TOPIC_1, outputRecord.topic()); @@ -1209,7 +1213,7 @@ public class TopologyTestDriverTest { testDriver.pipeRecord(topic, new TestRecord<>(key, value, null, time), new StringSerializer(), new LongSerializer(), null); } - + private void compareKeyValue(final TestRecord record, final String key, final Long value) { assertThat(record.getKey(), equalTo(key)); assertThat(record.getValue(), equalTo(value)); @@ -1337,9 +1341,9 @@ public class TopologyTestDriverTest { topology.addSource("sourceProcessor", "input-topic"); topology.addProcessor( "storeProcessor", - new ProcessorSupplier() { + new ProcessorSupplier() { @Override - public Processor get() { + public Processor get() { return new Processor() { private KeyValueStore store; @@ -1472,7 +1476,7 @@ public class TopologyTestDriverTest { testDriver = new TopologyTestDriver(topology, config); pipeRecord(SOURCE_TOPIC_1, testRecord1); - final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); + final ProducerRecord outputRecord = testDriver.readRecord(SINK_TOPIC_1); assertEquals(key1, outputRecord.key()); assertEquals(value1, outputRecord.value()); assertEquals(SINK_TOPIC_1, outputRecord.topic()); @@ -1522,4 +1526,174 @@ public class TopologyTestDriverTest { final TaskId taskId = new TaskId(0, 0); assertTrue(new File(appDir, taskId.toString()).exists()); } + + @Test + public void shouldEnqueueLaterOutputsAfterEarlierOnes() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); + + final Topology topology = new Topology(); + topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input"); + topology.addProcessor( + "recursiveProcessor", + () -> new AbstractProcessor() { + @Override + public void process(final String key, final String value) { + if (!value.startsWith("recurse-")) { + context().forward(key, "recurse-" + value, To.child("recursiveSink")); + } + context().forward(key, value, To.child("sink")); + } + }, + "source" + ); + topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor"); + topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor"); + + try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) { + final TestInputTopic in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer()); + final TestOutputTopic out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); + + // given the topology above, we expect to see the output _first_ echo the input + // and _then_ print it with "recurse-" prepended. + + in.pipeInput("B", "beta"); + final List> events = out.readKeyValuesToList(); + assertThat( + events, + is(Arrays.asList( + new KeyValue<>("B", "beta"), + new KeyValue<>("B", "recurse-beta") + )) + ); + + } + } + + @Test + public void shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); + + final Topology topology = new Topology(); + topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "input"); + topology.addGlobalStore( + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("globule-store"), Serdes.String(), Serdes.String()).withLoggingDisabled(), + "globuleSource", + new StringDeserializer(), + new StringDeserializer(), + "globule-topic", + "globuleProcessor", + () -> new Processor() { + private KeyValueStore stateStore; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + stateStore = (KeyValueStore) context.getStateStore("globule-store"); + } + + @Override + public void process(final String key, final String value) { + stateStore.put(key, value); + } + + @Override + public void close() { + + } + } + ); + topology.addProcessor( + "recursiveProcessor", + () -> new AbstractProcessor() { + @Override + public void process(final String key, final String value) { + if (!value.startsWith("recurse-")) { + context().forward(key, "recurse-" + value, To.child("recursiveSink")); + } + context().forward(key, value, To.child("sink")); + context().forward(key, value, To.child("globuleSink")); + } + }, + "source" + ); + topology.addSink("recursiveSink", "input", new StringSerializer(), new StringSerializer(), "recursiveProcessor"); + topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "recursiveProcessor"); + topology.addSink("globuleSink", "globule-topic", new StringSerializer(), new StringSerializer(), "recursiveProcessor"); + + try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) { + final TestInputTopic in = topologyTestDriver.createInputTopic("input", new StringSerializer(), new StringSerializer()); + final TestOutputTopic globalTopic = topologyTestDriver.createOutputTopic("globule-topic", new StringDeserializer(), new StringDeserializer()); + + in.pipeInput("A", "alpha"); + + // expect the global store to correctly reflect the last update + final KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("globule-store"); + assertThat(keyValueStore, notNullValue()); + assertThat(keyValueStore.get("A"), is("recurse-alpha")); + + // and also just make sure the test really sent both events to the topic. + final List> events = globalTopic.readKeyValuesToList(); + assertThat( + events, + is(Arrays.asList( + new KeyValue<>("A", "alpha"), + new KeyValue<>("A", "recurse-alpha") + )) + ); + } + } + + @Test + public void shouldRespectTaskIdling() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); + + // This is the key to this test. Wall-clock time doesn't advance automatically in TopologyTestDriver, + // so with an idle time specified, TTD can't just expect all enqueued records to be processable. + properties.setProperty(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "1000"); + + final Topology topology = new Topology(); + topology.addSource("source1", new StringDeserializer(), new StringDeserializer(), "input1"); + topology.addSource("source2", new StringDeserializer(), new StringDeserializer(), "input2"); + topology.addSink("sink", "output", new StringSerializer(), new StringSerializer(), "source1", "source2"); + + try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties)) { + final TestInputTopic in1 = topologyTestDriver.createInputTopic("input1", new StringSerializer(), new StringSerializer()); + final TestInputTopic in2 = topologyTestDriver.createInputTopic("input2", new StringSerializer(), new StringSerializer()); + final TestOutputTopic out = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); + + in1.pipeInput("A", "alpha"); + topologyTestDriver.advanceWallClockTime(Duration.ofMillis(1)); + + // only one input has records, and it's only been one ms + assertThat(out.readKeyValuesToList(), is(Collections.emptyList())); + + in2.pipeInput("B", "beta"); + + // because both topics have records, we can process (even though it's only been one ms) + // but after processing A (the earlier record), we now only have one input queued, so + // task idling takes effect again + assertThat( + out.readKeyValuesToList(), + is(Collections.singletonList( + new KeyValue<>("A", "alpha") + )) + ); + + topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(1)); + + // now that one second has elapsed, the idle time has expired, and we can process B + assertThat( + out.readKeyValuesToList(), + is(Collections.singletonList( + new KeyValue<>("B", "beta") + )) + ); + } + } } diff --git a/streams/test-utils/src/test/resources/log4j.properties b/streams/test-utils/src/test/resources/log4j.properties new file mode 100644 index 00000000000..be36f90299a --- /dev/null +++ b/streams/test-utils/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.org.apache.kafka=INFO