Browse Source

Improve logging in the consumer for epoch updates (#6879)

pull/6881/head
David Arthur 6 years ago committed by GitHub
parent
commit
264d1d8a8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      clients/src/main/java/org/apache/kafka/clients/Metadata.java
  2. 5
      clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
  3. 7
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

2
clients/src/main/java/org/apache/kafka/clients/Metadata.java

@ -177,7 +177,7 @@ public class Metadata implements Closeable { @@ -177,7 +177,7 @@ public class Metadata implements Closeable {
}
return true;
} else {
log.debug("Not replacing existing epoch {} with new epoch {}", oldEpoch, epoch);
log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
return false;
}
}

5
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java

@ -137,7 +137,10 @@ public class MetadataCache { @@ -137,7 +137,10 @@ public class MetadataCache {
@Override
public String toString() {
return "MetadataCache{" +
"cluster=" + cluster() +
"clusterId='" + clusterId + '\'' +
", nodes=" + nodes +
", partitions=" + metadataByPartition.values() +
", controller=" + controller +
'}';
}

7
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -861,13 +861,14 @@ public class Fetcher<K, V> implements Closeable { @@ -861,13 +861,14 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
if (!currentInfo.isPresent()) {
log.debug("Leader for partition {} is unknown for fetching offset", tp);
log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
} else if (currentInfo.get().partitionInfo().leader() == null) {
log.debug("Leader for partition {} is unavailable for fetching offset", tp);
log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
} else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
@ -881,7 +882,7 @@ public class Fetcher<K, V> implements Closeable { @@ -881,7 +882,7 @@ public class Fetcher<K, V> implements Closeable {
partitionsToRetry.add(tp);
} else {
partitionDataMap.put(tp,
new ListOffsetRequest.PartitionData(entry.getValue(), Optional.of(currentInfo.get().epoch())));
new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
}
}
return regroupPartitionMapByNode(partitionDataMap);

Loading…
Cancel
Save