Browse Source

KAFKA-14414: Fix request/response header size calculation (#12917)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>
pull/12280/head
Divij Vaidya 2 years ago committed by GitHub
parent
commit
b2d8354e10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
  2. 7
      clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
  3. 4
      clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java
  4. 4
      clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java
  5. 6
      clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
  6. 4
      clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

16
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java

@ -117,23 +117,23 @@ public class RequestHeader implements AbstractRequestResponse { @@ -117,23 +117,23 @@ public class RequestHeader implements AbstractRequestResponse {
try {
// We derive the header version from the request api version, so we read that first.
// The request api version is part of `RequestHeaderData`, so we reset the buffer position after the read.
int position = buffer.position();
int requestHeaderSize = buffer.remaining();
int bufferStartPositionForHeader = buffer.position();
apiKey = buffer.getShort();
short apiVersion = buffer.getShort();
short headerVersion = ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion);
buffer.position(position);
RequestHeaderData headerData = new RequestHeaderData(new ByteBufferAccessor(buffer), headerVersion);
buffer.position(bufferStartPositionForHeader);
final RequestHeaderData headerData = new RequestHeaderData(new ByteBufferAccessor(buffer), headerVersion);
// Due to a quirk in the protocol, client ID is marked as nullable.
// However, we treat a null client ID as equivalent to an empty client ID.
if (headerData.clientId() == null) {
headerData.setClientId("");
}
final RequestHeader header = new RequestHeader(headerData, headerVersion);
// Size of a buffer required to serialize the information in this header is already known and would not
// change since the RequestHeader object is immutable. Instead of computing it again whenever
// RequestHeader#size() is called, we choose to cache the size value when available.
header.size = requestHeaderSize;
// Size of header is calculated by the shift in the position of buffer's start position during parsing.
// Prior to parsing, the buffer's start position points to header data and after the parsing operation
// the buffer's start position points to api message. For more information on how the buffer is
// constructed, see RequestUtils#serialize()
header.size = Math.max(buffer.position() - bufferStartPositionForHeader, 0);
return header;
} catch (UnsupportedVersionException e) {
throw new InvalidRequestException("Unknown API key " + apiKey, e);

7
clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java

@ -96,9 +96,14 @@ public class ResponseHeader implements AbstractRequestResponse { @@ -96,9 +96,14 @@ public class ResponseHeader implements AbstractRequestResponse {
}
public static ResponseHeader parse(ByteBuffer buffer, short headerVersion) {
final int bufferStartPositionForHeader = buffer.position();
final ResponseHeader header = new ResponseHeader(
new ResponseHeaderData(new ByteBufferAccessor(buffer), headerVersion), headerVersion);
header.size = buffer.remaining();
// Size of header is calculated by the shift in the position of buffer's start position during parsing.
// Prior to parsing, the buffer's start position points to header data and after the parsing operation
// the buffer's start position points to api message. For more information on how the buffer is
// constructed, see RequestUtils#serialize()
header.size = Math.max(buffer.position() - bufferStartPositionForHeader, 0);
return header;
}

4
clients/src/test/java/org/apache/kafka/common/requests/EnvelopeRequestTest.java

@ -58,7 +58,9 @@ public class EnvelopeRequestTest { @@ -58,7 +58,9 @@ public class EnvelopeRequestTest {
Send send = request.toSend(header);
ByteBuffer buffer = TestUtils.toBuffer(send);
assertEquals(send.size() - 4, buffer.getInt());
assertEquals(header, RequestHeader.parse(buffer));
RequestHeader parsedHeader = RequestHeader.parse(buffer);
assertEquals(header.size(), parsedHeader.size());
assertEquals(header, parsedHeader);
EnvelopeRequestData parsedRequestData = new EnvelopeRequestData();
parsedRequestData.read(new ByteBufferAccessor(buffer), version);

4
clients/src/test/java/org/apache/kafka/common/requests/EnvelopeResponseTest.java

@ -41,7 +41,9 @@ class EnvelopeResponseTest { @@ -41,7 +41,9 @@ class EnvelopeResponseTest {
Send send = response.toSend(header, version);
ByteBuffer buffer = TestUtils.toBuffer(send);
assertEquals(send.size() - 4, buffer.getInt());
assertEquals(header, ResponseHeader.parse(buffer, headerVersion));
ResponseHeader parsedHeader = ResponseHeader.parse(buffer, headerVersion);
assertEquals(header.size(), parsedHeader.size());
assertEquals(header, parsedHeader);
EnvelopeResponseData parsedResponseData = new EnvelopeResponseData();
parsedResponseData.read(new ByteBufferAccessor(buffer), version);

6
clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java

@ -135,8 +135,10 @@ public class RequestHeaderTest { @@ -135,8 +135,10 @@ public class RequestHeaderTest {
// actual call to generate the RequestHeader from buffer containing RequestHeaderData
RequestHeader parsed = spy(RequestHeader.parse(buffer));
// verify that the result of cached value is same as actual calculation of size
assertEquals(parsed.size(), parsed.size(new ObjectSerializationCache()));
// verify that the result of cached value of size is same as actual calculation of size
int sizeCalculatedFromData = parsed.size(new ObjectSerializationCache());
int sizeFromCache = parsed.size();
assertEquals(sizeCalculatedFromData, sizeFromCache);
// verify that size(ObjectSerializationCache) is only called once, i.e. during assertEquals call. This validates
// that size() method does not calculate the size instead it uses the cached value

4
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

@ -406,6 +406,7 @@ public class RequestResponseTest { @@ -406,6 +406,7 @@ public class RequestResponseTest {
header.write(buffer, serializationCache);
buffer.flip();
ResponseHeader deserialized = ResponseHeader.parse(buffer, header.headerVersion());
assertEquals(header.size(), deserialized.size());
assertEquals(header.correlationId(), deserialized.correlationId());
}
@ -606,7 +607,7 @@ public class RequestResponseTest { @@ -606,7 +607,7 @@ public class RequestResponseTest {
assertEquals(fetchResponse.serialize(version), buf);
FetchResponseData deserialized = new FetchResponseData(new ByteBufferAccessor(buf), version);
ObjectSerializationCache serializationCache = new ObjectSerializationCache();
assertEquals(size, responseHeader.size(serializationCache) + deserialized.size(serializationCache, version));
assertEquals(size, responseHeader.size() + deserialized.size(serializationCache, version));
}
@Test
@ -708,6 +709,7 @@ public class RequestResponseTest { @@ -708,6 +709,7 @@ public class RequestResponseTest {
ByteBuffer serializedRequest = createTopicsRequest.serializeWithHeader(requestHeader);
RequestHeader parsedHeader = RequestHeader.parse(serializedRequest);
assertEquals(requestHeader.size(), parsedHeader.size());
assertEquals(requestHeader, parsedHeader);
RequestAndSize parsedRequest = AbstractRequest.parseRequest(

Loading…
Cancel
Save