diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 2a3f27c68fc..5f7f709a654 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -264,22 +264,24 @@ public class KStreamSessionWindowAggregate implements KStreamAgg final long emitRangeUpperBound) { final long startMs = time.milliseconds(); + int emittedCount = 0; + // Only time ordered (indexed) session store should have implemented // this function, otherwise a not-supported exception would throw - final KeyValueIterator, VAgg> windowToEmit = store - .findSessions(emitRangeLowerBound, emitRangeUpperBound); - - int emittedCount = 0; - while (windowToEmit.hasNext()) { - emittedCount++; - final KeyValue, VAgg> kv = windowToEmit.next(); - - tupleForwarder.maybeForward( - record.withKey(kv.key) - .withValue(new Change<>(kv.value, null)) - // set the timestamp as the window end timestamp - .withTimestamp(kv.key.window().end()) - .withHeaders(record.headers())); + try (final KeyValueIterator, VAgg> windowToEmit = store + .findSessions(emitRangeLowerBound, emitRangeUpperBound)) { + + while (windowToEmit.hasNext()) { + emittedCount++; + final KeyValue, VAgg> kv = windowToEmit.next(); + + tupleForwarder.maybeForward( + record.withKey(kv.key) + .withValue(new Change<>(kv.value, null)) + // set the timestamp as the window end timestamp + .withTimestamp(kv.key.window().end()) + .withHeaders(record.headers())); + } } emittedRecordsSensor.record(emittedCount); emitFinalLatencySensor.record(time.milliseconds() - startMs);