Browse Source

HOTFIX: close iterator to avoid resource leak (#14624)

Reviewers: Hao Li <hli@confluent.io>, Bill Bejeck <bill@confluent.io>
pull/14327/merge
Matthias J. Sax 11 months ago committed by GitHub
parent
commit
a6c14003a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java

30
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java

@ -264,22 +264,24 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg @@ -264,22 +264,24 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements KStreamAgg
final long emitRangeUpperBound) {
final long startMs = time.milliseconds();
int emittedCount = 0;
// Only time ordered (indexed) session store should have implemented
// this function, otherwise a not-supported exception would throw
final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = store
.findSessions(emitRangeLowerBound, emitRangeUpperBound);
int emittedCount = 0;
while (windowToEmit.hasNext()) {
emittedCount++;
final KeyValue<Windowed<KIn>, VAgg> kv = windowToEmit.next();
tupleForwarder.maybeForward(
record.withKey(kv.key)
.withValue(new Change<>(kv.value, null))
// set the timestamp as the window end timestamp
.withTimestamp(kv.key.window().end())
.withHeaders(record.headers()));
try (final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = store
.findSessions(emitRangeLowerBound, emitRangeUpperBound)) {
while (windowToEmit.hasNext()) {
emittedCount++;
final KeyValue<Windowed<KIn>, VAgg> kv = windowToEmit.next();
tupleForwarder.maybeForward(
record.withKey(kv.key)
.withValue(new Change<>(kv.value, null))
// set the timestamp as the window end timestamp
.withTimestamp(kv.key.window().end())
.withHeaders(record.headers()));
}
}
emittedRecordsSensor.record(emittedCount);
emitFinalLatencySensor.record(time.milliseconds() - startMs);

Loading…
Cancel
Save