diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index b737b49f4a5..97bc72852e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.Map; -public abstract class AbstractRequest extends AbstractRequestResponse { +public abstract class AbstractRequest implements AbstractRequestResponse { public static abstract class Builder { private final ApiKeys apiKey; @@ -100,7 +100,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse { * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead. */ public ByteBuffer serialize(RequestHeader header) { - return serialize(header.toStruct(), toStruct()); + return RequestUtils.serialize(header.toStruct(), toStruct()); } protected abstract Struct toStruct(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java index 0ba373d6fea..b02659d4f35 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java @@ -16,19 +16,5 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public abstract class AbstractRequestResponse { - /** - * Visible for testing. - */ - public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) { - ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf()); - headerStruct.writeTo(buffer); - bodyStruct.writeTo(buffer); - buffer.rewind(); - return buffer; - } +public interface AbstractRequestResponse { } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 8a6edf14cba..64701529403 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -28,18 +28,18 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -public abstract class AbstractResponse extends AbstractRequestResponse { +public abstract class AbstractResponse implements AbstractRequestResponse { public static final int DEFAULT_THROTTLE_TIME = 0; protected Send toSend(String destination, ResponseHeader header, short apiVersion) { - return new NetworkSend(destination, serialize(header.toStruct(), toStruct(apiVersion))); + return new NetworkSend(destination, RequestUtils.serialize(header.toStruct(), toStruct(apiVersion))); } /** * Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead. */ - public ByteBuffer serialize(ApiKeys apiKey, int correlationId) { - return serialize(apiKey, apiKey.latestVersion(), correlationId); + public ByteBuffer serialize(short version, ResponseHeader responseHeader) { + return RequestUtils.serialize(responseHeader.toStruct(), toStruct(version)); } /** @@ -48,7 +48,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) { ResponseHeader header = new ResponseHeader(correlationId, apiKey.responseHeaderVersion(version)); - return serialize(header.toStruct(), toStruct(version)); + return RequestUtils.serialize(header.toStruct(), toStruct(version)); } public abstract Map errorCounts(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 8da1eb5fde9..3d80c4e7563 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -28,7 +28,7 @@ import java.nio.ByteBuffer; /** * The header for a request in the Kafka protocol */ -public class RequestHeader extends AbstractRequestResponse { +public class RequestHeader implements AbstractRequestResponse { private final RequestHeaderData data; private final short headerVersion; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java index b4a2420c46a..c3dfaa13b99 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.ResourceType; +import java.nio.ByteBuffer; import java.util.Optional; import static org.apache.kafka.common.protocol.CommonFields.HOST; @@ -42,7 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYP import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER; import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE; -final class RequestUtils { +public final class RequestUtils { private RequestUtils() {} @@ -122,4 +123,12 @@ final class RequestUtils { Optional.empty() : Optional.of(leaderEpoch); return leaderEpochOpt; } + + public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) { + ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf()); + headerStruct.writeTo(buffer); + bodyStruct.writeTo(buffer); + buffer.rewind(); + return buffer; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java index 249b5d02108..118e5d3506d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; /** * A response header in the kafka protocol. */ -public class ResponseHeader extends AbstractRequestResponse { +public class ResponseHeader implements AbstractRequestResponse { private final ResponseHeaderData data; private final short headerVersion; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index d3fcbc334c7..fd1be374b7e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1999,7 +1999,7 @@ public class FetcherTest { ByteBuffer buffer = ApiVersionsResponse. createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE). - serialize(ApiKeys.API_VERSIONS, 0); + serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); while (!client.ready(node, time.milliseconds())) { client.poll(1, time.milliseconds()); @@ -2016,7 +2016,9 @@ public class FetcherTest { client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs); - buffer = response.serialize(ApiKeys.FETCH, request.correlationId()); + buffer = response.serialize(ApiKeys.FETCH, + ApiKeys.FETCH.latestVersion(), + request.correlationId()); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); client.poll(1, time.milliseconds()); // If a throttled response is received, advance the time to ensure progress. diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 71180bdd7f7..1b35e0b4311 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -275,7 +275,7 @@ public class SenderTest { time, true, new ApiVersions(), throttleTimeSensor, logContext); ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE). - serialize(ApiKeys.API_VERSIONS, 0); + serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); while (!client.ready(node, time.milliseconds())) { client.poll(1, time.milliseconds()); @@ -292,7 +292,8 @@ public class SenderTest { client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs); - buffer = response.serialize(ApiKeys.PRODUCE, request.correlationId()); + buffer = response. + serialize(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion(), request.correlationId()); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); client.poll(1, time.milliseconds()); // If a throttled response is received, advance the time to ensure progress. diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 5d00d540b14..2cc355395d4 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -29,7 +29,7 @@ import kafka.network.SocketServer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader} +import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, RequestUtils, ResponseHeader} import org.apache.kafka.common.security.auth.SecurityProtocol abstract class BaseRequestTest extends IntegrationTestHarness { @@ -170,7 +170,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness { */ def sendStructAndReceive(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = { val header = nextRequestHeader(apiKey, apiVersion) - val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array + val serializedBytes = RequestUtils.serialize(header.toStruct, requestStruct).array val response = requestAndReceive(socket, serializedBytes) skipResponseHeader(response, apiKey.responseHeaderVersion(apiVersion)) }