Browse Source

KAFKA-9200: ListOffsetRequest missing error response for v5 (#7704)

ListOffsetResponse getErrorResponse is missing a a case for version 5, introduced
by 152292994e and released in 2.3.0.

```
java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5                                                                                                                                                                                                                                                               
        at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)                                                                                                                                                                                                                                                                
        at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)                                                                                                                                                                                                                                                                                        
        at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)                                                                                                                                                                                                                                                                                    
        at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)                                                                                                                                                                                                                                                                                                       
        at kafka.server.KafkaApis.handle(KafkaApis.scala:209)                                                                                                                                                                                                                                                                                                             
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)                                                                                                                                                                                                                                                                                             
        at java.lang.Thread.run(Thread.java:748)
```

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/7700/head
Lucas Bradstreet 5 years ago committed by Ismael Juma
parent
commit
374e480352
  1. 3
      clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
  2. 15
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

3
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java

@ -269,12 +269,13 @@ public class ListOffsetRequest extends AbstractRequest { @@ -269,12 +269,13 @@ public class ListOffsetRequest extends AbstractRequest {
responseData.put(partition, partitionError);
}
switch (version()) {
switch (versionId) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
return new ListOffsetResponse(throttleTimeMs, responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

15
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -210,12 +210,11 @@ public class RequestResponseTest { @@ -210,12 +210,11 @@ public class RequestResponseTest {
checkRequest(createDeleteGroupsRequest(), true);
checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true);
checkResponse(createDeleteGroupsResponse(), 0, true);
checkRequest(createListOffsetRequest(1), true);
checkErrorResponse(createListOffsetRequest(1), new UnknownServerException(), true);
checkResponse(createListOffsetResponse(1), 1, true);
checkRequest(createListOffsetRequest(2), true);
checkErrorResponse(createListOffsetRequest(2), new UnknownServerException(), true);
checkResponse(createListOffsetResponse(2), 2, true);
for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) {
checkRequest(createListOffsetRequest(i), true);
checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true);
checkResponse(createListOffsetResponse(i), i, true);
}
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true);
checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true);
checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true);
@ -1097,7 +1096,7 @@ public class RequestResponseTest { @@ -1097,7 +1096,7 @@ public class RequestResponseTest {
.forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(offsetData)
.build((short) version);
} else if (version == 2) {
} else if (version >= 2 && version <= 5) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, Optional.of(5)));
@ -1117,7 +1116,7 @@ public class RequestResponseTest { @@ -1117,7 +1116,7 @@ public class RequestResponseTest {
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
return new ListOffsetResponse(responseData);
} else if (version == 1 || version == 2) {
} else if (version >= 1 && version <= 5) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L, Optional.of(27)));

Loading…
Cancel
Save