Browse Source

MINOR: Increase max.poll time for streams consumers

Author: Eno Thereska <eno@confluent.io>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #2770 from enothereska/minor-increase-max-poll
pull/2550/merge
Eno Thereska 8 years ago committed by Guozhang Wang
parent
commit
5f88cf79fb
  1. 8
      streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

8
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@ -401,7 +401,13 @@ public class StreamsConfig extends AbstractConfig { @@ -401,7 +401,13 @@ public class StreamsConfig extends AbstractConfig {
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
// MAX_POLL_INTERVAL_MS_CONFIG needs to be large for streams to handle cases when
// streams is recovering data from state stores. We may set it to Integer.MAX_VALUE since
// the streams code itself catches most exceptions and acts accordingly without needing
// this timeout. Note however that deadlocks are not detected (by definition) so we
// are losing the ability to detect them by setting this value to large. Hopefully
// deadlocks happen very rarely or never.
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

Loading…
Cancel
Save