diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 9ae9d798cb8..73fd6bd0f88 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -52,8 +52,11 @@ import static org.apache.kafka.streams.state.internals.RecordConverters.identity import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; @@ -618,6 +621,7 @@ public class StoreChangelogReaderTest { public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() { final int totalMessages = 10; setupConsumer(totalMessages, topicPartition); + // EOS enabled simulated by setting offsets with commit marker // records have offsets of 0..9 10 is commit marker so 11 is end offset consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L)); @@ -637,6 +641,33 @@ public class StoreChangelogReaderTest { assertThat(callback.restored.size(), equalTo(10)); } + @Test + public void shouldReinitializeStateStoresWhenNoCheckpointFoundAndEOSEnabled() { + final int totalMessages = 10; + setupConsumer(totalMessages, topicPartition); + // EOS enabled simulated by setting offsets with commit marker + // records have offsets of 0..9 10 is commit marker so 11 is end offset + consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L)); + + changelogReader.register(new StateRestorer( + topicPartition, + restoreListener, + null, + Long.MAX_VALUE, + true, + "storeName", + identity())); + + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + expect(task.isEosEnabled()).andReturn(true); + task.reinitializeStateStoresForPartitions(Collections.singleton(anyObject(TopicPartition.class))); + expectLastCall(); + replay(active, task); + + changelogReader.restore(active); + verify(active, task); + } + @Test public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() { final int totalMessages = 10; @@ -705,7 +736,8 @@ public class StoreChangelogReaderTest { assignPartition(totalMessages, topicPartition); // records 0..4 last offset before commit is 4 addRecords(5, topicPartition, 0); - //EOS enabled so commit marker at offset 5 so records start at 6 + // EOS enabled simulated by setting offsets with commit marker + // EOS enabled so commit marker at offset 5 so records start at 6 addRecords(5, topicPartition, 6); consumer.assign(Collections.emptyList()); // commit marker is 5 so ending offset is 12 @@ -731,6 +763,7 @@ public class StoreChangelogReaderTest { public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() { final int totalMessages = 10; setupConsumer(totalMessages, topicPartition); + // EOS enabled simulated by setting offsets with commit marker // records have offsets 0..9 10 is commit marker so 11 is ending offset consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));