Browse Source

MINOR: AbstractRequestResponse should be an interface (#7513)

AbstractRequestResponse should be an interface, since it has no concrete elements or implementation.  Move AbstractRequestResponse#serialize to RequestUtils#serialize and make it package-private, since it doesn't need to be public.

Reviewers: Ismael Juma <ismael@juma.me.uk>
pull/7546/head
Colin Patrick McCabe 5 years ago committed by GitHub
parent
commit
3cb8ccf63a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
  2. 16
      clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
  3. 10
      clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
  4. 2
      clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
  5. 11
      clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
  6. 2
      clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
  7. 6
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  8. 5
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  9. 4
      core/src/test/scala/unit/kafka/server/BaseRequestTest.scala

4
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct; @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.Map;
public abstract class AbstractRequest extends AbstractRequestResponse {
public abstract class AbstractRequest implements AbstractRequestResponse {
public static abstract class Builder<T extends AbstractRequest> {
private final ApiKeys apiKey;
@ -100,7 +100,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse { @@ -100,7 +100,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
* Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
*/
public ByteBuffer serialize(RequestHeader header) {
return serialize(header.toStruct(), toStruct());
return RequestUtils.serialize(header.toStruct(), toStruct());
}
protected abstract Struct toStruct();

16
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java

@ -16,19 +16,5 @@ @@ -16,19 +16,5 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public abstract class AbstractRequestResponse {
/**
* Visible for testing.
*/
public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
headerStruct.writeTo(buffer);
bodyStruct.writeTo(buffer);
buffer.rewind();
return buffer;
}
public interface AbstractRequestResponse {
}

10
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@ -28,18 +28,18 @@ import java.util.Collections; @@ -28,18 +28,18 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public abstract class AbstractResponse extends AbstractRequestResponse {
public abstract class AbstractResponse implements AbstractRequestResponse {
public static final int DEFAULT_THROTTLE_TIME = 0;
protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
return new NetworkSend(destination, serialize(header.toStruct(), toStruct(apiVersion)));
return new NetworkSend(destination, RequestUtils.serialize(header.toStruct(), toStruct(apiVersion)));
}
/**
* Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead.
*/
public ByteBuffer serialize(ApiKeys apiKey, int correlationId) {
return serialize(apiKey, apiKey.latestVersion(), correlationId);
public ByteBuffer serialize(short version, ResponseHeader responseHeader) {
return RequestUtils.serialize(responseHeader.toStruct(), toStruct(version));
}
/**
@ -48,7 +48,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { @@ -48,7 +48,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) {
ResponseHeader header =
new ResponseHeader(correlationId, apiKey.responseHeaderVersion(version));
return serialize(header.toStruct(), toStruct(version));
return RequestUtils.serialize(header.toStruct(), toStruct(version));
}
public abstract Map<Errors, Integer> errorCounts();

2
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java

@ -28,7 +28,7 @@ import java.nio.ByteBuffer; @@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
/**
* The header for a request in the Kafka protocol
*/
public class RequestHeader extends AbstractRequestResponse {
public class RequestHeader implements AbstractRequestResponse {
private final RequestHeaderData data;
private final short headerVersion;

11
clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java

@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.ResourceType;
import java.nio.ByteBuffer;
import java.util.Optional;
import static org.apache.kafka.common.protocol.CommonFields.HOST;
@ -42,7 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYP @@ -42,7 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYP
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
final class RequestUtils {
public final class RequestUtils {
private RequestUtils() {}
@ -122,4 +123,12 @@ final class RequestUtils { @@ -122,4 +123,12 @@ final class RequestUtils {
Optional.empty() : Optional.of(leaderEpoch);
return leaderEpochOpt;
}
public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
headerStruct.writeTo(buffer);
bodyStruct.writeTo(buffer);
buffer.rewind();
return buffer;
}
}

2
clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java

@ -25,7 +25,7 @@ import java.nio.ByteBuffer; @@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
/**
* A response header in the kafka protocol.
*/
public class ResponseHeader extends AbstractRequestResponse {
public class ResponseHeader implements AbstractRequestResponse {
private final ResponseHeaderData data;
private final short headerVersion;

6
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

@ -1999,7 +1999,7 @@ public class FetcherTest { @@ -1999,7 +1999,7 @@ public class FetcherTest {
ByteBuffer buffer = ApiVersionsResponse.
createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).
serialize(ApiKeys.API_VERSIONS, 0);
serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {
client.poll(1, time.milliseconds());
@ -2016,7 +2016,9 @@ public class FetcherTest { @@ -2016,7 +2016,9 @@ public class FetcherTest {
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
buffer = response.serialize(ApiKeys.FETCH, request.correlationId());
buffer = response.serialize(ApiKeys.FETCH,
ApiKeys.FETCH.latestVersion(),
request.correlationId());
selector.completeReceive(new NetworkReceive(node.idString(), buffer));
client.poll(1, time.milliseconds());
// If a throttled response is received, advance the time to ensure progress.

5
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -275,7 +275,7 @@ public class SenderTest { @@ -275,7 +275,7 @@ public class SenderTest {
time, true, new ApiVersions(), throttleTimeSensor, logContext);
ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).
serialize(ApiKeys.API_VERSIONS, 0);
serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
while (!client.ready(node, time.milliseconds())) {
client.poll(1, time.milliseconds());
@ -292,7 +292,8 @@ public class SenderTest { @@ -292,7 +292,8 @@ public class SenderTest {
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
buffer = response.serialize(ApiKeys.PRODUCE, request.correlationId());
buffer = response.
serialize(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion(), request.correlationId());
selector.completeReceive(new NetworkReceive(node.idString(), buffer));
client.poll(1, time.milliseconds());
// If a throttled response is received, advance the time to ensure progress.

4
core/src/test/scala/unit/kafka/server/BaseRequestTest.scala

@ -29,7 +29,7 @@ import kafka.network.SocketServer @@ -29,7 +29,7 @@ import kafka.network.SocketServer
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader}
import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, RequestUtils, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
abstract class BaseRequestTest extends IntegrationTestHarness {
@ -170,7 +170,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness { @@ -170,7 +170,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
*/
def sendStructAndReceive(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = {
val header = nextRequestHeader(apiKey, apiVersion)
val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array
val serializedBytes = RequestUtils.serialize(header.toStruct, requestStruct).array
val response = requestAndReceive(socket, serializedBytes)
skipResponseHeader(response, apiKey.responseHeaderVersion(apiVersion))
}

Loading…
Cancel
Save