From 7a2a198d1e8755ec367a3d95c68aaf35b8ad9ede Mon Sep 17 00:00:00 2001 From: David Mao <47232755+splett2@users.noreply.github.com> Date: Fri, 7 Feb 2020 16:43:52 -0800 Subject: [PATCH] KAFKA-9507; AdminClient should check for missing committed offsets (#8057) Addresses exception being thrown by `AdminClient` when `listConsumerGroupOffsets` returns a negative offset. A negative offset indicates the absence of a committed offset for a requested partition, and should result in a null in the returned offset map. Reviewers: Anna Povzner , Jason Gustafson --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 7 ++++++- .../clients/admin/ListConsumerGroupOffsetsResult.java | 1 + .../apache/kafka/clients/admin/KafkaAdminClientTest.java | 7 ++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6b8e962f979..52332f88206 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3100,7 +3100,12 @@ public class KafkaAdminClient extends AdminClient { final Long offset = partitionData.offset; final String metadata = partitionData.metadata; final Optional leaderEpoch = partitionData.leaderEpoch; - groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); + // Negative offset indicates that the group has no committed offset for this partition + if (offset < 0) { + groupOffsetsListing.put(topicPartition, null); + } else { + groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata)); + } } else { log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java index ea5193402bd..48f45314181 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java @@ -40,6 +40,7 @@ public class ListConsumerGroupOffsetsResult { /** * Return a future which yields a map of topic partitions to OffsetAndMetadata objects. + * If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null. */ public KafkaFuture> partitionsToOffsetAndMetadata() { return future; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index cba9b48d382..ed3e8a6781a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1494,6 +1494,7 @@ public class KafkaAdminClientTest { TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); + TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3); final Map responseData = new HashMap<>(); responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10, @@ -1502,15 +1503,19 @@ public class KafkaAdminClientTest { Optional.empty(), "", Errors.NONE)); responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20, Optional.empty(), "", Errors.NONE)); + responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, + Optional.empty(), "", Errors.NONE)); env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData)); final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0"); final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get(); - assertEquals(3, partitionToOffsetAndMetadata.size()); + assertEquals(4, partitionToOffsetAndMetadata.size()); assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset()); assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset()); assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset()); + assertTrue(partitionToOffsetAndMetadata.containsKey(myTopicPartition3)); + assertNull(partitionToOffsetAndMetadata.get(myTopicPartition3)); } }