Browse Source

MINOR: Include partition when logging fetch errors (#6206)

This helps narrow down the specific broker they came from when debugging
ACL propagation issues.

Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Ismael Juma <ismael@juma.me.uk>
pull/6254/head
Radai Rosenblatt 6 years ago committed by Ismael Juma
parent
commit
c6528424df
  1. 5
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

5
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@ -990,12 +990,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { @@ -990,12 +990,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
log.warn("Not authorized to read from topic {}.", tp.topic());
//we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
log.warn("Not authorized to read from partition {}.", tp);
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching from partition " + tp);
}
} finally {
if (partitionRecords == null)

Loading…
Cancel
Save