Browse Source

KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)

Currently `maybeAutoCommitOffsetsAsync` does not try to find the coordinator if it is unknown. As a result, asynchronous auto-commits will fail indefinitely. This patch changes the behavior to add coordinator discovery to the async auto-commit path.
pull/4539/head
huxi 7 years ago committed by Jason Gustafson
parent
commit
ac267dc5ce
  1. 2
      clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  2. 20
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
  3. 19
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

2
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@ -1058,7 +1058,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1058,7 +1058,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
this.coordinator.maybeAutoCommitOffsetsNow();
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));

20
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
client.pollNoWakeup();
}
@Override
@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return false;
}
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}
public void maybeAutoCommitOffsetsNow() {
if (autoCommitEnabled && !coordinatorUnknown())
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
doAutoCommitOffsetsAsync();
}
}
private void doAutoCommitOffsetsAsync() {
@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator { @@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
if (exception instanceof RetriableException)
nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
else
nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
}
}
});

19
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest { @@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest {
assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
}
@Test
public void testAutoCommitAfterCoordinatorBackToService() {
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
subscriptions.assignFromUser(Collections.singleton(t1p));
subscriptions.seek(t1p, 100L);
coordinator.coordinatorDead();
assertTrue(coordinator.coordinatorUnknown());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
// async commit offset should find coordinator
time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
assertFalse(coordinator.coordinatorUnknown());
assertEquals(subscriptions.committed(t1p).offset(), 100L);
}
private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
final boolean autoCommit,
final boolean leaveGroup) {

Loading…
Cancel
Save