@ -38,9 +38,10 @@ import org.apache.kafka.streams.kstream.ValueMapper;
@@ -38,9 +38,10 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier ;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp ;
import org.apache.kafka.streams.processor.ProcessorSupplier ;
import org.apache.kafka.streams.processor.StateStoreSupplier ;
import org.apache.kafka.streams.processor.StreamPartitioner ;
import org.apache.kafka.streams.state.StoreBuilder ;
import org.apache.kafka.streams.state.Stores ;
import org.apache.kafka.streams.state.WindowStore ;
import java.lang.reflect.Array ;
import java.util.Collections ;
@ -827,16 +828,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@@ -827,16 +828,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
return groupByKey ( Serialized . with ( keySerde , valSerde ) ) ;
}
private static < K , V > StateStoreSupplier createWindowedStateStore ( final JoinWindows windows ,
final Serde < K > keySerde ,
final Serde < V > valueSerde ,
final String storeName ) {
return Stores . c rea te( storeName )
. withKeys ( keySerde )
. withValues ( valueSerde )
. persistent ( )
. windowed ( windows . size ( ) , windows . maintainMs ( ) , windows . segments , true )
. build ( ) ;
private static < K , V > StoreBuilder < WindowStore < K , V > > createWindowedStateStore ( final JoinWindows windows ,
final Serde < K > keySerde ,
final Serde < V > valueSerde ,
final String storeName ) {
return Stores . windowStoreBuilder ( Sto res . persis tentWindowStor e( storeName ,
windows . maintainMs ( ) ,
windows . segments ,
windows . size ( ) ,
true ) , keySerde , valueSerde ) ;
}
private class KStreamImplJoin {
@ -854,17 +855,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@@ -854,17 +855,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final KStream < K1 , V2 > other ,
final ValueJoiner < ? super V1 , ? super V2 , ? extends R > joiner ,
final JoinWindows windows ,
final Joined joined ) {
final Joined < K1 , V1 , V2 > joined ) {
String thisWindowStreamName = builder . newName ( WINDOWED_NAME ) ;
String otherWindowStreamName = builder . newName ( WINDOWED_NAME ) ;
String joinThisName = rightOuter ? builder . newName ( OUTERTHIS_NAME ) : builder . newName ( JOINTHIS_NAME ) ;
String joinOtherName = leftOuter ? builder . newName ( OUTEROTHER_NAME ) : builder . newName ( JOINOTHER_NAME ) ;
String joinMergeName = builder . newName ( MERGE_NAME ) ;
StateStoreSupplier thisWindow =
final StoreBuilder < WindowStore < K1 , V1 > > thisWindow =
createWindowedStateStore ( windows , joined . keySerde ( ) , joined . valueSerde ( ) , joinThisName + "-store" ) ;
StateStoreSupplier otherWindow =
final StoreBuilder < WindowStore < K1 , V2 > > otherWindow =
createWindowedStateStore ( windows , joined . keySerde ( ) , joined . otherValueSerde ( ) , joinOtherName + "-store" ) ;