diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 701a81bf586..98193e829ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -14,6 +14,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerWakeupException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; @@ -235,7 +236,15 @@ public final class Coordinator implements Closeable { @Override public void close() { // commit offsets prior to closing if auto-commit enabled - maybeAutoCommitOffsetsSync(); + while (true) { + try { + maybeAutoCommitOffsetsSync(); + return; + } catch (ConsumerWakeupException e) { + // ignore wakeups while closing to ensure we have a chance to commit + continue; + } + } } private class HeartbeatTask implements DelayedTask { @@ -430,6 +439,9 @@ public final class Coordinator implements Closeable { if (autoCommitEnabled) { try { commitOffsetsSync(subscriptions.allConsumed()); + } catch (ConsumerWakeupException e) { + // rethrow wakeups since they are triggered by the user + throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception log.error("Auto offset commit failed.", e); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 166b914a584..0a02b037bd8 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -115,6 +115,36 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(500, this.consumers(0).committed(tp2).offset) } + @Test + def testAutoCommitOnCloseAfterWakeup() { + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") + val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + + val numRecords = 10000 + sendRecords(numRecords) + + consumer0.subscribe(List(topic)) + + val assignment = Set(tp, tp2) + TestUtils.waitUntilTrue(() => { + consumer0.poll(50) + consumer0.assignment() == assignment.asJava + }, s"Expected partitions ${assignment.asJava} but actually got ${consumer0.assignment()}") + + // should auto-commit seeked positions before closing + consumer0.seek(tp, 300) + consumer0.seek(tp2, 500) + + // wakeup the consumer before closing to simulate trying to break a poll + // loop from another thread + consumer0.wakeup() + consumer0.close() + + // now we should see the committed positions from another consumer + assertEquals(300, this.consumers(0).committed(tp).offset) + assertEquals(500, this.consumers(0).committed(tp2).offset) + } + @Test def testAutoCommitOnRebalance() { val topic2 = "topic2"