Browse Source

kafka-1851; OffsetFetchRequest returns extra partitions when input only contains unknown partitions; patched by Jun Rao; reviewed by Neha Narkhede

pull/38/merge
Jun Rao 10 years ago
parent
commit
e52a6181bf
  1. 6
      core/src/main/scala/kafka/server/KafkaApis.scala
  2. 9
      core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala

6
core/src/main/scala/kafka/server/KafkaApis.scala

@ -396,7 +396,11 @@ class KafkaApis(val requestChannel: RequestChannel,
metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
) )
val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap val knownStatus =
if (knownTopicPartitions.size > 0)
offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
else
Map.empty[TopicAndPartition, OffsetMetadataAndError]
val status = unknownStatus ++ knownStatus val status = unknownStatus ++ knownStatus
val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)

9
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala

@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the topic // create the topic
createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server)) createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L))) val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
val commitResponse = simpleConsumer.commitOffsets(commitRequest) val commitResponse = simpleConsumer.commitOffsets(commitRequest)
assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get) assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata) assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset) assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
// Fetch an unknown topic and verify
val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0)
val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition))
val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
assertEquals(1, fetchResponse2.requestInfo.size)
} }
@Test @Test

Loading…
Cancel
Save