From e52a6181bf0969f315ac0f0d325eac34d2b4a6ee Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Fri, 9 Jan 2015 11:33:48 -0800 Subject: [PATCH] kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede --- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++++- .../test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2a1c0326b6e..c011a1b79bd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -396,7 +396,11 @@ class KafkaApis(val requestChannel: RequestChannel, metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty ) val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + val knownStatus = + if (knownTopicPartitions.size > 0) + offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + else + Map.empty[TopicAndPartition, OffsetMetadataAndError] val status = unknownStatus ++ knownStatus val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 8c5364fa97d..4a3a5b264a0 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) - val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) + val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) val commitResponse = simpleConsumer.commitOffsets(commitRequest) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) @@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata) assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset) + // Fetch an unknown topic and verify + val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0) + val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition)) + val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2) + + assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) + assertEquals(1, fetchResponse2.requestInfo.size) } @Test