@ -52,8 +52,11 @@ import static org.apache.kafka.streams.state.internals.RecordConverters.identity
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH ;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH ;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END ;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END ;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START ;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START ;
import static org.easymock.EasyMock.anyObject ;
import static org.easymock.EasyMock.expect ;
import static org.easymock.EasyMock.expect ;
import static org.easymock.EasyMock.expectLastCall ;
import static org.easymock.EasyMock.replay ;
import static org.easymock.EasyMock.replay ;
import static org.easymock.EasyMock.verify ;
import static org.hamcrest.MatcherAssert.assertThat ;
import static org.hamcrest.MatcherAssert.assertThat ;
import static org.hamcrest.core.IsEqual.equalTo ;
import static org.hamcrest.core.IsEqual.equalTo ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.assertEquals ;
@ -618,6 +621,7 @@ public class StoreChangelogReaderTest {
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled ( ) {
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled ( ) {
final int totalMessages = 10 ;
final int totalMessages = 10 ;
setupConsumer ( totalMessages , topicPartition ) ;
setupConsumer ( totalMessages , topicPartition ) ;
// EOS enabled simulated by setting offsets with commit marker
// records have offsets of 0..9 10 is commit marker so 11 is end offset
// records have offsets of 0..9 10 is commit marker so 11 is end offset
consumer . updateEndOffsets ( Collections . singletonMap ( topicPartition , 11L ) ) ;
consumer . updateEndOffsets ( Collections . singletonMap ( topicPartition , 11L ) ) ;
@ -637,6 +641,33 @@ public class StoreChangelogReaderTest {
assertThat ( callback . restored . size ( ) , equalTo ( 10 ) ) ;
assertThat ( callback . restored . size ( ) , equalTo ( 10 ) ) ;
}
}
@Test
public void shouldReinitializeStateStoresWhenNoCheckpointFoundAndEOSEnabled ( ) {
final int totalMessages = 10 ;
setupConsumer ( totalMessages , topicPartition ) ;
// EOS enabled simulated by setting offsets with commit marker
// records have offsets of 0..9 10 is commit marker so 11 is end offset
consumer . updateEndOffsets ( Collections . singletonMap ( topicPartition , 11L ) ) ;
changelogReader . register ( new StateRestorer (
topicPartition ,
restoreListener ,
null ,
Long . MAX_VALUE ,
true ,
"storeName" ,
identity ( ) ) ) ;
expect ( active . restoringTaskFor ( topicPartition ) ) . andReturn ( task ) ;
expect ( task . isEosEnabled ( ) ) . andReturn ( true ) ;
task . reinitializeStateStoresForPartitions ( Collections . singleton ( anyObject ( TopicPartition . class ) ) ) ;
expectLastCall ( ) ;
replay ( active , task ) ;
changelogReader . restore ( active ) ;
verify ( active , task ) ;
}
@Test
@Test
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled ( ) {
public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled ( ) {
final int totalMessages = 10 ;
final int totalMessages = 10 ;
@ -705,7 +736,8 @@ public class StoreChangelogReaderTest {
assignPartition ( totalMessages , topicPartition ) ;
assignPartition ( totalMessages , topicPartition ) ;
// records 0..4 last offset before commit is 4
// records 0..4 last offset before commit is 4
addRecords ( 5 , topicPartition , 0 ) ;
addRecords ( 5 , topicPartition , 0 ) ;
//EOS enabled so commit marker at offset 5 so records start at 6
// EOS enabled simulated by setting offsets with commit marker
// EOS enabled so commit marker at offset 5 so records start at 6
addRecords ( 5 , topicPartition , 6 ) ;
addRecords ( 5 , topicPartition , 6 ) ;
consumer . assign ( Collections . emptyList ( ) ) ;
consumer . assign ( Collections . emptyList ( ) ) ;
// commit marker is 5 so ending offset is 12
// commit marker is 5 so ending offset is 12
@ -731,6 +763,7 @@ public class StoreChangelogReaderTest {
public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled ( ) {
public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled ( ) {
final int totalMessages = 10 ;
final int totalMessages = 10 ;
setupConsumer ( totalMessages , topicPartition ) ;
setupConsumer ( totalMessages , topicPartition ) ;
// EOS enabled simulated by setting offsets with commit marker
// records have offsets 0..9 10 is commit marker so 11 is ending offset
// records have offsets 0..9 10 is commit marker so 11 is ending offset
consumer . updateEndOffsets ( Collections . singletonMap ( topicPartition , 11L ) ) ;
consumer . updateEndOffsets ( Collections . singletonMap ( topicPartition , 11L ) ) ;