Browse Source

MINOR: improve Session expiration notice (#6618)

Reviewers: Matthias J. Sax <matthias@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>
pull/6611/merge
John Roesler 6 years ago committed by Matthias J. Sax
parent
commit
39400c46ac
  1. 20
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
  2. 19
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
  3. 4
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
  4. 28
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java

20
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java

@ -147,8 +147,24 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce @@ -147,8 +147,24 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
tupleForwarder.maybeForward(sessionKey, agg, null);
} else {
LOG.debug(
"Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
"Skipping record for expired window. " +
"key=[{}] " +
"topic=[{}] " +
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
"window=[{},{}) " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
context().topic(),
context().partition(),
context().offset(),
context().timestamp(),
mergedWindow.start(),
mergedWindow.end(),
closeTime,
observedStreamTime
);
lateRecordDropSensor.record();
}

19
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java

@ -128,8 +128,23 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr @@ -128,8 +128,23 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
} else {
log.debug(
"Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
"Skipping record for expired window. " +
"key=[{}] " +
"topic=[{}] " +
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
"window=[{},{}) " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
context().topic(),
context().partition(),
context().offset(),
context().timestamp(),
windowStart, windowEnd,
closeTime,
observedStreamTime
);
lateRecordDropSensor.record();
}

4
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java

@ -382,9 +382,9 @@ public class KStreamSessionWindowAggregateProcessorTest { @@ -382,9 +382,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
greaterThan(0.0));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
}
}

28
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java

@ -290,13 +290,13 @@ public class KStreamWindowAggregateTest { @@ -290,13 +290,13 @@ public class KStreamWindowAggregateTest {
);
assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
@ -340,13 +340,13 @@ public class KStreamWindowAggregateTest { @@ -340,13 +340,13 @@ public class KStreamWindowAggregateTest {
assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);

Loading…
Cancel
Save