Browse Source

KAFKA-2400: Expose heartbeat interval in KafkaConsumer configuration

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #116 from hachikuji/KAFKA-2400 and squashes the following commits:

3c1b1dd [Jason Gustafson] KAFKA-2400; expose heartbeat interval in KafkaConsumer configuration
pull/116/merge
Jason Gustafson 9 years ago committed by Guozhang Wang
parent
commit
006b45c7e5
  1. 11
      clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
  2. 1
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  3. 3
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  4. 24
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  5. 2
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  6. 7
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  7. 3
      core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
  8. 3
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala

11
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@ -50,6 +50,12 @@ public class ConsumerConfig extends AbstractConfig { @@ -50,6 +50,12 @@ public class ConsumerConfig extends AbstractConfig {
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
/**
* <code>heartbeat.interval.ms</code>
*/
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
/**
* <code>bootstrap.servers</code>
*/
@ -171,6 +177,11 @@ public class ConsumerConfig extends AbstractConfig { @@ -171,6 +177,11 @@ public class ConsumerConfig extends AbstractConfig {
30000,
Importance.HIGH,
SESSION_TIMEOUT_MS_DOC)
.define(HEARTBEAT_INTERVAL_MS_CONFIG,
Type.INT,
3000,
Importance.HIGH,
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.STRING,
"range",

1
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -540,6 +540,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -540,6 +540,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.coordinator = new Coordinator(this.client,
config.getString(ConsumerConfig.GROUP_ID_CONFIG),
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
this.subscriptions,
metrics,

3
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java

@ -84,6 +84,7 @@ public final class Coordinator { @@ -84,6 +84,7 @@ public final class Coordinator {
public Coordinator(ConsumerNetworkClient client,
String groupId,
int sessionTimeoutMs,
int heartbeatIntervalMs,
String assignmentStrategy,
SubscriptionState subscriptions,
Metrics metrics,
@ -103,7 +104,7 @@ public final class Coordinator { @@ -103,7 +104,7 @@ public final class Coordinator {
this.subscriptions = subscriptions;
this.sessionTimeoutMs = sessionTimeoutMs;
this.assignmentStrategy = assignmentStrategy;
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask();
this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
this.requestTimeoutMs = requestTimeoutMs;

24
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java

@ -16,20 +16,21 @@ package org.apache.kafka.clients.consumer.internals; @@ -16,20 +16,21 @@ package org.apache.kafka.clients.consumer.internals;
* A helper class for managing the heartbeat to the coordinator
*/
public final class Heartbeat {
/* The number of heartbeats to attempt to complete per session timeout interval.
* so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
* once per second.
*/
public final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
private final long timeout;
private final long interval;
private long lastHeartbeatSend;
private long lastHeartbeatReceive;
private long lastSessionReset;
public Heartbeat(long timeout, long now) {
public Heartbeat(long timeout,
long interval,
long now) {
if (interval >= timeout)
throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
this.timeout = timeout;
this.interval = interval;
this.lastSessionReset = now;
}
@ -52,11 +53,10 @@ public final class Heartbeat { @@ -52,11 +53,10 @@ public final class Heartbeat {
public long timeToNextHeartbeat(long now) {
long timeSinceLastHeartbeat = now - Math.max(lastHeartbeatSend, lastSessionReset);
long hbInterval = timeout / HEARTBEATS_PER_SESSION_INTERVAL;
if (timeSinceLastHeartbeat > hbInterval)
if (timeSinceLastHeartbeat > interval)
return 0;
else
return hbInterval - timeSinceLastHeartbeat;
return interval - timeSinceLastHeartbeat;
}
public boolean sessionTimeoutExpired(long now) {
@ -64,7 +64,7 @@ public final class Heartbeat { @@ -64,7 +64,7 @@ public final class Heartbeat {
}
public long interval() {
return timeout / HEARTBEATS_PER_SESSION_INTERVAL;
return interval;
}
public void resetSessionTimeout(long now) {

2
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java

@ -59,6 +59,7 @@ public class CoordinatorTest { @@ -59,6 +59,7 @@ public class CoordinatorTest {
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
private int sessionTimeoutMs = 10;
private int heartbeatIntervalMs = 2;
private long retryBackoffMs = 100;
private long requestTimeoutMs = 5000;
private String rebalanceStrategy = "not-matter";
@ -89,6 +90,7 @@ public class CoordinatorTest { @@ -89,6 +90,7 @@ public class CoordinatorTest {
this.coordinator = new Coordinator(consumerClient,
groupId,
sessionTimeoutMs,
heartbeatIntervalMs,
rebalanceStrategy,
subscriptions,
metrics,

7
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java

@ -27,20 +27,21 @@ import static org.junit.Assert.assertTrue; @@ -27,20 +27,21 @@ import static org.junit.Assert.assertTrue;
public class HeartbeatTest {
private long timeout = 300L;
private long interval = 100L;
private MockTime time = new MockTime();
private Heartbeat heartbeat = new Heartbeat(timeout, -1L);
private Heartbeat heartbeat = new Heartbeat(timeout, interval, -1L);
@Test
public void testShouldHeartbeat() {
heartbeat.sentHeartbeat(time.milliseconds());
time.sleep((long) ((float) timeout / Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL * 1.1));
time.sleep((long) ((float) interval * 1.1));
assertTrue(heartbeat.shouldHeartbeat(time.milliseconds()));
}
@Test
public void testShouldNotHeartbeat() {
heartbeat.sentHeartbeat(time.milliseconds());
time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
time.sleep(interval / 2);
assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
}

3
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala

@ -43,7 +43,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { @@ -43,7 +43,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20")
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100")
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
override def generateConfigs() = {

3
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -207,6 +207,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -207,6 +207,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumer0.subscribe(topic)
@ -238,6 +239,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -238,6 +239,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def testUnsubscribeTopic() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
try {
@ -267,6 +269,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -267,6 +269,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
def testPauseStateNotPreservedByRebalance() {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer())
sendRecords(5)

Loading…
Cancel
Save