From 62fbc92e3d9d84617dab46329c279835d157bda4 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 18 Jul 2019 13:54:46 -0700 Subject: [PATCH] KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054) The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime Reviewers: Guozhang Wang , Bill Bejeck , Matthias J. Sax --- .../pageview/JsonTimestampExtractor.java | 2 +- .../ExtractRecordMetadataTimestamp.java | 10 +- .../processor/FailOnInvalidTimestamp.java | 4 +- .../LogAndSkipOnInvalidTimestamp.java | 4 +- .../streams/processor/TimestampExtractor.java | 4 +- .../UsePreviousTimeOnInvalidTimestamp.java | 10 +- .../WallclockTimestampExtractor.java | 4 +- .../processor/internals/PartitionGroup.java | 18 +-- .../processor/internals/RecordQueue.java | 23 +++- .../processor/internals/StreamTask.java | 10 +- .../kafka/streams/StreamsConfigTest.java | 2 +- .../internals/PartitionGroupTest.java | 8 +- .../internals/ProcessorTopologyTest.java | 2 +- .../processor/internals/RecordQueueTest.java | 106 +++++++++++++++--- .../kafka/test/MockTimestampExtractor.java | 2 +- 15 files changed, 151 insertions(+), 58 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 4f6257ac74a..d760183a8a5 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class JsonTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index 79c8dd34e54..3c7428a8c23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { - return onInvalidTimestamp(record, timestamp, previousTimestamp); + return onInvalidTimestamp(record, timestamp, partitionTime); } return timestamp; @@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) */ public abstract long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long previousTimestamp); + final long partitionTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index 87cb0dec0e8..40d3e0ea595 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return nothing; always raises an exception * @throws StreamsException on every invocation */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long previousTimestamp) + final long partitionTime) throws StreamsException { final String message = "Input record " + record + " has invalid (negative) timestamp. " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java index 0561e61011d..b759e5bd424 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java @@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the originally extracted timestamp of the record */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long previousTimestamp) { + final long partitionTime) { log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); return recordTimestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 0780dc09e59..1e6d6cd65c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -46,8 +46,8 @@ public interface TimestampExtractor { * * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the timestamp of the record */ - long extract(ConsumerRecord record, long previousTimestamp); + long extract(ConsumerRecord record, long partitionTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java index dd952ccf120..89e2fd3729b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java @@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the provided latest extracted valid timestamp as new timestamp for the record * @throws StreamsException if latest extracted valid timestamp is unknown */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long previousTimestamp) + final long partitionTime) throws StreamsException { - if (previousTimestamp < 0) { + if (partitionTime < 0) { throw new StreamsException("Could not infer new timestamp for input record " + record - + " because latest extracted valid timestamp is unknown."); + + " because partition time is unknown."); } - return previousTimestamp; + return partitionTime; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index ad3b3bc75de..baa1cb6dbea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor { * Return the current wall clock time as timestamp. * * @param record a data record - * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) + * @param partitionTime the highest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { return System.currentTimeMillis(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index fbafa73aca2..83b36734647 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -41,8 +41,8 @@ import java.util.Set; * * PartitionGroup also maintains a stream-time for the group as a whole. * This is defined as the highest timestamp of any record yet polled from the PartitionGroup. - * The PartitionGroup's stream-time is also the stream-time of its task and is used as the - * stream-time for any computations that require it. + * Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an + * accurate view of the local time as seen by that processor. * * The PartitionGroups's stream-time is initially UNKNOWN (-1), and it set to a known value upon first poll. * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing @@ -76,7 +76,7 @@ public class PartitionGroup { } PartitionGroup(final Map partitionQueues, final Sensor recordLatenessSensor) { - nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp)); + nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; this.recordLatenessSensor = recordLatenessSensor; totalBuffered = 0; @@ -109,7 +109,7 @@ public class PartitionGroup { nonEmptyQueuesByTime.offer(queue); } - // always update the stream time to the record's timestamp yet to be processed if it is larger + // always update the stream-time to the record's timestamp yet to be processed if it is larger if (record.timestamp > streamTime) { streamTime = record.timestamp; recordLatenessSensor.record(0); @@ -140,8 +140,8 @@ public class PartitionGroup { nonEmptyQueuesByTime.offer(recordQueue); // if all partitions now are non-empty, set the flag - // we do not need to update the stream time here since this task will definitely be - // processed next, and hence the stream time will be updated when we retrieved records by then + // we do not need to update the stream-time here since this task will definitely be + // processed next, and hence the stream-time will be updated when we retrieved records by then if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) { allBuffered = true; } @@ -157,10 +157,9 @@ public class PartitionGroup { } /** - * Return the timestamp of this partition group as the smallest - * partition timestamp among all its partitions + * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions */ - public long timestamp() { + public long streamTime() { return streamTime; } @@ -192,6 +191,7 @@ public class PartitionGroup { public void clear() { nonEmptyQueuesByTime.clear(); + streamTime = RecordQueue.UNKNOWN; for (final RecordQueue queue : partitionQueues.values()) { queue.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 6f3e70b9922..de1d9a26bb3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -31,8 +31,8 @@ import java.util.ArrayDeque; /** * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the - * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition - * timestamp is monotonically increasing such that once it is advanced, it will not be decremented. + * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the + * timestamp extractor. */ public class RecordQueue { @@ -47,6 +47,7 @@ public class RecordQueue { private final ArrayDeque> fifoQueue; private StampedRecord headRecord = null; + private long partitionTime = RecordQueue.UNKNOWN; private Sensor skipRecordsSensor; @@ -139,20 +140,30 @@ public class RecordQueue { } /** - * Returns the tracked partition timestamp + * Returns the head record's timestamp * * @return timestamp */ - public long timestamp() { + public long headRecordTimestamp() { return headRecord == null ? UNKNOWN : headRecord.timestamp; } + /** + * Returns the tracked partition time + * + * @return partition time + */ + long partitionTime() { + return partitionTime; + } + /** * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements */ public void clear() { fifoQueue.clear(); headRecord = null; + partitionTime = RecordQueue.UNKNOWN; } private void updateHead() { @@ -167,7 +178,7 @@ public class RecordQueue { final long timestamp; try { - timestamp = timestampExtractor.extract(deserialized, timestamp()); + timestamp = timestampExtractor.extract(deserialized, partitionTime); } catch (final StreamsException internalFatalExtractorException) { throw internalFatalExtractorException; } catch (final Exception fatalUserException) { @@ -189,6 +200,8 @@ public class RecordQueue { } headRecord = new StampedRecord(deserialized, timestamp); + + partitionTime = Math.max(partitionTime, timestamp); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 59d7503102e..210412be92f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -798,14 +798,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateStreamTime() { - final long timestamp = partitionGroup.timestamp(); + final long streamTime = partitionGroup.streamTime(); // if the timestamp is not known yet, meaning there is not enough data accumulated // to reason stream partition time, then skip. - if (timestamp == RecordQueue.UNKNOWN) { + if (streamTime == RecordQueue.UNKNOWN) { return false; } else { - final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this); + final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this); if (punctuated) { commitNeeded = true; @@ -823,9 +823,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public boolean maybePunctuateSystemTime() { - final long timestamp = time.milliseconds(); + final long systemTime = time.milliseconds(); - final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); + final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this); if (punctuated) { commitNeeded = true; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index aa3860e8bc2..27e225597b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -662,7 +662,7 @@ public class StreamsConfigTest { public static class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { return 0; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 6b95bdf9f0a..cfc814f2e42 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -107,7 +107,7 @@ public class PartitionGroupTest { // st: -1 since no records was being processed yet verifyBuffered(6, 3, 3); - assertEquals(-1L, group.timestamp()); + assertEquals(-1L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; @@ -143,7 +143,7 @@ public class PartitionGroupTest { // 2:[4, 6] // st: 2 (just adding records shouldn't change it) verifyBuffered(6, 4, 2); - assertEquals(2L, group.timestamp()); + assertEquals(2L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); // get one record, time should be advanced @@ -221,7 +221,7 @@ public class PartitionGroupTest { group.addRawRecords(partition1, list1); verifyBuffered(3, 3, 0); - assertEquals(-1L, group.timestamp()); + assertEquals(-1L, group.streamTime()); assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue()); StampedRecord record; @@ -258,7 +258,7 @@ public class PartitionGroupTest { private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) { assertEquals(recordTime, record.timestamp); - assertEquals(streamTime, group.timestamp()); + assertEquals(streamTime, group.streamTime()); } private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 76252c1581e..2669039ed61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -731,7 +731,7 @@ public class ProcessorTopologyTest { private static final long DEFAULT_TIMESTAMP = 1000L; @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { if (record.value().toString().matches(".*@[0-9]+")) { return Long.parseLong(record.value().toString().split("@")[1]); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index c16cb2a7137..6dadb49b8ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -101,7 +101,7 @@ public class RecordQueueTest { assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // add three 3 out-of-order records with timestamp 2, 1, 3 final List> list1 = Arrays.asList( @@ -112,17 +112,17 @@ public class RecordQueueTest { queue.addRawRecords(list1); assertEquals(3, queue.size()); - assertEquals(2L, queue.timestamp()); + assertEquals(2L, queue.headRecordTimestamp()); // poll the first record, now with 1, 3 assertEquals(2L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(1L, queue.timestamp()); + assertEquals(1L, queue.headRecordTimestamp()); // poll the second record, now with 3 assertEquals(1L, queue.poll().timestamp); assertEquals(1, queue.size()); - assertEquals(3L, queue.timestamp()); + assertEquals(3L, queue.headRecordTimestamp()); // add three 3 out-of-order records with timestamp 4, 1, 2 // now with 3, 4, 1, 2 @@ -134,24 +134,24 @@ public class RecordQueueTest { queue.addRawRecords(list2); assertEquals(4, queue.size()); - assertEquals(3L, queue.timestamp()); + assertEquals(3L, queue.headRecordTimestamp()); // poll the third record, now with 4, 1, 2 assertEquals(3L, queue.poll().timestamp); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); // poll the rest records assertEquals(4L, queue.poll().timestamp); - assertEquals(1L, queue.timestamp()); + assertEquals(1L, queue.headRecordTimestamp()); assertEquals(1L, queue.poll().timestamp); - assertEquals(2L, queue.timestamp()); + assertEquals(2L, queue.headRecordTimestamp()); assertEquals(2L, queue.poll().timestamp); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // add three more records with 4, 5, 6 final List> list3 = Arrays.asList( @@ -162,24 +162,51 @@ public class RecordQueueTest { queue.addRawRecords(list3); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); // poll one record again, the timestamp should advance now assertEquals(4L, queue.poll().timestamp); assertEquals(2, queue.size()); - assertEquals(5L, queue.timestamp()); + assertEquals(5L, queue.headRecordTimestamp()); // clear the queue queue.clear(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); - assertEquals(RecordQueue.UNKNOWN, queue.timestamp()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); // re-insert the three records with 4, 5, 6 queue.addRawRecords(list3); assertEquals(3, queue.size()); - assertEquals(4L, queue.timestamp()); + assertEquals(4L, queue.headRecordTimestamp()); + } + + @Test + public void shouldTrackPartitionTimeAsMaxSeenTimestamp() { + + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + + // add three 3 out-of-order records with timestamp 2, 1, 3, 4 + final List> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + assertEquals(queue.partitionTime(), RecordQueue.UNKNOWN); + + queue.addRawRecords(list1); + + assertEquals(queue.partitionTime(), 2L); + + queue.poll(); + assertEquals(queue.partitionTime(), 2L); + + queue.poll(); + assertEquals(queue.partitionTime(), 3L); } @Test(expected = StreamsException.class) @@ -253,4 +280,57 @@ public class RecordQueueTest { assertEquals(0, queue.size()); } + + @Test + public void shouldPassPartitionTimeToTimestampExtractor() { + + final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor(); + final RecordQueue queue = new RecordQueue( + new TopicPartition(topics[0], 1), + mockSourceNodeWithMetrics, + timestampExtractor, + new LogAndFailExceptionHandler(), + context, + new LogContext()); + + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp()); + + // add three 3 out-of-order records with timestamp 2, 1, 3, 4 + final List> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime); + + queue.addRawRecords(list1); + + // no (known) timestamp has yet been passed to the timestamp extractor + assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(2L, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(2L, timestampExtractor.partitionTime); + + queue.poll(); + assertEquals(3L, timestampExtractor.partitionTime); + + } + + class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor { + private long partitionTime = RecordQueue.UNKNOWN; + + public long extract(final ConsumerRecord record, final long partitionTime) { + if (partitionTime < this.partitionTime) { + throw new IllegalStateException("Partition time should not decrease"); + } + this.partitionTime = partitionTime; + return record.offset(); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java index 17011643797..f437772cf37 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long previousTimestamp) { + public long extract(final ConsumerRecord record, final long partitionTime) { return record.offset(); } }