@@ -28,15 +28,16 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -44,6 +45,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
@@ -55,19 +58,25 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

@Category({IntegrationTest.class})
public class StoreQueryIntegrationTest {

    private static final Logger LOG = LoggerFactory.getLogger(StoreQueryIntegrationTest.class);

    private static final int NUM_BROKERS = 1;
    private static int port = 0;
    private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -117,17 +126,34 @@ public class StoreQueryIntegrationTest {
        // Assert that all messages in the first batch were processed in a timely manner
        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

        until(() -> {
            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);

            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);

            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;

            // Assert that only active is able to query for a key by default
            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
            assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
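            // A freshly (re)assigned instance may not be queryable yet; in that case the catch below
            // returns false so that until() keeps polling instead of failing the test.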
            try {
                if (kafkaStreams1IsActive) {
                    assertThat(store2.get(key), is(nullValue()));
                } else {
                    assertThat(store1.get(key), is(nullValue()));
                }
                return true;
            } catch (final InvalidStateStoreException exception) {
                assertThat(
                    exception.getMessage(),
                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
                );
                LOG.info("Streams wasn't running. Will try again.");
                return false;
            }
        });
    }

    @Test
@@ -153,6 +179,7 @@ public class StoreQueryIntegrationTest {
        // Assert that all messages in the first batch were processed in a timely manner
        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

        until(() -> {
            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);

            //key belongs to this partition
@@ -162,15 +189,15 @@ public class StoreQueryIntegrationTest {
            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;

            StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
                    .withPartition(keyPartition);
            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
            if (kafkaStreams1IsActive) {
                store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
                store1 = getStore(kafkaStreams1, storeQueryParam);
            } else {
                store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
                store2 = getStore(kafkaStreams2, storeQueryParam);
            }

            if (kafkaStreams1IsActive) {
@@ -184,20 +211,43 @@ public class StoreQueryIntegrationTest {
            // Assert that only active for a specific requested partition serves key if stale stores are not enabled
            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));

            storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
                    .withPartition(keyDontBelongPartition);
            ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
            ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
            if (!kafkaStreams1IsActive) {
                store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
            } else {
                store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
            }

            try {
                // Assert that key is not served when the wrong specific partition is requested
                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
                // So, in that case, store3 would be null and store4 would not return the value for key as the wrong partition was requested
                assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
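                // The instance that is active for keyDontBelongPartition returns null for the key,
                // while the other instance does not host that partition at all and is expected to throw.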
                if (kafkaStreams1IsActive) {
                    assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue()));
                    final InvalidStateStoreException exception =
                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key));
                    assertThat(
                        exception.getMessage(),
                        containsString("The specified partition 1 for store source-table does not exist.")
                    );
                } else {
                    assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue()));
                    final InvalidStateStoreException exception =
                        assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));
                    assertThat(
                        exception.getMessage(),
                        containsString("The specified partition 1 for store source-table does not exist.")
                    );
                }
                return true;
            } catch (final InvalidStateStoreException exception) {
                assertThat(
                    exception.getMessage(),
                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
                );
                LOG.info("Streams wasn't running. Will try again.");
                return false;
            }
        });
    }

    @Test
@@ -224,17 +274,15 @@ public class StoreQueryIntegrationTest {
        // Assert that all messages in the first batch were processed in a timely manner
        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();

        // Assert that both active and standby are able to query for a key
        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils
                .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
            return store1.get(key) != null;
        }, "store1 cannot find results for key");

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils
                .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
            return store2.get(key) != null;
        }, "store2 cannot find results for key");
    }
@@ -269,7 +317,7 @@ public class StoreQueryIntegrationTest {
        //key doesn't belong to this partition
        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();

        // Assert that both active and standby are able to query for a key
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
@@ -277,11 +325,11 @@ public class StoreQueryIntegrationTest {
            .enableStaleStores()
            .withPartition(keyPartition);

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
            return store1.get(key) != null;
        }, "store1 cannot find results for key");

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
            return store2.get(key) != null;
        }, "store2 cannot find results for key");
@@ -289,8 +337,8 @@ public class StoreQueryIntegrationTest {
            .fromNameAndType(TABLE_NAME, queryableStoreType)
            .enableStaleStores()
            .withPartition(keyDontBelongPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);

        // Assert that the key is not found on either instance when the wrong partition is requested
        assertThat(store3.get(key), is(nullValue()));
@@ -337,7 +385,7 @@ public class StoreQueryIntegrationTest {
        //key doesn't belong to this partition
        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();

        // Assert that both active and standby are able to query for a key
        final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> param = StoreQueryParameters
@@ -345,11 +393,11 @@ public class StoreQueryIntegrationTest {
            .enableStaleStores()
            .withPartition(keyPartition);

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(kafkaStreams1, param);
            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(kafkaStreams1, param);
            return store1.get(key) != null;
        }, "store1 cannot find results for key");

        TestUtils.waitForCondition(() -> {
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(kafkaStreams2, param);
            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(kafkaStreams2, param);
            return store2.get(key) != null;
        }, "store2 cannot find results for key");
@@ -357,14 +405,29 @@ public class StoreQueryIntegrationTest {
            .fromNameAndType(TABLE_NAME, queryableStoreType)
            .enableStaleStores()
            .withPartition(keyDontBelongPartition);
        final ReadOnlyKeyValueStore<Integer, Integer> store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store3 = getStore(kafkaStreams1, otherParam);
        final ReadOnlyKeyValueStore<Integer, Integer> store4 = getStore(kafkaStreams2, otherParam);

        // Assert that the key is not found on either instance when the wrong partition is requested
        assertThat(store3.get(key), is(nullValue()));
        assertThat(store4.get(key), is(nullValue()));
    }

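    /**
     * Repeatedly evaluates {@code condition} (roughly every 500 ms) until it returns true or
     * IntegrationTestUtils.DEFAULT_TIMEOUT elapses; checked exceptions are wrapped in a RuntimeException.
     */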
    private static void until(final TestCondition condition) {
        boolean success = false;
        final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;
        while (!success && System.currentTimeMillis() < deadline) {
            try {
                success = condition.conditionMet();
                Thread.sleep(500L);
            } catch (final RuntimeException e) {
                throw e;
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
        final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
        streamsToCleanup.add(streams);