From 747dc930ff710ec890b5f054b4ec4bf7bbb7c960 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 18 Jan 2016 09:54:02 -0800 Subject: [PATCH] KAFKA-2695: limited support for nullable byte arrays Author: Jason Gustafson Reviewers: Guozhang Wang Closes #780 from hachikuji/KAFKA-2695 --- .../consumer/internals/ConsumerProtocol.java | 4 +- .../kafka/common/protocol/types/Struct.java | 2 + .../kafka/common/protocol/types/Type.java | 67 +++++++++++++++++++ .../internals/ConsumerProtocolTest.java | 18 +++++ .../types/ProtocolSerializationTest.java | 25 +++++-- 5 files changed, 110 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 3f87995642d..361865d0082 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -71,13 +71,13 @@ public class ConsumerProtocol { public static final Schema SUBSCRIPTION_V0 = new Schema( new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)), - new Field(USER_DATA_KEY_NAME, Type.BYTES)); + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema( new Field(TOPIC_KEY_NAME, Type.STRING), new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32))); public static final Schema ASSIGNMENT_V0 = new Schema( new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), - new Field(USER_DATA_KEY_NAME, Type.BYTES)); + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 54c3debd0fb..4902f25d175 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -51,6 +51,8 @@ public class Struct { return value; else if (field.defaultValue != Field.NO_DEFAULT) return field.defaultValue; + else if (field.type.isNullable()) + return null; else throw new SchemaException("Missing value for field '" + field.name + "' which has no default value."); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 9ea28b27c34..04833877979 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -51,6 +51,14 @@ public abstract class Type { */ public abstract int sizeOf(Object o); + /** + * Check if the type supports null values + * @return whether or not null is a valid value for the type implementation + */ + public boolean isNullable() { + return false; + } + public static final Type INT8 = new Type() { @Override public void write(ByteBuffer buffer, Object o) { @@ -247,4 +255,63 @@ public abstract class Type { } }; + public static final Type NULLABLE_BYTES = new Type() { + @Override + public boolean isNullable() { + return true; + } + + @Override + public void write(ByteBuffer buffer, Object o) { + if (o == null) { + buffer.putInt(-1); + return; + } + + ByteBuffer arg = (ByteBuffer) o; + int pos = arg.position(); + buffer.putInt(arg.remaining()); + buffer.put(arg); + arg.position(pos); + } + + @Override + public Object read(ByteBuffer buffer) { + int size = buffer.getInt(); + if (size < 0) + return null; + + ByteBuffer val = buffer.slice(); + val.limit(size); + buffer.position(buffer.position() + size); + return val; + } + + @Override + public int sizeOf(Object o) { + if (o == null) + return 4; + + ByteBuffer buffer = (ByteBuffer) o; + return 4 + buffer.remaining(); + } + + @Override + public String toString() { + return "NULLABLE_BYTES"; + } + + @Override + public ByteBuffer validate(Object item) { + if (item == null) + return null; + + if (item instanceof ByteBuffer) + return (ByteBuffer) item; + + throw new SchemaException(item + " is not a java.nio.ByteBuffer."); + } + }; + + } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java index 8113770b78a..be98ce7e989 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -33,16 +33,25 @@ import java.util.List; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class ConsumerProtocolTest { @Test public void serializeDeserializeMetadata() { Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); + assertEquals(subscription.topics(), parsedSubscription.topics()); + } + @Test + public void serializeDeserializeNullSubscriptionUserData() { + Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null); ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); assertEquals(subscription.topics(), parsedSubscription.topics()); + assertNull(subscription.userData()); } @Test @@ -81,6 +90,15 @@ public class ConsumerProtocolTest { assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); } + @Test + public void deserializeNullAssignmentUserData() { + List partitions = Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("bar", 2)); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions, null)); + PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); + assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); + assertNull(parsedAssignment.userData()); + } + @Test public void deserializeNewAssignmentVersion() { // verify that a new version which adds a field is still parseable diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index d2e2782754f..9fe20c145c3 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -39,14 +39,16 @@ public class ProtocolSerializationTest { new Field("int64", Type.INT64), new Field("string", Type.STRING), new Field("bytes", Type.BYTES), + new Field("nullable_bytes", Type.NULLABLE_BYTES), new Field("array", new ArrayOf(Type.INT32)), - new Field("struct", new Schema(new Field("field", Type.INT32)))); + new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32))))); this.struct = new Struct(this.schema).set("int8", (byte) 1) .set("int16", (short) 1) .set("int32", 1) .set("int64", 1L) .set("string", "1") - .set("bytes", "1".getBytes()) + .set("bytes", ByteBuffer.wrap("1".getBytes())) + .set("nullable_bytes", null) .set("array", new Object[] {1}); this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3})); } @@ -62,6 +64,9 @@ public class ProtocolSerializationTest { check(Type.STRING, "A\u00ea\u00f1\u00fcC"); check(Type.BYTES, ByteBuffer.allocate(0)); check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes())); + check(Type.NULLABLE_BYTES, null); + check(Type.NULLABLE_BYTES, ByteBuffer.allocate(0)); + check(Type.NULLABLE_BYTES, ByteBuffer.wrap("abcd".getBytes())); check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4}); check(new ArrayOf(Type.STRING), new Object[] {}); check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"}); @@ -74,9 +79,11 @@ public class ProtocolSerializationTest { try { this.struct.set(f, null); this.struct.validate(); - fail("Should not allow serialization of null value."); + if (!f.type.isNullable()) + fail("Should not allow serialization of null value."); } catch (SchemaException e) { - // this is good + assertFalse(f.type.isNullable()); + } finally { this.struct.set(f, o); } } @@ -90,6 +97,16 @@ public class ProtocolSerializationTest { struct.validate(); // should be valid even with missing value } + @Test + public void testNullableDefault() { + // Should use default even if the field allows null values + ByteBuffer empty = ByteBuffer.allocate(0); + Schema schema = new Schema(new Field("field", Type.NULLABLE_BYTES, "doc", empty)); + Struct struct = new Struct(schema); + assertEquals("Should get the default value", empty, struct.get("field")); + struct.validate(); // should be valid even with missing value + } + @Test public void testArray() { Type type = new ArrayOf(Type.INT8);