Browse Source

MINOR: ignore wakeups when committing offsets on consumer close

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Gwen Shapira

Closes #306 from hachikuji/handle-wakeup-in-consumer-close
pull/306/merge
Jason Gustafson 9 years ago committed by Gwen Shapira
parent
commit
27c099b043
  1. 14
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  2. 30
      core/src/test/scala/integration/kafka/api/ConsumerTest.scala

14
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java

@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals; @@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerWakeupException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
@ -235,7 +236,15 @@ public final class Coordinator implements Closeable { @@ -235,7 +236,15 @@ public final class Coordinator implements Closeable {
@Override
public void close() {
// commit offsets prior to closing if auto-commit enabled
maybeAutoCommitOffsetsSync();
while (true) {
try {
maybeAutoCommitOffsetsSync();
return;
} catch (ConsumerWakeupException e) {
// ignore wakeups while closing to ensure we have a chance to commit
continue;
}
}
}
private class HeartbeatTask implements DelayedTask {
@ -430,6 +439,9 @@ public final class Coordinator implements Closeable { @@ -430,6 +439,9 @@ public final class Coordinator implements Closeable {
if (autoCommitEnabled) {
try {
commitOffsetsSync(subscriptions.allConsumed());
} catch (ConsumerWakeupException e) {
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.error("Auto offset commit failed.", e);

30
core/src/test/scala/integration/kafka/api/ConsumerTest.scala

@ -115,6 +115,36 @@ class ConsumerTest extends IntegrationTestHarness with Logging { @@ -115,6 +115,36 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertEquals(500, this.consumers(0).committed(tp2).offset)
}
@Test
def testAutoCommitOnCloseAfterWakeup() {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
val numRecords = 10000
sendRecords(numRecords)
consumer0.subscribe(List(topic))
val assignment = Set(tp, tp2)
TestUtils.waitUntilTrue(() => {
consumer0.poll(50)
consumer0.assignment() == assignment.asJava
}, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}")
// should auto-commit seeked positions before closing
consumer0.seek(tp, 300)
consumer0.seek(tp2, 500)
// wakeup the consumer before closing to simulate trying to break a poll
// loop from another thread
consumer0.wakeup()
consumer0.close()
// now we should see the committed positions from another consumer
assertEquals(300, this.consumers(0).committed(tp).offset)
assertEquals(500, this.consumers(0).committed(tp2).offset)
}
@Test
def testAutoCommitOnRebalance() {
val topic2 = "topic2"

Loading…
Cancel
Save