@ -20,7 +20,6 @@ import org.apache.kafka.streams.StoreQueryParameters;
@@ -20,7 +20,6 @@ import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException ;
import org.apache.kafka.streams.processor.StateStore ;
import org.apache.kafka.streams.processor.TaskId ;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder ;
import org.apache.kafka.streams.processor.internals.StreamThread ;
import org.apache.kafka.streams.processor.internals.Task ;
import org.apache.kafka.streams.state.QueryableStoreType ;
@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore ;
import org.apache.kafka.streams.state.TimestampedWindowStore ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import java.util.Collection ;
import java.util.Objects ;
import java.util.Optional ;
import java.util.stream.Collectors ;
public class StreamThreadStateStoreProvider {
private final StreamThread streamThread ;
private final InternalTopologyBuilder internalTopologyBuilder ;
public StreamThreadStateStoreProvider ( final StreamThread streamThread ,
final InternalTopologyBuilder internalTopologyBuilder ) {
public StreamThreadStateStoreProvider ( final StreamThread streamThread ) {
this . streamThread = streamThread ;
this . internalTopologyBuilder = internalTopologyBuilder ;
}
@SuppressWarnings ( "unchecked" )
public < T > List < T > stores ( final StoreQueryParameters storeQueryParams ) {
final String storeName = storeQueryParams . storeName ( ) ;
final QueryableStoreType < T > queryableStoreType = storeQueryParams . queryableStoreType ( ) ;
final TaskId keyTaskId = createKeyTaskId ( storeName , storeQueryParams . partition ( ) ) ;
if ( streamThread . state ( ) = = StreamThread . State . DEAD ) {
return Collections . emptyList ( ) ;
}
final StreamThread . State state = streamThread . state ( ) ;
if ( storeQueryParams . staleStoresEnabled ( ) ? state . isAlive ( ) : state = = StreamThread . State . RUNNING ) {
final Map < TaskId , ? extends Task > tasks = storeQueryParams . staleStoresEnabled ( ) ? streamThread . allTasks ( ) : streamThread . activeTaskMap ( ) ;
final List < T > stores = new ArrayList < > ( ) ;
if ( keyTaskId ! = null ) {
final Task task = tasks . get ( keyTaskId ) ;
if ( task = = null ) {
return Collections . emptyList ( ) ;
}
final T store = validateAndListStores ( task . getStore ( storeName ) , queryableStoreType , storeName , keyTaskId ) ;
if ( store ! = null ) {
return Collections . singletonList ( store ) ;
}
final Collection < Task > tasks = storeQueryParams . staleStoresEnabled ( ) ?
streamThread . allTasks ( ) . values ( ) : streamThread . activeTasks ( ) ;
if ( storeQueryParams . partition ( ) ! = null ) {
return findStreamTask ( tasks , storeName , storeQueryParams . partition ( ) ) .
map ( streamTask - >
validateAndListStores ( streamTask . getStore ( storeName ) , queryableStoreType , storeName , streamTask . id ( ) ) ) .
map ( Collections : : singletonList ) .
orElse ( Collections . emptyList ( ) ) ;
} else {
for ( final Task streamTask : tasks . values ( ) ) {
final T store = validateAndListStores ( streamTask . getStore ( storeName ) , queryableStoreType , storeName , streamTask . id ( ) ) ;
if ( store ! = null ) {
stores . add ( store ) ;
}
}
return tasks . stream ( ) .
map ( streamTask - >
validateAndListStores ( streamTask . getStore ( storeName ) , queryableStoreType , storeName , streamTask . id ( ) ) ) .
filter ( Objects : : nonNull ) .
collect ( Collectors . toList ( ) ) ;
}
return stores ;
} else {
throw new InvalidStateStoreException ( "Cannot get state store " + storeName + " because the stream thread is " +
state + ", not RUNNING" +
@ -104,19 +95,11 @@ public class StreamThreadStateStoreProvider {
@@ -104,19 +95,11 @@ public class StreamThreadStateStoreProvider {
}
}
private TaskId createKeyTaskId ( final String storeName , final Integer partition ) {
if ( partition = = null ) {
return null ;
}
final List < String > sourceTopics = internalTopologyBuilder . stateStoreNameToSourceTopics ( ) . get ( storeName ) ;
final Set < String > sourceTopicsSet = new HashSet < > ( sourceTopics ) ;
final Map < Integer , InternalTopologyBuilder . TopicsInfo > topicGroups = internalTopologyBuilder . topicGroups ( ) ;
for ( final Map . Entry < Integer , InternalTopologyBuilder . TopicsInfo > topicGroup : topicGroups . entrySet ( ) ) {
if ( topicGroup . getValue ( ) . sourceTopics . containsAll ( sourceTopicsSet ) ) {
return new TaskId ( topicGroup . getKey ( ) , partition ) ;
}
}
throw new InvalidStateStoreException ( "Cannot get state store " + storeName + " because the requested partition " +
partition + " is not available on this instance" ) ;
private Optional < Task > findStreamTask ( final Collection < Task > tasks , final String storeName , final int partition ) {
return tasks . stream ( ) .
filter ( streamTask - > streamTask . id ( ) . partition = = partition & &
streamTask . getStore ( storeName ) ! = null & &
storeName . equals ( streamTask . getStore ( storeName ) . name ( ) ) ) .
findFirst ( ) ;
}
}