Browse Source

KAFKA-2695: limited support for nullable byte arrays

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #780 from hachikuji/KAFKA-2695
pull/780/merge
Jason Gustafson 9 years ago committed by Guozhang Wang
parent
commit
747dc930ff
  1. 4
      clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
  2. 2
      clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
  3. 67
      clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
  4. 18
      clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
  5. 25
      clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java

4
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java

@ -71,13 +71,13 @@ public class ConsumerProtocol { @@ -71,13 +71,13 @@ public class ConsumerProtocol {
public static final Schema SUBSCRIPTION_V0 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
new Field(USER_DATA_KEY_NAME, Type.BYTES));
new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
new Field(TOPIC_KEY_NAME, Type.STRING),
new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
public static final Schema ASSIGNMENT_V0 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
new Field(USER_DATA_KEY_NAME, Type.BYTES));
new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) {
Struct struct = new Struct(SUBSCRIPTION_V0);

2
clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java

@ -51,6 +51,8 @@ public class Struct { @@ -51,6 +51,8 @@ public class Struct {
return value;
else if (field.defaultValue != Field.NO_DEFAULT)
return field.defaultValue;
else if (field.type.isNullable())
return null;
else
throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
}

67
clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java

@ -51,6 +51,14 @@ public abstract class Type { @@ -51,6 +51,14 @@ public abstract class Type {
*/
public abstract int sizeOf(Object o);
/**
* Check if the type supports null values
* @return whether or not null is a valid value for the type implementation
*/
public boolean isNullable() {
return false;
}
public static final Type INT8 = new Type() {
@Override
public void write(ByteBuffer buffer, Object o) {
@ -247,4 +255,63 @@ public abstract class Type { @@ -247,4 +255,63 @@ public abstract class Type {
}
};
public static final Type NULLABLE_BYTES = new Type() {
@Override
public boolean isNullable() {
return true;
}
@Override
public void write(ByteBuffer buffer, Object o) {
if (o == null) {
buffer.putInt(-1);
return;
}
ByteBuffer arg = (ByteBuffer) o;
int pos = arg.position();
buffer.putInt(arg.remaining());
buffer.put(arg);
arg.position(pos);
}
@Override
public Object read(ByteBuffer buffer) {
int size = buffer.getInt();
if (size < 0)
return null;
ByteBuffer val = buffer.slice();
val.limit(size);
buffer.position(buffer.position() + size);
return val;
}
@Override
public int sizeOf(Object o) {
if (o == null)
return 4;
ByteBuffer buffer = (ByteBuffer) o;
return 4 + buffer.remaining();
}
@Override
public String toString() {
return "NULLABLE_BYTES";
}
@Override
public ByteBuffer validate(Object item) {
if (item == null)
return null;
if (item instanceof ByteBuffer)
return (ByteBuffer) item;
throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
}
};
}

18
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java

@ -33,16 +33,25 @@ import java.util.List; @@ -33,16 +33,25 @@ import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ConsumerProtocolTest {
@Test
public void serializeDeserializeMetadata() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
}
@Test
public void serializeDeserializeNullSubscriptionUserData() {
Subscription subscription = new Subscription(Arrays.asList("foo", "bar"), null);
ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
assertEquals(subscription.topics(), parsedSubscription.topics());
assertNull(subscription.userData());
}
@Test
@ -81,6 +90,15 @@ public class ConsumerProtocolTest { @@ -81,6 +90,15 @@ public class ConsumerProtocolTest {
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
}
@Test
public void deserializeNullAssignmentUserData() {
List<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("bar", 2));
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions, null));
PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
assertNull(parsedAssignment.userData());
}
@Test
public void deserializeNewAssignmentVersion() {
// verify that a new version which adds a field is still parseable

25
clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java

@ -39,14 +39,16 @@ public class ProtocolSerializationTest { @@ -39,14 +39,16 @@ public class ProtocolSerializationTest {
new Field("int64", Type.INT64),
new Field("string", Type.STRING),
new Field("bytes", Type.BYTES),
new Field("nullable_bytes", Type.NULLABLE_BYTES),
new Field("array", new ArrayOf(Type.INT32)),
new Field("struct", new Schema(new Field("field", Type.INT32))));
new Field("struct", new Schema(new Field("field", new ArrayOf(Type.INT32)))));
this.struct = new Struct(this.schema).set("int8", (byte) 1)
.set("int16", (short) 1)
.set("int32", 1)
.set("int64", 1L)
.set("string", "1")
.set("bytes", "1".getBytes())
.set("bytes", ByteBuffer.wrap("1".getBytes()))
.set("nullable_bytes", null)
.set("array", new Object[] {1});
this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3}));
}
@ -62,6 +64,9 @@ public class ProtocolSerializationTest { @@ -62,6 +64,9 @@ public class ProtocolSerializationTest {
check(Type.STRING, "A\u00ea\u00f1\u00fcC");
check(Type.BYTES, ByteBuffer.allocate(0));
check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
check(Type.NULLABLE_BYTES, null);
check(Type.NULLABLE_BYTES, ByteBuffer.allocate(0));
check(Type.NULLABLE_BYTES, ByteBuffer.wrap("abcd".getBytes()));
check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
check(new ArrayOf(Type.STRING), new Object[] {});
check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
@ -74,9 +79,11 @@ public class ProtocolSerializationTest { @@ -74,9 +79,11 @@ public class ProtocolSerializationTest {
try {
this.struct.set(f, null);
this.struct.validate();
fail("Should not allow serialization of null value.");
if (!f.type.isNullable())
fail("Should not allow serialization of null value.");
} catch (SchemaException e) {
// this is good
assertFalse(f.type.isNullable());
} finally {
this.struct.set(f, o);
}
}
@ -90,6 +97,16 @@ public class ProtocolSerializationTest { @@ -90,6 +97,16 @@ public class ProtocolSerializationTest {
struct.validate(); // should be valid even with missing value
}
@Test
public void testNullableDefault() {
// Should use default even if the field allows null values
ByteBuffer empty = ByteBuffer.allocate(0);
Schema schema = new Schema(new Field("field", Type.NULLABLE_BYTES, "doc", empty));
Struct struct = new Struct(schema);
assertEquals("Should get the default value", empty, struct.get("field"));
struct.validate(); // should be valid even with missing value
}
@Test
public void testArray() {
Type type = new ArrayOf(Type.INT8);

Loading…
Cancel
Save