From 264d1d8a8b7ede0fb8e3595d0153893966bb73b8 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 4 Jun 2019 17:49:30 -0400 Subject: [PATCH] Improve logging in the consumer for epoch updates (#6879) --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 2 +- .../main/java/org/apache/kafka/clients/MetadataCache.java | 5 ++++- .../apache/kafka/clients/consumer/internals/Fetcher.java | 7 ++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index f991fa64016..94e4eb32987 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -177,7 +177,7 @@ public class Metadata implements Closeable { } return true; } else { - log.debug("Not replacing existing epoch {} with new epoch {}", oldEpoch, epoch); + log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition); return false; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java index b928b8e0ef7..b5c9de638a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java @@ -137,7 +137,10 @@ public class MetadataCache { @Override public String toString() { return "MetadataCache{" + - "cluster=" + cluster() + + "clusterId='" + clusterId + '\'' + + ", nodes=" + nodes + + ", partitions=" + metadataByPartition.values() + + ", controller=" + controller + '}'; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e638963e9c8..cff6e30bc93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -861,13 +861,14 @@ public class Fetcher implements Closeable { final Map partitionDataMap = new HashMap<>(); for (Map.Entry entry: timestampsToSearch.entrySet()) { TopicPartition tp = entry.getKey(); + Long offset = entry.getValue(); Optional currentInfo = metadata.partitionInfoIfCurrent(tp); if (!currentInfo.isPresent()) { - log.debug("Leader for partition {} is unknown for fetching offset", tp); + log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset); metadata.requestUpdate(); partitionsToRetry.add(tp); } else if (currentInfo.get().partitionInfo().leader() == null) { - log.debug("Leader for partition {} is unavailable for fetching offset", tp); + log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset); metadata.requestUpdate(); partitionsToRetry.add(tp); } else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) { @@ -881,7 +882,7 @@ public class Fetcher implements Closeable { partitionsToRetry.add(tp); } else { partitionDataMap.put(tp, - new ListOffsetRequest.PartitionData(entry.getValue(), Optional.of(currentInfo.get().epoch()))); + new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch()))); } } return regroupPartitionMapByNode(partitionDataMap);