@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import java.nio.ByteBuffer ;
import java.util.Iterator ;
import java.util.Objects ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentNavigableMap ;
@ -135,6 +136,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -135,6 +136,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
} else {
segmentMap . computeIfPresent ( windowStartTimestamp , ( t , kvMap ) - > {
kvMap . remove ( keyBytes ) ;
if ( kvMap . isEmpty ( ) ) {
segmentMap . remove ( windowStartTimestamp ) ;
}
return kvMap ;
} ) ;
}
@ -143,6 +147,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -143,6 +147,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public byte [ ] fetch ( final Bytes key , final long windowStartTimestamp ) {
Objects . requireNonNull ( key , "key cannot be null" ) ;
removeExpiredSegments ( ) ;
if ( windowStartTimestamp < = observedStreamTime - retentionPeriod ) {
@ -160,6 +167,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -160,6 +167,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Deprecated
@Override
public WindowStoreIterator < byte [ ] > fetch ( final Bytes key , final long timeFrom , final long timeTo ) {
Objects . requireNonNull ( key , "key cannot be null" ) ;
removeExpiredSegments ( ) ;
// add one b/c records expire exactly retentionPeriod ms after created
@ -179,6 +189,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -179,6 +189,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Bytes to ,
final long timeFrom ,
final long timeTo ) {
Objects . requireNonNull ( from , "from key cannot be null" ) ;
Objects . requireNonNull ( to , "to key cannot be null" ) ;
removeExpiredSegments ( ) ;
if ( from . compareTo ( to ) > 0 ) {
@ -242,6 +255,13 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -242,6 +255,13 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public void close ( ) {
if ( openIterators . size ( ) ! = 0 ) {
LOG . warn ( "Closing {} open iterators for store {}" , openIterators . size ( ) , name ) ;
for ( final InMemoryWindowStoreIteratorWrapper it : openIterators ) {
it . close ( ) ;
}
}
segmentMap . clear ( ) ;
open = false ;
}
@ -281,7 +301,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -281,7 +301,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Bytes keyTo = retainDuplicates ? wrapForDups ( key , Integer . MAX_VALUE ) : key ;
final WrappedInMemoryWindowStoreIterator iterator =
new WrappedInMemoryWindowStoreIterator ( keyFrom , keyTo , segmentIterator , openIterators : : remove ) ;
new WrappedInMemoryWindowStoreIterator ( keyFrom , keyTo , segmentIterator , openIterators : : remove , retainDuplicates ) ;
openIterators . add ( iterator ) ;
return iterator ;
@ -319,15 +339,18 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -319,15 +339,18 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private final boolean allKeys ;
private final Bytes keyFrom ;
private final Bytes keyTo ;
private final boolean retainDuplicates ;
private final ClosingCallback callback ;
InMemoryWindowStoreIteratorWrapper ( final Bytes keyFrom ,
final Bytes keyTo ,
final Iterator < Map . Entry < Long , ConcurrentNavigableMap < Bytes , byte [ ] > > > segmentIterator ,
final ClosingCallback callback ) {
final ClosingCallback callback ,
final boolean retainDuplicates ) {
this . keyFrom = keyFrom ;
this . keyTo = keyTo ;
allKeys = ( keyFrom = = null ) & & ( keyTo = = null ) ;
this . retainDuplicates = retainDuplicates ;
this . segmentIterator = segmentIterator ;
this . callback = callback ;
@ -343,15 +366,26 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -343,15 +366,26 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
next = getNext ( ) ;
return next ! = null ;
}
if ( next = = null ) {
return false ;
}
public void remove ( ) {
throw new UnsupportedOperationException (
"remove() is not supported in " + getClass ( ) . getName ( ) ) ;
if ( allKeys | | ! retainDuplicates ) {
return true ;
}
final Bytes key = getKey ( next . key ) ;
if ( key . compareTo ( getKey ( keyFrom ) ) > = 0 & & key . compareTo ( getKey ( keyTo ) ) < = 0 ) {
return true ;
} else {
next = null ;
return hasNext ( ) ;
}
}
public void close ( ) {
next = null ;
recordIterator = null ;
callback . deregisterIterator ( this ) ;
}
@ -395,8 +429,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -395,8 +429,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
WrappedInMemoryWindowStoreIterator ( final Bytes keyFrom ,
final Bytes keyTo ,
final Iterator < Map . Entry < Long , ConcurrentNavigableMap < Bytes , byte [ ] > > > segmentIterator ,
final ClosingCallback callback ) {
super ( keyFrom , keyTo , segmentIterator , callback ) ;
final ClosingCallback callback ,
final boolean retainDuplicates ) {
super ( keyFrom , keyTo , segmentIterator , callback , retainDuplicates ) ;
}
@Override
@ -419,13 +454,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -419,13 +454,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
public static WrappedInMemoryWindowStoreIterator emptyIterator ( ) {
return new WrappedInMemoryWindowStoreIterator ( null , null , null , it - > { } ) ;
return new WrappedInMemoryWindowStoreIterator ( null , null , null , it - > { } , false ) ;
}
}
private static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator < Windowed < Bytes > , byte [ ] > {
private final boolean retainDuplicates ;
private final long windowSize ;
WrappedWindowedKeyValueIterator ( final Bytes keyFrom ,
@ -434,8 +468,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -434,8 +468,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final ClosingCallback callback ,
final boolean retainDuplicates ,
final long windowSize ) {
super ( keyFrom , keyTo , segmentIterator , callback ) ;
this . retainDuplicates = retainDuplicates ;
super ( keyFrom , keyTo , segmentIterator , callback , retainDuplicates ) ;
this . windowSize = windowSize ;
}
@ -457,8 +490,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@@ -457,8 +490,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
private Windowed < Bytes > getWindowedKey ( ) {
final Bytes key = retainDuplicates ? getKey ( super . next . key ) : super . next . key ;
final TimeWindow timeWindow = new TimeWindow ( super . currentTime , super . currentTime + windowSize ) ;
final Bytes key = super . retainDuplicates ? getKey ( super . next . key ) : super . next . key ;
long endTime = super . currentTime + windowSize ;
if ( endTime < 0 ) {
LOG . warn ( "Warning: window end time was truncated to Long.MAX" ) ;
endTime = Long . MAX_VALUE ;
}
final TimeWindow timeWindow = new TimeWindow ( super . currentTime , endTime ) ;
return new Windowed < > ( key , timeWindow ) ;
}
}