@ -16,15 +16,20 @@
@@ -16,15 +16,20 @@
* /
package org.apache.kafka.streams.kstream.internals.suppress ;
import org.apache.kafka.common.serialization.Serde ;
import org.apache.kafka.common.serialization.Serdes ;
import org.apache.kafka.streams.KeyValue ;
import org.apache.kafka.streams.errors.StreamsException ;
import org.apache.kafka.streams.kstream.Suppressed ;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer ;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer ;
import org.apache.kafka.streams.kstream.Windowed ;
import org.apache.kafka.streams.kstream.internals.Change ;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde ;
import org.apache.kafka.streams.kstream.internals.SessionWindow ;
import org.apache.kafka.streams.kstream.internals.TimeWindow ;
import org.apache.kafka.streams.processor.MockProcessorContext ;
import org.apache.kafka.streams.processor.internals.ProcessorNode ;
import org.apache.kafka.test.MockInternalProcessorContext ;
import org.hamcrest.BaseMatcher ;
import org.hamcrest.Description ;
@ -38,25 +43,23 @@ import static java.time.Duration.ZERO;
@@ -38,25 +43,23 @@ import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis ;
import static org.apache.kafka.common.serialization.Serdes.Long ;
import static org.apache.kafka.common.serialization.Serdes.String ;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes ;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords ;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded ;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit ;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses ;
import static org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom ;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom ;
import static org.hamcrest.CoreMatchers.containsString ;
import static org.hamcrest.CoreMatchers.is ;
import static org.hamcrest.MatcherAssert.assertThat ;
import static org.junit.Assert.fail ;
@SuppressWarnings ( "PointlessArithmeticExpression" )
public class KTableSuppressProcessorTest {
private static final long ARBITRARY_LONG = 5L ;
private static final long ARBITRARY_TIMESTAMP = 1993L ;
private static final Change < Long > ARBITRARY_CHANGE = new Change < > ( 7L , 14L ) ;
private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow ( 0L , 100L ) ;
@Test
public void zeroTimeLimitShouldImmediatelyEmit ( ) {
final KTableSuppressProcessor < String , Long > processor =
@ -66,7 +69,7 @@ public class KTableSuppressProcessorTest {
@@ -66,7 +69,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = ARBITRARY_LONG ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final String key = "hey" ;
final Change < Long > value = ARBITRARY_CHANGE ;
@ -83,7 +86,7 @@ public class KTableSuppressProcessorTest {
@@ -83,7 +86,7 @@ public class KTableSuppressProcessorTest {
final KTableSuppressProcessor < Windowed < String > , Long > processor =
new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( ZERO , unbounded ( ) ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
timeWindowedSerdeFrom ( String . class , 100L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -91,9 +94,9 @@ public class KTableSuppressProcessorTest {
@@ -91,9 +94,9 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = ARBITRARY_LONG ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , ARBITRARY_WINDOW ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( 0L , 100L ) ) ;
final Change < Long > value = ARBITRARY_CHANGE ;
processor . process ( key , value ) ;
@ -103,7 +106,7 @@ public class KTableSuppressProcessorTest {
@@ -103,7 +106,7 @@ public class KTableSuppressProcessorTest {
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@Test ( expected = KTableSuppressProcessor . NotImplementedException . class )
@Test
public void intermediateSuppressionShouldBufferAndEmitLater ( ) {
final KTableSuppressProcessor < String , Long > processor =
new KTableSuppressProcessor < > (
@ -117,13 +120,15 @@ public class KTableSuppressProcessorTest {
@@ -117,13 +120,15 @@ public class KTableSuppressProcessorTest {
final long timestamp = 0L ;
context . setRecordMetadata ( "topic" , 0 , 0 , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , 1L ) ;
processor . process ( key , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 0 ) ) ;
assertThat ( context . scheduledPunctuators ( ) , hasSize ( 1 ) ) ;
context . scheduledPunctuators ( ) . get ( 0 ) . getPunctuator ( ) . punctuate ( 1 ) ;
context . setRecordMetadata ( "topic" , 0 , 1 , null , 1L ) ;
context . setStreamTime ( 1L ) ;
processor . process ( "tick" , new Change < > ( null , null ) ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
@ -131,38 +136,49 @@ public class KTableSuppressProcessorTest {
@@ -131,38 +136,49 @@ public class KTableSuppressProcessorTest {
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@SuppressWarnings ( "unchecked" )
private < K extends Windowed > SuppressedImpl < K > finalResults ( final Duration grace ) {
return ( ( FinalResultsSuppressionBuilder ) untilWindowCloses ( unbounded ( ) ) ) . buildFinalResultsSuppression ( grace ) ;
}
@Test ( expected = KTableSuppressProcessor . NotImplementedException . class )
@Test
public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
finalResults ( ofMillis ( 1L ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
timeWindowedSerdeFrom ( String . class , 1L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
final long timestamp = ARBITRARY_TIMESTAMP ;
context . setRecordMetadata ( "topic" , 0 , 0 , null , timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , ARBITRARY_WINDOW ) ;
final long windowStart = 99L ;
final long recordTime = 99L ;
final long windowEnd = 100L ;
context . setRecordMetadata ( "topic" , 0 , 0 , null , recordTime ) ;
context . setStreamTime ( recordTime ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( windowStart , windowEnd ) ) ;
final Change < Long > value = ARBITRARY_CHANGE ;
processor . process ( key , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 0 ) ) ;
assertThat ( context . scheduledPunctuators ( ) , hasSize ( 1 ) ) ;
context . scheduledPunctuators ( ) . get ( 0 ) . getPunctuator ( ) . punctuate ( timestamp + 1L ) ;
// although the stream time is now 100, we have to wait 1 ms after the window *end* before we
// emit "hey", so we don't emit yet.
final long windowStart2 = 100L ;
final long recordTime2 = 100L ;
final long windowEnd2 = 101L ;
context . setRecordMetadata ( "topic" , 0 , 1 , null , recordTime2 ) ;
context . setStreamTime ( recordTime2 ) ;
processor . process ( new Windowed < > ( "dummyKey1" , new TimeWindow ( windowStart2 , windowEnd2 ) ) , ARBITRARY_CHANGE ) ;
assertThat ( context . forwarded ( ) , hasSize ( 0 ) ) ;
// ok, now it's time to emit "hey"
final long windowStart3 = 101L ;
final long recordTime3 = 101L ;
final long windowEnd3 = 102L ;
context . setRecordMetadata ( "topic" , 0 , 1 , null , recordTime3 ) ;
context . setStreamTime ( recordTime3 ) ;
processor . process ( new Windowed < > ( "dummyKey2" , new TimeWindow ( windowStart3 , windowEnd3 ) ) , ARBITRARY_CHANGE ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
assertThat ( capturedForward . keyValue ( ) , is ( new KeyValue < > ( key , value ) ) ) ;
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
assertThat ( capturedForward . timestamp ( ) , is ( recordTime ) ) ;
}
/ * *
@ -170,27 +186,32 @@ public class KTableSuppressProcessorTest {
@@ -170,27 +186,32 @@ public class KTableSuppressProcessorTest {
* it will still buffer events and emit only after the end of the window .
* As opposed to emitting immediately the way regular suppresion would with a time limit of 0 .
* /
@Test ( expected = KTableSuppressProcessor . NotImplementedException . class )
@Test
public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
finalResults ( ofMillis ( 0 ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
timeWindowedSerdeFrom ( String . class , 100L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
// note the record is in the past, but the window end is in the future, so we still have to buffer,
// even though the grace period is 0.
final long timestamp = 5L ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
final long streamTime = 99L ;
final long windowEnd = 100L ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( streamTime ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( 0 , windowEnd ) ) ;
final Change < Long > value = ARBITRARY_CHANGE ;
processor . process ( key , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 0 ) ) ;
assertThat ( context . scheduledPunctuators ( ) , hasSize ( 1 ) ) ;
context . scheduledPunctuators ( ) . get ( 0 ) . getPunctuator ( ) . punctuate ( windowEnd ) ;
context . setRecordMetadata ( "" , 0 , 1L , null , windowEnd ) ;
context . setStreamTime ( windowEnd ) ;
processor . process ( new Windowed < > ( "dummyKey" , new TimeWindow ( windowEnd , windowEnd + 100L ) ) , ARBITRARY_CHANGE ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
@ -202,7 +223,7 @@ public class KTableSuppressProcessorTest {
@@ -202,7 +223,7 @@ public class KTableSuppressProcessorTest {
public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
finalResults ( ofMillis ( 0 ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
timeWindowedSerdeFrom ( String . class , 100L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -210,7 +231,7 @@ public class KTableSuppressProcessorTest {
@@ -210,7 +231,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( 0 , 100L ) ) ;
final Change < Long > value = ARBITRARY_CHANGE ;
@ -226,7 +247,7 @@ public class KTableSuppressProcessorTest {
@@ -226,7 +247,7 @@ public class KTableSuppressProcessorTest {
public void finalResultsShouldSuppressTombstonesForTimeWindows ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
finalResults ( ofMillis ( 0 ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
timeWindowedSerdeFrom ( String . class , 100L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -234,7 +255,7 @@ public class KTableSuppressProcessorTest {
@@ -234,7 +255,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( 0 , 100L ) ) ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
@ -255,7 +276,7 @@ public class KTableSuppressProcessorTest {
@@ -255,7 +276,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new SessionWindow ( 0L , 0L ) ) ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
@ -264,12 +285,11 @@ public class KTableSuppressProcessorTest {
@@ -264,12 +285,11 @@ public class KTableSuppressProcessorTest {
assertThat ( context . forwarded ( ) , hasSize ( 0 ) ) ;
}
@SuppressWarnings ( "unchecked" )
@Test
public void suppressShouldNotSuppressTombstonesForTimeWindows ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < Windowed < String > , Long > (
( SuppressedImpl ) untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ,
timeWindowedSerdeFrom ( String . class ) ,
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ) ,
timeWindowedSerdeFrom ( String . class , 100L ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -277,7 +297,7 @@ public class KTableSuppressProcessorTest {
@@ -277,7 +297,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new TimeWindow ( 0L , 100L ) ) ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
@ -289,11 +309,10 @@ public class KTableSuppressProcessorTest {
@@ -289,11 +309,10 @@ public class KTableSuppressProcessorTest {
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@SuppressWarnings ( "unchecked" )
@Test
public void suppressShouldNotSuppressTombstonesForSessionWindows ( ) {
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < Windowed < String > , Long > (
( SuppressedImpl ) untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ,
final KTableSuppressProcessor < Windowed < String > , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ) ,
sessionWindowedSerdeFrom ( String . class ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -302,7 +321,7 @@ public class KTableSuppressProcessorTest {
@@ -302,7 +321,7 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final Windowed < String > key = new Windowed < > ( "hey" , new SessionWindow ( 0L , 0L ) ) ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
@ -314,11 +333,61 @@ public class KTableSuppressProcessorTest {
@@ -314,11 +333,61 @@ public class KTableSuppressProcessorTest {
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@SuppressWarnings ( "unchecked" )
@Test
public void suppressShouldNotSuppressTombstonesForKTable ( ) {
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < String , Long > (
( SuppressedImpl ) untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ,
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( ofMillis ( 0 ) , maxRecords ( 0 ) ) ) ,
Serdes . String ( ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
final long timestamp = 100L ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setStreamTime ( timestamp ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
processor . process ( key , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
assertThat ( capturedForward . keyValue ( ) , is ( new KeyValue < > ( key , value ) ) ) ;
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@Test
public void suppressShouldEmitWhenOverRecordCapacity ( ) {
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( Duration . ofDays ( 100 ) , maxRecords ( 1 ) ) ) ,
Serdes . String ( ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
final long timestamp = 100L ;
context . setStreamTime ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
processor . process ( key , value ) ;
context . setRecordMetadata ( "" , 0 , 1L , null , timestamp + 1 ) ;
processor . process ( "dummyKey" , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
assertThat ( capturedForward . keyValue ( ) , is ( new KeyValue < > ( key , value ) ) ) ;
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@Test
public void suppressShouldEmitWhenOverByteCapacity ( ) {
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( Duration . ofDays ( 100 ) , maxBytes ( 60L ) ) ) ,
Serdes . String ( ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
@ -327,18 +396,82 @@ public class KTableSuppressProcessorTest {
@@ -327,18 +396,82 @@ public class KTableSuppressProcessorTest {
processor . init ( context ) ;
final long timestamp = 100L ;
context . setTimestamp ( timestamp ) ;
context . setStreamTime ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
processor . process ( key , value ) ;
context . setRecordMetadata ( "" , 0 , 1L , null , timestamp + 1 ) ;
processor . process ( "dummyKey" , value ) ;
assertThat ( context . forwarded ( ) , hasSize ( 1 ) ) ;
final MockProcessorContext . CapturedForward capturedForward = context . forwarded ( ) . get ( 0 ) ;
assertThat ( capturedForward . keyValue ( ) , is ( new KeyValue < > ( key , value ) ) ) ;
assertThat ( capturedForward . timestamp ( ) , is ( timestamp ) ) ;
}
@Test
public void suppressShouldShutDownWhenOverRecordCapacity ( ) {
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( Duration . ofDays ( 100 ) , maxRecords ( 1 ) . shutDownWhenFull ( ) ) ) ,
Serdes . String ( ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
final long timestamp = 100L ;
context . setStreamTime ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setCurrentNode ( new ProcessorNode ( "testNode" ) ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
processor . process ( key , value ) ;
context . setRecordMetadata ( "" , 0 , 1L , null , timestamp ) ;
try {
processor . process ( "dummyKey" , value ) ;
fail ( "expected an exception" ) ;
} catch ( final StreamsException e ) {
assertThat ( e . getMessage ( ) , containsString ( "buffer exceeded its max capacity" ) ) ;
}
}
@Test
public void suppressShouldShutDownWhenOverByteCapacity ( ) {
final KTableSuppressProcessor < String , Long > processor = new KTableSuppressProcessor < > (
getImpl ( untilTimeLimit ( Duration . ofDays ( 100 ) , maxBytes ( 60L ) . shutDownWhenFull ( ) ) ) ,
Serdes . String ( ) ,
new FullChangeSerde < > ( Long ( ) )
) ;
final MockInternalProcessorContext context = new MockInternalProcessorContext ( ) ;
processor . init ( context ) ;
final long timestamp = 100L ;
context . setStreamTime ( timestamp ) ;
context . setRecordMetadata ( "" , 0 , 0L , null , timestamp ) ;
context . setCurrentNode ( new ProcessorNode ( "testNode" ) ) ;
final String key = "hey" ;
final Change < Long > value = new Change < > ( null , ARBITRARY_LONG ) ;
processor . process ( key , value ) ;
context . setRecordMetadata ( "" , 0 , 1L , null , timestamp ) ;
try {
processor . process ( "dummyKey" , value ) ;
fail ( "expected an exception" ) ;
} catch ( final StreamsException e ) {
assertThat ( e . getMessage ( ) , containsString ( "buffer exceeded its max capacity" ) ) ;
}
}
@SuppressWarnings ( "unchecked" )
private < K extends Windowed > SuppressedInternal < K > finalResults ( final Duration grace ) {
return ( ( FinalResultsSuppressionBuilder ) untilWindowCloses ( unbounded ( ) ) ) . buildFinalResultsSuppression ( grace ) ;
}
private static < E > Matcher < Collection < E > > hasSize ( final int i ) {
return new BaseMatcher < Collection < E > > ( ) {
@Override
@ -359,7 +492,15 @@ public class KTableSuppressProcessorTest {
@@ -359,7 +492,15 @@ public class KTableSuppressProcessorTest {
} ;
}
private static < K > SuppressedImpl < K > getImpl ( final Suppressed < K > suppressed ) {
return ( SuppressedImpl < K > ) suppressed ;
private static < K > SuppressedInternal < K > getImpl ( final Suppressed < K > suppressed ) {
return ( SuppressedInternal < K > ) suppressed ;
}
private < K > Serde < Windowed < K > > timeWindowedSerdeFrom ( final Class < K > rawType , final long windowSize ) {
final Serde < K > kSerde = Serdes . serdeFrom ( rawType ) ;
return new Serdes . WrapperSerde < > (
new TimeWindowedSerializer < > ( kSerde . serializer ( ) ) ,
new TimeWindowedDeserializer < > ( kSerde . deserializer ( ) , windowSize )
) ;
}
}