@ -70,6 +70,8 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout ;
import org.junit.jupiter.api.Timeout ;
import org.junit.jupiter.params.ParameterizedTest ;
import org.junit.jupiter.params.ParameterizedTest ;
import org.junit.jupiter.params.provider.MethodSource ;
import org.junit.jupiter.params.provider.MethodSource ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.io.File ;
import java.io.File ;
import java.io.IOException ;
import java.io.IOException ;
@ -108,6 +110,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout ( 600 )
@Timeout ( 600 )
@Tag ( "integration" )
@Tag ( "integration" )
public class RestoreIntegrationTest {
public class RestoreIntegrationTest {
private static final Logger log = LoggerFactory . getLogger ( RestoreIntegrationTest . class ) ;
private static final Duration RESTORATION_DELAY = Duration . ofSeconds ( 1 ) ;
private static final Duration RESTORATION_DELAY = Duration . ofSeconds ( 1 ) ;
private static final int NUM_BROKERS = 1 ;
private static final int NUM_BROKERS = 1 ;
@ -523,13 +527,13 @@ public class RestoreIntegrationTest {
final Map < String , Object > kafkaStreams1Configuration = mkMap (
final Map < String , Object > kafkaStreams1Configuration = mkMap (
mkEntry ( StreamsConfig . STATE_DIR_CONFIG , TestUtils . tempDirectory ( appId ) . getPath ( ) + "-ks1" ) ,
mkEntry ( StreamsConfig . STATE_DIR_CONFIG , TestUtils . tempDirectory ( appId ) . getPath ( ) + "-ks1" ) ,
mkEntry ( StreamsConfig . consumerPrefix ( ConsumerConfig . GROUP_INSTANCE_ID_CONFIG ) , appId + "-ks1" ) ,
mkEntry ( StreamsConfig . CLIENT_ID_CONFIG , appId + "-ks1" ) ,
mkEntry ( StreamsConfig . restoreConsumerPrefix ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG ) , 10 )
mkEntry ( StreamsConfig . restoreConsumerPrefix ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG ) , 5 )
) ;
) ;
final Map < String , Object > kafkaStreams2Configuration = mkMap (
final Map < String , Object > kafkaStreams2Configuration = mkMap (
mkEntry ( StreamsConfig . STATE_DIR_CONFIG , TestUtils . tempDirectory ( appId ) . getPath ( ) + "-ks2" ) ,
mkEntry ( StreamsConfig . STATE_DIR_CONFIG , TestUtils . tempDirectory ( appId ) . getPath ( ) + "-ks2" ) ,
mkEntry ( StreamsConfig . consumerPrefix ( ConsumerConfig . GROUP_INSTANCE_ID_CONFIG ) , appId + "-ks2" ) ,
mkEntry ( StreamsConfig . CLIENT_ID_CONFIG , appId + "-ks2" ) ,
mkEntry ( StreamsConfig . restoreConsumerPrefix ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG ) , 10 )
mkEntry ( StreamsConfig . restoreConsumerPrefix ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG ) , 5 )
) ;
) ;
final StreamsBuilder builder = new StreamsBuilder ( ) ;
final StreamsBuilder builder = new StreamsBuilder ( ) ;
@ -553,7 +557,7 @@ public class RestoreIntegrationTest {
kafkaStreams . close ( Duration . ofMillis ( IntegrationTestUtils . DEFAULT_TIMEOUT ) ) ;
kafkaStreams . close ( Duration . ofMillis ( IntegrationTestUtils . DEFAULT_TIMEOUT ) ) ;
IntegrationTestUtils . purgeLocalStreamsState ( streamsConfigurations ) ;
IntegrationTestUtils . purgeLocalStreamsState ( streamsConfigurations ) ;
final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener ( RESTORATION_DELAY ) ;
final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener ( "ks1" , RESTORATION_DELAY ) ;
kafkaStreams = startKafkaStreams ( builder , kafkaStreams1StateRestoreListener , kafkaStreams1Configuration ) ;
kafkaStreams = startKafkaStreams ( builder , kafkaStreams1StateRestoreListener , kafkaStreams1Configuration ) ;
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationStarts ( ) ) ;
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationStarts ( ) ) ;
@ -562,17 +566,23 @@ public class RestoreIntegrationTest {
// Simulate a new instance joining in the middle of the restoration.
// Simulate a new instance joining in the middle of the restoration.
// When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
// When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
// and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
// and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener ( RESTORATION_DELAY ) ;
final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener ( "ks2" , RESTORATION_DELAY ) ;
final KafkaStreams kafkaStreams2 = startKafkaStreams ( builder , kafkaStreams2StateRestoreListener , kafkaStreams2Configuration ) ;
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationSuspends ( ) ) ;
try ( final KafkaStreams kafkaStreams2 = startKafkaStreams ( builder ,
kafkaStreams2StateRestoreListener ,
kafkaStreams2Configuration ) ) {
// Poll until the second Streams instance reports State.RUNNING before asserting on the listeners.
// NOTE(review): presumably the second argument is the maximum wait and the third supplies the
// failure message — confirm against the waitForCondition helper.
waitForCondition ( ( ) -> State . RUNNING == kafkaStreams2 . state ( ) ,
IntegrationTestUtils . DEFAULT_TIMEOUT ,
( ) -> "kafkaStreams2 never transitioned to a RUNNING state." ) ;
assertTrue ( kafkaStreams2StateRestoreListener . awaitUntilRestorationStarts ( ) ) ;
// The first instance's listener must observe a suspended restoration once the second instance joins.
// NOTE(review): presumably released by onRestoreSuspended counting down its latch — confirm.
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationSuspends ( ) ) ;
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationEnds ( ) ) ;
assertTrue ( kafkaStreams2StateRestoreListener . awaitUntilRestorationStarts ( ) ) ;
assertTrue ( kafkaStreams2StateRestoreListener . awaitUntilRestorationEnds ( ) ) ;
// Cleanup
assertTrue ( kafkaStreams1StateRestoreListener . awaitUntilRestorationEnds ( ) ) ;
kafkaStreams2 . close ( Duration . ofMillis ( IntegrationTestUtils . DEFAULT_TIMEOUT ) ) ;
assertTrue ( kafkaStreams2StateRestoreListener . awaitUntilRestorationEnds ( ) ) ;
}
}
}
private void validateReceivedMessages ( final List < KeyValue < Integer , Integer > > expectedRecords ,
private void validateReceivedMessages ( final List < KeyValue < Integer , Integer > > expectedRecords ,
@ -601,8 +611,6 @@ public class RestoreIntegrationTest {
final StateRestoreListener stateRestoreListener ,
final StateRestoreListener stateRestoreListener ,
final Map < String , Object > extraConfiguration ) {
final Map < String , Object > extraConfiguration ) {
final Properties streamsConfiguration = props ( mkObjectProperties ( extraConfiguration ) ) ;
final Properties streamsConfiguration = props ( mkObjectProperties ( extraConfiguration ) ) ;
streamsConfiguration . put ( StreamsConfig . restoreConsumerPrefix ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG ) , 10 ) ;
final KafkaStreams kafkaStreams = new KafkaStreams ( streamsBuilder . build ( ) , streamsConfiguration ) ;
final KafkaStreams kafkaStreams = new KafkaStreams ( streamsBuilder . build ( ) , streamsConfiguration ) ;
kafkaStreams . setGlobalStateRestoreListener ( stateRestoreListener ) ;
kafkaStreams . setGlobalStateRestoreListener ( stateRestoreListener ) ;
@ -612,6 +620,7 @@ public class RestoreIntegrationTest {
}
}
private static final class TestStateRestoreListener implements StateRestoreListener {
private static final class TestStateRestoreListener implements StateRestoreListener {
private final String instanceName ;
private final Duration onBatchRestoredSleepDuration ;
private final Duration onBatchRestoredSleepDuration ;
private final CountDownLatch onRestoreStartLatch = new CountDownLatch ( 1 ) ;
private final CountDownLatch onRestoreStartLatch = new CountDownLatch ( 1 ) ;
@ -619,8 +628,9 @@ public class RestoreIntegrationTest {
private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch ( 1 ) ;
private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch ( 1 ) ;
private final CountDownLatch onBatchRestoredLatch = new CountDownLatch ( 1 ) ;
private final CountDownLatch onBatchRestoredLatch = new CountDownLatch ( 1 ) ;
/**
 * Creates a listener that remembers an instance name and a sleep duration.
 *
 * instanceName is the value substituted for the leading "[{}]" in this listener's log lines;
 * onBatchRestoredSleepDuration is the time passed to Utils.sleep in onBatchRestored.
 *
 * NOTE(review): this span is interleaved diff residue — unchanged lines appear twice, and the
 * one-argument signature looks like the pre-change form superseded by the two-argument one.
 * Confirm against the real source file.
 */
TestStateRestoreListener ( final Duration onBatchRestoredSleepDuration ) {
TestStateRestoreListener ( final String instanceName , final Duration onBatchRestoredSleepDuration ) {
this . onBatchRestoredSleepDuration = onBatchRestoredSleepDuration ;
this . onBatchRestoredSleepDuration = onBatchRestoredSleepDuration ;
this . instanceName = instanceName ;
}
}
boolean awaitUntilRestorationStarts ( ) throws InterruptedException {
boolean awaitUntilRestorationStarts ( ) throws InterruptedException {
@ -644,6 +654,8 @@ public class RestoreIntegrationTest {
final String storeName ,
final String storeName ,
final long startingOffset ,
final long startingOffset ,
final long endingOffset ) {
final long endingOffset ) {
log . info ( "[{}] called onRestoreStart. topicPartition={}, storeName={}, startingOffset={}, endingOffset={}" ,
instanceName , topicPartition , storeName , startingOffset , endingOffset ) ;
onRestoreStartLatch . countDown ( ) ;
onRestoreStartLatch . countDown ( ) ;
}
}
@ -652,6 +664,8 @@ public class RestoreIntegrationTest {
final String storeName ,
final String storeName ,
final long batchEndOffset ,
final long batchEndOffset ,
final long numRestored ) {
final long numRestored ) {
log . info ( "[{}] called onBatchRestored. topicPartition={}, storeName={}, batchEndOffset={}, numRestored={}" ,
instanceName , topicPartition , storeName , batchEndOffset , numRestored ) ;
Utils . sleep ( onBatchRestoredSleepDuration . toMillis ( ) ) ;
Utils . sleep ( onBatchRestoredSleepDuration . toMillis ( ) ) ;
onBatchRestoredLatch . countDown ( ) ;
onBatchRestoredLatch . countDown ( ) ;
}
}
@ -660,6 +674,8 @@ public class RestoreIntegrationTest {
/**
 * StateRestoreListener callback: logs the received arguments prefixed with the instance name,
 * then counts down onRestoreEndLatch.
 *
 * NOTE(review): presumably the latch is what awaitUntilRestorationEnds waits on — confirm.
 * NOTE(review): this span is interleaved diff residue; unchanged lines appear twice and the
 * log.info statement appears to exist only in the post-change version.
 *
 * @param topicPartition partition reported by the caller (logged only)
 * @param storeName      store name reported by the caller (logged only)
 * @param totalRestored  count reported by the caller (logged only)
 */
public void onRestoreEnd ( final TopicPartition topicPartition ,
public void onRestoreEnd ( final TopicPartition topicPartition ,
final String storeName ,
final String storeName ,
final long totalRestored ) {
final long totalRestored ) {
log . info ( "[{}] called onRestoreEnd. topicPartition={}, storeName={}, totalRestored={}" ,
instanceName , topicPartition , storeName , totalRestored ) ;
// Signal that at least one restoration has ended.
onRestoreEndLatch . countDown ( ) ;
onRestoreEndLatch . countDown ( ) ;
}
}
@ -667,6 +683,8 @@ public class RestoreIntegrationTest {
/**
 * StateRestoreListener callback: logs the received arguments prefixed with the instance name,
 * then counts down onRestoreSuspendedLatch.
 *
 * NOTE(review): presumably the latch is what awaitUntilRestorationSuspends waits on — confirm.
 * NOTE(review): this span is interleaved diff residue; unchanged lines appear twice and the
 * log.info statement appears to exist only in the post-change version.
 *
 * @param topicPartition partition reported by the caller (logged only)
 * @param storeName      store name reported by the caller (logged only)
 * @param totalRestored  count reported by the caller (logged only)
 */
public void onRestoreSuspended ( final TopicPartition topicPartition ,
public void onRestoreSuspended ( final TopicPartition topicPartition ,
final String storeName ,
final String storeName ,
final long totalRestored ) {
final long totalRestored ) {
log . info ( "[{}] called onRestoreSuspended. topicPartition={}, storeName={}, totalRestored={}" ,
instanceName , topicPartition , storeName , totalRestored ) ;
// Signal that at least one restoration has been suspended.
onRestoreSuspendedLatch . countDown ( ) ;
onRestoreSuspendedLatch . countDown ( ) ;
}
}