@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
@@ -56,15 +57,15 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -73,9 +74,11 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -83,9 +86,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -99,6 +108,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class RestoreIntegrationTest {
    private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1);

    private static final int NUM_BROKERS = 1;

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -119,6 +130,8 @@ public class RestoreIntegrationTest {
    private final int numberOfKeys = 10000;
    private KafkaStreams kafkaStreams;

    private final List<Properties> streamsConfigurations = new ArrayList<>();

    @BeforeEach
    public void createTopics(final TestInfo testInfo) throws InterruptedException {
        appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
@@ -127,7 +140,12 @@ public class RestoreIntegrationTest {
    }

    private Properties props(final boolean stateUpdaterEnabled) {
        return props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
    }

    private Properties props(final Properties extraProperties) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -136,15 +154,21 @@ public class RestoreIntegrationTest {
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
        streamsConfiguration.putAll(extraProperties);
        streamsConfigurations.add(streamsConfiguration);
        return streamsConfiguration;
    }

    @AfterEach
    public void shutdown() {
    public void shutdown() throws Exception {
        if (kafkaStreams != null) {
            kafkaStreams.close(Duration.ofSeconds(30));
        }

        IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
        streamsConfigurations.clear();
    }

    private static Stream<Boolean> parameters() {
@@ -490,6 +514,181 @@ public class RestoreIntegrationTest {
        assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
    }
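    // Exercises the full StateRestoreListener lifecycle: restore a store from scratch, then have a second
    // instance join mid-restoration so that onRestoreSuspended fires on the first instance.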
    @Test
    public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
        final String inputTopic = "inputTopic";
        final String outputTopic = "outputTopic";
        CLUSTER.createTopic(inputTopic, 5, 1);
        CLUSTER.createTopic(outputTopic, 5, 1);

        final Map<String, Object> kafkaStreams1Configuration = mkMap(
            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"),
            mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks1"),
            mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
        );
        final Map<String, Object> kafkaStreams2Configuration = mkMap(
            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"),
            mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks2"),
            mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
        );

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic, Consumed.with(EARLIEST))
            .groupByKey()
            .reduce((oldVal, newVal) -> newVal)
            .toStream()
            .to(outputTopic);

        final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 100)
            .mapToObj(i -> new KeyValue<>(i, i))
            .collect(Collectors.toList());
        sendEvents(inputTopic, sampleData);

        kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration);
        validateReceivedMessages(sampleData, outputTopic);

        // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state.
        kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);

        final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
        kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);

        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationStarts());
        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilBatchRestoredIsCalled());

        // Simulate a new instance joining in the middle of the restoration.
        // When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
        // and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
        final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
        final KafkaStreams kafkaStreams2 = startKafkaStreams(builder, kafkaStreams2StateRestoreListener, kafkaStreams2Configuration);

        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends());
        assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts());

        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationEnds());
        assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationEnds());

        // Cleanup
        kafkaStreams2.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
    }
    private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
                                          final String outputTopic) throws Exception {
        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + appId);
        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.setProperty(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName()
        );
        consumerProperties.setProperty(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName()
        );

        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
            consumerProperties,
            outputTopic,
            expectedRecords
        );
    }
    private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
                                           final StateRestoreListener stateRestoreListener,
                                           final Map<String, Object> extraConfiguration) {
        final Properties streamsConfiguration = props(mkObjectProperties(extraConfiguration));
        streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10);

        final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
        kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
        kafkaStreams.start();
        return kafkaStreams;
    }
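    // Test listener that records each StateRestoreListener callback via a CountDownLatch, and slows down
    // restoration by sleeping in onBatchRestored so the test has time to start a second instance mid-restore.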
    private static final class TestStateRestoreListener implements StateRestoreListener {
        private final Duration onBatchRestoredSleepDuration;

        private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
        private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch(1);
        private final CountDownLatch onBatchRestoredLatch = new CountDownLatch(1);

        TestStateRestoreListener(final Duration onBatchRestoredSleepDuration) {
            this.onBatchRestoredSleepDuration = onBatchRestoredSleepDuration;
        }

        boolean awaitUntilRestorationStarts() throws InterruptedException {
            return awaitLatchWithTimeout(onRestoreStartLatch);
        }

        boolean awaitUntilRestorationSuspends() throws InterruptedException {
            return awaitLatchWithTimeout(onRestoreSuspendedLatch);
        }

        boolean awaitUntilRestorationEnds() throws InterruptedException {
            return awaitLatchWithTimeout(onRestoreEndLatch);
        }

        public boolean awaitUntilBatchRestoredIsCalled() throws InterruptedException {
            return awaitLatchWithTimeout(onBatchRestoredLatch);
        }

        @Override
        public void onRestoreStart(final TopicPartition topicPartition,
                                   final String storeName,
                                   final long startingOffset,
                                   final long endingOffset) {
            onRestoreStartLatch.countDown();
        }

        @Override
        public void onBatchRestored(final TopicPartition topicPartition,
                                    final String storeName,
                                    final long batchEndOffset,
                                    final long numRestored) {
            Utils.sleep(onBatchRestoredSleepDuration.toMillis());
            onBatchRestoredLatch.countDown();
        }

        @Override
        public void onRestoreEnd(final TopicPartition topicPartition,
                                 final String storeName,
                                 final long totalRestored) {
            onRestoreEndLatch.countDown();
        }

        @Override
        public void onRestoreSuspended(final TopicPartition topicPartition,
                                       final String storeName,
                                       final long totalRestored) {
            onRestoreSuspendedLatch.countDown();
        }

        private static boolean awaitLatchWithTimeout(final CountDownLatch latch) throws InterruptedException {
            return latch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        }
    }
    private void sendEvents(final String topic, final List<KeyValue<Integer, Integer>> events) {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            events,
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                IntegerSerializer.class,
                IntegerSerializer.class,
                new Properties()
            ),
            CLUSTER.time
        );
    }
    private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {
        return new KeyValueBytesStoreSupplier() {
            @Override