diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1d84f847cd8..2f7fd58a66f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1058,7 +1058,7 @@ public class KafkaConsumer implements Consumer { // make sure the offsets of topic partitions the consumer is unsubscribing from // are committed since there will be no following rebalance - this.coordinator.maybeAutoCommitOffsetsNow(); + this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); this.subscriptions.assignFromUser(new HashSet<>(partitions)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5c1e60eee82..d7c1ce9966e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { public void onSuccess(Void value) { pendingAsyncCommits.decrementAndGet(); doCommitOffsetsAsync(offsets, callback); + client.pollNoWakeup(); } @Override @@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return false; } - private void maybeAutoCommitOffsetsAsync(long now) { - if (autoCommitEnabled) { - if (coordinatorUnknown()) { - this.nextAutoCommitDeadline = now + retryBackoffMs; - } else if (now >= nextAutoCommitDeadline) { - this.nextAutoCommitDeadline = now + autoCommitIntervalMs; - doAutoCommitOffsetsAsync(); - } - } - } - - public void maybeAutoCommitOffsetsNow() { - if (autoCommitEnabled && !coordinatorUnknown()) + public void maybeAutoCommitOffsetsAsync(long now) { + if (autoCommitEnabled && now >= nextAutoCommitDeadline) { doAutoCommitOffsetsAsync(); + } } private void doAutoCommitOffsetsAsync() { @@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator { log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage()); if (exception instanceof RetriableException) nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); + else + nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs; } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); + nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs; } } }); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 76301a71bea..c49339b6525 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest { assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId)); } + @Test + public void testAutoCommitAfterCoordinatorBackToService() { + ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); + subscriptions.assignFromUser(Collections.singleton(t1p)); + subscriptions.seek(t1p, 100L); + + coordinator.coordinatorDead(); + assertTrue(coordinator.coordinatorUnknown()); + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE))); + + // async commit offset should find coordinator + time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen + coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds()); + assertFalse(coordinator.coordinatorUnknown()); + assertEquals(subscriptions.committed(t1p).offset(), 100L); + } + private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement, final boolean autoCommit, final boolean leaveGroup) {