Browse Source

MINOR: Add log when the consumer does not send an offset commit due to not being part of an active group (#6404)

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6775/head
Stanislav Kozlovski 6 years ago committed by Jason Gustafson
parent
commit
5a30a806ec
  1. 15
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

15
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

@ -810,16 +810,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
} }
final Generation generation; final Generation generation;
if (subscriptions.partitionsAutoAssigned()) if (subscriptions.partitionsAutoAssigned()) {
generation = generation(); generation = generation();
else // if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null) {
log.info("Failing OffsetCommit request since the consumer is not part of an active group");
return RequestFuture.failure(new CommitFailedException());
}
} else
generation = Generation.NO_GENERATION; generation = Generation.NO_GENERATION;
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData() new OffsetCommitRequestData()
.setGroupId(this.groupId) .setGroupId(this.groupId)

Loading…
Cancel
Save