Browse Source

KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4186 from guozhangwang/K6179-cleanup-timestamp-tracker-on-clear
pull/4186/merge
Guozhang Wang 7 years ago
parent
commit
ee1aaa091f
  1. 4
      streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
  2. 13
      streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
  3. 6
      streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
  4. 25
      streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java

4
streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java

@ -77,4 +77,8 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> { @@ -77,4 +77,8 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
return stamped.timestamp;
}
public void clear() {
lastKnownTime = NOT_KNOWN;
ascendingSubsequence.clear();
}
}

13
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java

@ -180,9 +180,20 @@ public class RecordQueue { @@ -180,9 +180,20 @@ public class RecordQueue {
}
/**
* Clear the fifo queue of its elements
* Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
*/
public void clear() {
fifoQueue.clear();
timeTracker.clear();
partitionTime = TimestampTracker.NOT_KNOWN;
}
/*
* Returns the timestamp tracker of the record queue
*
* This is only used for testing
*/
TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
return timeTracker;
}
}

6
streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java

@ -24,7 +24,7 @@ package org.apache.kafka.streams.processor.internals; @@ -24,7 +24,7 @@ package org.apache.kafka.streams.processor.internals;
*/
public interface TimestampTracker<E> {
static final long NOT_KNOWN = -1L;
long NOT_KNOWN = -1L;
/**
* Adds a stamped elements to this tracker.
@ -54,4 +54,8 @@ public interface TimestampTracker<E> { @@ -54,4 +54,8 @@ public interface TimestampTracker<E> {
*/
int size();
/**
* Empty the tracker by removing any tracked stamped elements
*/
void clear();
}

25
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java

@ -88,6 +88,8 @@ public class RecordQueueTest { @@ -88,6 +88,8 @@ public class RecordQueueTest {
public void testTimeTracking() {
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
// add three 3 out-of-order records with timestamp 2, 1, 3
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@ -99,16 +101,19 @@ public class RecordQueueTest { @@ -99,16 +101,19 @@ public class RecordQueueTest {
assertEquals(3, queue.size());
assertEquals(1L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
// poll the first record, now with 1, 3
assertEquals(2L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(1L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
// poll the second record, now with 3
assertEquals(1L, queue.poll().timestamp);
assertEquals(1, queue.size());
assertEquals(3L, queue.timestamp());
assertEquals(1, queue.timeTracker().size());
// add three 3 out-of-order records with timestamp 4, 1, 2
// now with 3, 4, 1, 2
@ -121,22 +126,28 @@ public class RecordQueueTest { @@ -121,22 +126,28 @@ public class RecordQueueTest {
assertEquals(4, queue.size());
assertEquals(3L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
// poll the third record, now with 4, 1, 2
assertEquals(3L, queue.poll().timestamp);
assertEquals(3, queue.size());
assertEquals(3L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
// poll the rest records
assertEquals(4L, queue.poll().timestamp);
assertEquals(3L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
assertEquals(1L, queue.poll().timestamp);
assertEquals(3L, queue.timestamp());
assertEquals(1, queue.timeTracker().size());
assertEquals(2L, queue.poll().timestamp);
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(3L, queue.timestamp());
assertEquals(0, queue.timeTracker().size());
// add three more records with 4, 5, 6
List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@ -153,6 +164,20 @@ public class RecordQueueTest { @@ -153,6 +164,20 @@ public class RecordQueueTest {
assertEquals(4L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(5L, queue.timestamp());
assertEquals(2, queue.timeTracker().size());
// clear the queue
queue.clear();
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(0, queue.timeTracker().size());
assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
// re-insert the three records with 4, 5, 6
queue.addRawRecords(list3);
assertEquals(3, queue.size());
assertEquals(4L, queue.timestamp());
}
@Test(expected = StreamsException.class)

Loading…
Cancel
Save