diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 772f973bc58..b9c811d1f93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1527,10 +1527,10 @@ public class KafkaConsumer implements Consumer { acquireAndEnsureOpen(); try { if (offsetAndMetadata.leaderEpoch().isPresent()) { - log.debug("Seeking to offset {} for partition {} with epoch {}", + log.info("Seeking to offset {} for partition {} with epoch {}", offset, partition, offsetAndMetadata.leaderEpoch().get()); } else { - log.debug("Seeking to offset {} for partition {}", offset, partition); + log.info("Seeking to offset {} for partition {}", offset, partition); } this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata); this.subscriptions.seek(partition, offset); @@ -1556,7 +1556,7 @@ public class KafkaConsumer implements Consumer { try { Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { - log.debug("Seeking to beginning of partition {}", tp); + log.info("Seeking to beginning of partition {}", tp); subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST); } } finally { @@ -1584,7 +1584,7 @@ public class KafkaConsumer implements Consumer { try { Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { - log.debug("Seeking to end of partition {}", tp); + log.info("Seeking to end of partition {}", tp); subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST); } } finally { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 902a72c126c..f9c42570fc7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -107,8 +107,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } private boolean sameRequest(final Set currentRequest, final Generation currentGeneration) { - return (requestedGeneration == null ? currentGeneration == null : requestedGeneration.equals(currentGeneration)) - && requestedPartitions.equals(currentRequest); + return Objects.equals(requestedGeneration, currentGeneration) && requestedPartitions.equals(currentRequest); } } @@ -499,7 +498,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { for (final Map.Entry entry : offsets.entrySet()) { final TopicPartition tp = entry.getKey(); final long offset = entry.getValue().offset(); - log.debug("Setting offset for partition {} to the committed offset {}", tp, offset); + log.info("Setting offset for partition {} to the committed offset {}", tp, offset); entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch)); this.subscriptions.seek(tp, offset); } @@ -909,14 +908,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { future.raise(new KafkaException("Topic or Partition " + tp + " does not exist")); } else { - future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); + future.raise(new KafkaException("Unexpected error in fetch offset response for partition " + + tp + ": " + error.message())); } return; } else if (data.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) offsets.put(tp, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata)); } else { - log.debug("Found no committed offset for partition {}", tp); + log.info("Found no committed offset for partition {}", tp); } }