@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore ;
import org.apache.kafka.streams.state.WindowStore ;
import org.apache.kafka.streams.test.ConsumerRecordFactory ;
import org.apache.kafka.streams.test.ConsumerRecordFactory ;
import org.apache.kafka.streams.test.OutputVerifier ;
import org.apache.kafka.streams.test.OutputVerifier ;
import org.apache.kafka.test.TestUtils ;
import org.junit.Test ;
import org.junit.Test ;
import java.time.Duration ;
import java.time.Duration ;
@ -70,6 +71,11 @@ public class SuppressScenarioTest {
private static final StringSerializer STRING_SERIALIZER = new StringSerializer ( ) ;
private static final StringSerializer STRING_SERIALIZER = new StringSerializer ( ) ;
private static final Serde < String > STRING_SERDE = Serdes . String ( ) ;
private static final Serde < String > STRING_SERDE = Serdes . String ( ) ;
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer ( ) ;
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer ( ) ;
private final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . STATE_DIR_CONFIG , TestUtils . tempDirectory ( ) . getPath ( ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
@Test
@Test
public void shouldImmediatelyEmitEventsWithZeroEmitAfter ( ) {
public void shouldImmediatelyEmitEventsWithZeroEmitAfter ( ) {
@ -97,10 +103,6 @@ public class SuppressScenarioTest {
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
@ -178,10 +180,6 @@ public class SuppressScenarioTest {
. toStream ( )
. toStream ( )
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
@ -251,10 +249,6 @@ public class SuppressScenarioTest {
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
System . out . println ( topology . describe ( ) ) ;
System . out . println ( topology . describe ( ) ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
@ -318,10 +312,6 @@ public class SuppressScenarioTest {
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
System . out . println ( topology . describe ( ) ) ;
System . out . println ( topology . describe ( ) ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
@ -381,10 +371,6 @@ public class SuppressScenarioTest {
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
System . out . println ( topology . describe ( ) ) ;
System . out . println ( topology . describe ( ) ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
@ -435,10 +421,6 @@ public class SuppressScenarioTest {
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
System . out . println ( topology . describe ( ) ) ;
System . out . println ( topology . describe ( ) ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
driver . pipeInput ( recordFactory . create ( "input" , "k1" , "v1" , 0L ) ) ;
@ -494,10 +476,6 @@ public class SuppressScenarioTest {
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
. to ( "output-raw" , Produced . with ( STRING_SERDE , Serdes . Long ( ) ) ) ;
final Topology topology = builder . build ( ) ;
final Topology topology = builder . build ( ) ;
System . out . println ( topology . describe ( ) ) ;
System . out . println ( topology . describe ( ) ) ;
final Properties config = Utils . mkProperties ( Utils . mkMap (
Utils . mkEntry ( StreamsConfig . APPLICATION_ID_CONFIG , getClass ( ) . getSimpleName ( ) . toLowerCase ( Locale . getDefault ( ) ) ) ,
Utils . mkEntry ( StreamsConfig . BOOTSTRAP_SERVERS_CONFIG , "bogus" )
) ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
final ConsumerRecordFactory < String , String > recordFactory = new ConsumerRecordFactory < > ( STRING_SERIALIZER , STRING_SERIALIZER ) ;
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
try ( final TopologyTestDriver driver = new TopologyTestDriver ( topology , config ) ) {
// first window
// first window