
KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)

The timestamp extractor takes a previousTimestamp parameter, which is intended to be the partition time. This PR adds partition-time tracking back in for the extractor and renames previousTimestamp to partitionTime.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Matthias J. Sax <mjsax@apache.org>
Branch: pull/6069/head
Authored by A. Sophie Blee-Goldman 5 years ago; committed by Guozhang Wang
Commit 62fbc92e3d
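For implementers of custom extractors, the renamed parameter clarifies the contract: the second argument is the highest valid timestamp observed so far on the record's partition, not merely the previously extracted timestamp. Below is a minimal sketch of a custom extractor that leans on that guarantee; the class name is hypothetical and the fallback-to-zero choice is an assumption of this sketch, not part of the commit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PartitionTimeFallbackExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        final long timestamp = record.timestamp();
        if (timestamp >= 0) {
            return timestamp;
        }
        // partitionTime is -1 until the first valid timestamp is seen on this
        // partition; fall back to 0 in that case (an arbitrary choice here).
        return partitionTime >= 0 ? partitionTime : 0L;
    }
}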
1. streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java (2 changes)
2. streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java (10 changes)
3. streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java (4 changes)
4. streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java (4 changes)
5. streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java (4 changes)
6. streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java (10 changes)
7. streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java (4 changes)
8. streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java (18 changes)
9. streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (23 changes)
10. streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (10 changes)
11. streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java (2 changes)
12. streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (8 changes)
13. streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java (2 changes)
14. streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (106 changes)
15. streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java (2 changes)

streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java (2 changes)

@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         if (record.value() instanceof PageViewTypedDemo.PageView) {
             return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }

streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java (10 changes)

@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         final long timestamp = record.timestamp();
         if (timestamp < 0) {
-            return onInvalidTimestamp(record, timestamp, previousTimestamp);
+            return onInvalidTimestamp(record, timestamp, partitionTime);
         }
         return timestamp;
@@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extractor from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
      */
     public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                             final long recordTimestamp,
-                                            final long previousTimestamp);
+                                            final long partitionTime);
 }

streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java (4 changes)

@@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extractor from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return nothing; always raises an exception
      * @throws StreamsException on every invocation
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
         final String message = "Input record " + record + " has invalid (negative) timestamp. " +

streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java (4 changes)

@@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extractor from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the originally extracted timestamp of the record
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp) {
+                                   final long partitionTime) {
         log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
         return recordTimestamp;
     }

streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java (4 changes)

@@ -46,8 +46,8 @@ public interface TimestampExtractor {
      *
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the timestamp of the record
      */
-    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
+    long extract(ConsumerRecord<Object, Object> record, long partitionTime);
 }

streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java (10 changes)

@@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extractor from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the provided latest extracted valid timestamp as new timestamp for the record
      * @throws StreamsException if latest extracted valid timestamp is unknown
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
-        if (previousTimestamp < 0) {
+        if (partitionTime < 0) {
             throw new StreamsException("Could not infer new timestamp for input record " + record
-                + " because latest extracted valid timestamp is unknown.");
+                + " because partition time is unknown.");
         }
-        return previousTimestamp;
+        return partitionTime;
     }
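With this change, the value returned on an invalid timestamp is the true partition time rather than whatever the last extracted timestamp happened to be. A sketch of wiring this extractor in as the application default; the application id and bootstrap servers are placeholders, and the class name is hypothetical:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;

public class ExtractorConfigSketch {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "partition-time-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        // Records with a negative timestamp now inherit the partition time.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  UsePreviousTimeOnInvalidTimestamp.class);
        return props;
    }
}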

streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java (4 changes)

@@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
      * Return the current wall clock time as timestamp.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return System.currentTimeMillis();
     }
 }

streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java (18 changes)

@@ -41,8 +41,8 @@ import java.util.Set;
  *
  * PartitionGroup also maintains a stream-time for the group as a whole.
  * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
- * The PartitionGroup's stream-time is also the stream-time of its task and is used as the
- * stream-time for any computations that require it.
+ * Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an
+ * accurate view of the local time as seen by that processor.
  *
  * The PartitionGroups's stream-time is initially UNKNOWN (-1), and it set to a known value upon first poll.
  * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
@@ -76,7 +76,7 @@ public class PartitionGroup {
     }
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
-        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
+        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
@@ -109,7 +109,7 @@ public class PartitionGroup {
             nonEmptyQueuesByTime.offer(queue);
         }
-        // always update the stream time to the record's timestamp yet to be processed if it is larger
+        // always update the stream-time to the record's timestamp yet to be processed if it is larger
         if (record.timestamp > streamTime) {
             streamTime = record.timestamp;
             recordLatenessSensor.record(0);
@@ -140,8 +140,8 @@ public class PartitionGroup {
                 nonEmptyQueuesByTime.offer(recordQueue);
                 // if all partitions now are non-empty, set the flag
-                // we do not need to update the stream time here since this task will definitely be
-                // processed next, and hence the stream time will be updated when we retrieved records by then
+                // we do not need to update the stream-time here since this task will definitely be
+                // processed next, and hence the stream-time will be updated when we retrieved records by then
                 if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
                     allBuffered = true;
                 }
@@ -157,10 +157,9 @@ public class PartitionGroup {
     }
     /**
-     * Return the timestamp of this partition group as the smallest
-     * partition timestamp among all its partitions
+     * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
      */
-    public long timestamp() {
+    public long streamTime() {
         return streamTime;
     }
@@ -192,6 +191,7 @@ public class PartitionGroup {
     public void clear() {
         nonEmptyQueuesByTime.clear();
+        streamTime = RecordQueue.UNKNOWN;
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }
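The stream-time definition above amounts to a running maximum over polled record timestamps. A toy model of that invariant, separate from the Streams internals (class and method names are illustrative only):

public class StreamTimeModel {
    static final long UNKNOWN = -1L;
    private long streamTime = UNKNOWN;

    // mirrors PartitionGroup: advance only if the polled record is newer
    void recordPolled(final long recordTimestamp) {
        if (recordTimestamp > streamTime) {
            streamTime = recordTimestamp;
        }
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(final String[] args) {
        final StreamTimeModel model = new StreamTimeModel();
        for (final long ts : new long[] {2L, 1L, 3L}) {
            model.recordPolled(ts);
        }
        // prints 3: the late record (timestamp 1) did not move stream-time backwards
        System.out.println(model.streamTime());
    }
}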

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java (23 changes)

@@ -31,8 +31,8 @@ import java.util.ArrayDeque;
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
- * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
- * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
+ * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
+ * timestamp extractor.
  */
 public class RecordQueue {
@@ -47,6 +47,7 @@ public class RecordQueue {
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
     private StampedRecord headRecord = null;
+    private long partitionTime = RecordQueue.UNKNOWN;
     private Sensor skipRecordsSensor;
@@ -139,20 +140,30 @@ public class RecordQueue {
     }
     /**
-     * Returns the tracked partition timestamp
+     * Returns the head record's timestamp
      *
      * @return timestamp
     */
-    public long timestamp() {
+    public long headRecordTimestamp() {
         return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
+    /**
+     * Returns the tracked partition time
+     *
+     * @return partition time
+     */
+    long partitionTime() {
+        return partitionTime;
+    }
     /**
      * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
      */
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
+        partitionTime = RecordQueue.UNKNOWN;
     }
     private void updateHead() {
@@ -167,7 +178,7 @@ public class RecordQueue {
         final long timestamp;
         try {
-            timestamp = timestampExtractor.extract(deserialized, timestamp());
+            timestamp = timestampExtractor.extract(deserialized, partitionTime);
         } catch (final StreamsException internalFatalExtractorException) {
             throw internalFatalExtractorException;
         } catch (final Exception fatalUserException) {
@@ -189,6 +200,8 @@ public class RecordQueue {
             }
             headRecord = new StampedRecord(deserialized, timestamp);
+            partitionTime = Math.max(partitionTime, timestamp);
         }
     }
 }
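Note the ordering in updateHead(): the extractor is invoked with the partition time as of before the current record, and only afterwards is the extracted timestamp folded into the running maximum. A compact model of that two-step flow (names are illustrative, not the real RecordQueue API):

public class PartitionTimeModel {
    static final long UNKNOWN = -1L;
    private long partitionTime = UNKNOWN;

    // stands in for: extract(record, partitionTime) followed by the max update
    long extractAndTrack(final long recordTimestamp) {
        // step 1: the extractor sees the pre-record partition time (this sketch
        // substitutes it whenever the record's own timestamp is invalid)
        final long extracted = recordTimestamp >= 0 ? recordTimestamp : partitionTime;
        // step 2: advance the tracked partition time to the max seen so far
        partitionTime = Math.max(partitionTime, extracted);
        return extracted;
    }
}

Feeding timestamps 2, 1, 3, 4 through this model has the extractor observing -1, 2, 2, 3 in turn, which is exactly what the new RecordQueueTest below asserts.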

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java (10 changes)

@@ -798,14 +798,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateStreamTime() {
-        final long timestamp = partitionGroup.timestamp();
+        final long streamTime = partitionGroup.streamTime();
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason stream partition time, then skip.
-        if (timestamp == RecordQueue.UNKNOWN) {
+        if (streamTime == RecordQueue.UNKNOWN) {
             return false;
         } else {
-            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
+            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
             if (punctuated) {
                 commitNeeded = true;
@@ -823,9 +823,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateSystemTime() {
-        final long timestamp = time.milliseconds();
-        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
+        final long systemTime = time.milliseconds();
+        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
         if (punctuated) {
             commitNeeded = true;
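Since stream-time punctuation is driven by the group's streamTime(), a punctuator only starts firing once the task has seen a record with a valid timestamp. A sketch using the Processor API of this era; topology wiring is omitted and the class name and callback body are illustrative:

import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class StreamTimePunctuateSketch extends AbstractProcessor<String, String> {
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Fires on stream-time advance, i.e. via maybePunctuateStreamTime() above;
        // nothing fires while stream-time is still RecordQueue.UNKNOWN (-1).
        context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME,
            timestamp -> System.out.println("stream-time is now " + timestamp));
    }

    @Override
    public void process(final String key, final String value) {
        // processing records advances stream-time, which drives the punctuator
    }
}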

streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java (2 changes)

@@ -662,7 +662,7 @@ public class StreamsConfigTest {
     public static class MockTimestampExtractor implements TimestampExtractor {
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             return 0;
         }
     }

streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java (8 changes)

@@ -107,7 +107,7 @@ public class PartitionGroupTest {
         // st: -1 since no records was being processed yet
         verifyBuffered(6, 3, 3);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
         StampedRecord record;
@@ -143,7 +143,7 @@ public class PartitionGroupTest {
         // 2:[4, 6]
         // st: 2 (just adding records shouldn't change it)
         verifyBuffered(6, 4, 2);
-        assertEquals(2L, group.timestamp());
+        assertEquals(2L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
         // get one record, time should be advanced
@@ -221,7 +221,7 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list1);
         verifyBuffered(3, 3, 0);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
         StampedRecord record;
@@ -258,7 +258,7 @@ public class PartitionGroupTest {
     private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
         assertEquals(recordTime, record.timestamp);
-        assertEquals(streamTime, group.timestamp());
+        assertEquals(streamTime, group.streamTime());
     }
     private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java (2 changes)

@@ -731,7 +731,7 @@ public class ProcessorTopologyTest {
         private static final long DEFAULT_TIMESTAMP = 1000L;
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             if (record.value().toString().matches(".*@[0-9]+")) {
                 return Long.parseLong(record.value().toString().split("@")[1]);
             }

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java (106 changes)

@@ -101,7 +101,7 @@ public class RecordQueueTest {
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
         // add three 3 out-of-order records with timestamp 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -112,17 +112,17 @@ public class RecordQueueTest {
         queue.addRawRecords(list1);
         assertEquals(3, queue.size());
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
         // add three 3 out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
@@ -134,24 +134,24 @@ public class RecordQueueTest {
         queue.addRawRecords(list2);
         assertEquals(4, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -162,24 +162,51 @@ public class RecordQueueTest {
         queue.addRawRecords(list3);
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
         // poll one record again, the timestamp should advance now
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(5L, queue.timestamp());
+        assertEquals(5L, queue.headRecordTimestamp());
         // clear the queue
         queue.clear();
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
         // re-insert the three records with 4, 5, 6
         queue.addRawRecords(list3);
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
     }
+    @Test
+    public void shouldTrackPartitionTimeAsMaxSeenTimestamp() {
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
+        // add three 3 out-of-order records with timestamp 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+        assertEquals(queue.partitionTime(), RecordQueue.UNKNOWN);
+        queue.addRawRecords(list1);
+        assertEquals(queue.partitionTime(), 2L);
+        queue.poll();
+        assertEquals(queue.partitionTime(), 2L);
+        queue.poll();
+        assertEquals(queue.partitionTime(), 3L);
+    }
     @Test(expected = StreamsException.class)
@@ -253,4 +280,57 @@ public class RecordQueueTest {
         assertEquals(0, queue.size());
     }
+    @Test
+    public void shouldPassPartitionTimeToTimestampExtractor() {
+        final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
+        final RecordQueue queue = new RecordQueue(
+            new TopicPartition(topics[0], 1),
+            mockSourceNodeWithMetrics,
+            timestampExtractor,
+            new LogAndFailExceptionHandler(),
+            context,
+            new LogContext());
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
+        // add three 3 out-of-order records with timestamp 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+        queue.addRawRecords(list1);
+        // no (known) timestamp has yet been passed to the timestamp extractor
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+        queue.poll();
+        assertEquals(3L, timestampExtractor.partitionTime);
+    }
+    class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
+        private long partitionTime = RecordQueue.UNKNOWN;
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
+            if (partitionTime < this.partitionTime) {
+                throw new IllegalStateException("Partition time should not decrease");
+            }
+            this.partitionTime = partitionTime;
+            return record.offset();
+        }
+    }
 }

streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java (2 changes)

@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class MockTimestampExtractor implements TimestampExtractor {
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return record.offset();
     }
 }
