
MINOR: Make info logs for KafkaConsumer a bit more verbose (#6279)

When debugging KafkaConsumer production issues, it's pretty
useful to have log entries related to seeking and committed
offset retrieval enabled by default. These are currently present,
but only when debug logging is enabled. Change them to `info`.
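
For reference, a minimal sketch of the user-facing path that emits these entries; the broker address, topic name, and offset are hypothetical:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("orders", 0); // hypothetical topic
                consumer.assign(Collections.singletonList(tp));
                // With this change, seek() logs "Seeking to offset 42 for partition orders-0"
                // at info level, visible without enabling debug logging.
                consumer.seek(tp, 42L);
                consumer.poll(Duration.ofMillis(100));
            }
        }
    }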

Also included a minor code simplification and a slight improvement
to an exception message.

Reviewers: Jason Gustafson <jason@confluent.io>
Ismael Juma committed via GitHub
commit b53f844028
  1. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (8 changed lines)
  2. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (10 changed lines)

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (8 changed lines)

@@ -1527,10 +1527,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquireAndEnsureOpen();
         try {
             if (offsetAndMetadata.leaderEpoch().isPresent()) {
-                log.debug("Seeking to offset {} for partition {} with epoch {}",
+                log.info("Seeking to offset {} for partition {} with epoch {}",
                         offset, partition, offsetAndMetadata.leaderEpoch().get());
             } else {
-                log.debug("Seeking to offset {} for partition {}", offset, partition);
+                log.info("Seeking to offset {} for partition {}", offset, partition);
             }
             this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
             this.subscriptions.seek(partition, offset);
@@ -1556,7 +1556,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
-                log.debug("Seeking to beginning of partition {}", tp);
+                log.info("Seeking to beginning of partition {}", tp);
                 subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
             }
         } finally {
@@ -1584,7 +1584,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
-                log.debug("Seeking to end of partition {}", tp);
+                log.info("Seeking to end of partition {}", tp);
                 subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
             }
         } finally {
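
Note the partitions.size() == 0 branch in both hunks: passing an empty collection means "all currently assigned partitions". A small illustrative sketch (class and method names are hypothetical):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;

    public class SeekShortcuts {
        // An empty collection seeks every assigned partition; each partition now
        // produces an info-level "Seeking to beginning of partition ..." entry.
        static void rewindAll(Consumer<?, ?> consumer) {
            consumer.seekToBeginning(Collections.emptyList());
        }

        // Likewise for the latest offsets: "Seeking to end of partition ...".
        static void skipToLatest(Consumer<?, ?> consumer) {
            consumer.seekToEnd(Collections.emptyList());
        }
    }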

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java (10 changed lines)

@@ -107,8 +107,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }

         private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
-            return (requestedGeneration == null ? currentGeneration == null : requestedGeneration.equals(currentGeneration))
-                    && requestedPartitions.equals(currentRequest);
+            return Objects.equals(requestedGeneration, currentGeneration) && requestedPartitions.equals(currentRequest);
         }
     }
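
The one-liner works because java.util.Objects.equals is exactly the null-safe comparison the ternary spelled out (the import of java.util.Objects is not shown in this excerpt). A quick self-contained check:

    import java.util.Objects;

    public class NullSafeEqualsCheck {
        // The pattern the old code used: handle null before delegating to equals().
        static boolean ternaryStyle(Object a, Object b) {
            return a == null ? b == null : a.equals(b);
        }

        public static void main(String[] args) {
            // All three agree with Objects.equals, including the null cases.
            System.out.println(Objects.equals(null, null) == ternaryStyle(null, null)); // true
            System.out.println(Objects.equals("x", null) == ternaryStyle("x", null));   // true
            System.out.println(Objects.equals("x", "x") == ternaryStyle("x", "x"));     // true
        }
    }
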
@@ -499,7 +498,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
             final TopicPartition tp = entry.getKey();
             final long offset = entry.getValue().offset();
-            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
+            log.info("Setting offset for partition {} to the committed offset {}", tp, offset);
             entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
             this.subscriptions.seek(tp, offset);
         }
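
This is the path taken when a consumer in a group starts up and restores its committed position. A hedged sketch of the triggering usage; the broker, group id, and topic are hypothetical:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ResumeFromCommitted {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");        // hypothetical group
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("orders")); // hypothetical topic
                // The first poll fetches the group's committed offsets; restoring each one
                // now logs "Setting offset for partition ... to the committed offset ..."
                // at info level.
                consumer.poll(Duration.ofSeconds(1));
            }
        }
    }
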
@@ -909,14 +908,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         future.raise(new KafkaException("Topic or Partition " + tp + " does not exist"));
                     } else {
-                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
+                        future.raise(new KafkaException("Unexpected error in fetch offset response for partition " +
+                            tp + ": " + error.message()));
                     }
                     return;
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
                     offsets.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
                 } else {
-                    log.debug("Found no committed offset for partition {}", tp);
+                    log.info("Found no committed offset for partition {}", tp);
                 }
             }
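
When this branch is hit, the consumer has no committed position for the partition and typically falls back to its auto.offset.reset policy. A minimal configuration sketch:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class OffsetResetPolicy {
        public static void main(String[] args) {
            Properties props = new Properties();
            // After "Found no committed offset for partition ...", the position is
            // resolved by this policy: "earliest", "latest" (the default), or "none",
            // which instead raises an exception to the caller.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            System.out.println(props);
        }
    }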
