Browse Source

KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value (#13249)

Introduces a new Serde, that serializes a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store).

Part of KIP-889.

Reviewers: Matthias J. Sax <matthias@confluent.io>
pull/13220/merge
Victoria Xia 2 years ago committed by GitHub
parent
commit
dcaf95a35f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      gradle/spotbugs-exclude.xml
  2. 14
      streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java
  3. 105
      streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java
  4. 89
      streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java
  5. 92
      streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java
  6. 72
      streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java

7
gradle/spotbugs-exclude.xml

@ -488,6 +488,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read @@ -488,6 +488,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<!-- END Suppress warnings for unused members that are undetectably used by Jackson -->
<Match>
<!-- Boolean deserializer intentionally returns null on null input. -->
<Class name="org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde$BooleanSerde$BooleanDeserializer"/>
<Method name="deserialize"/>
<Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
</Match>
<Match>
<!-- Suppress a warning about ignoring return value because this is intentional. -->
<Class name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>

14
streams/src/main/java/org/apache/kafka/streams/state/ValueAndTimestamp.java

@ -31,7 +31,6 @@ public final class ValueAndTimestamp<V> { @@ -31,7 +31,6 @@ public final class ValueAndTimestamp<V> {
private ValueAndTimestamp(final V value,
final long timestamp) {
Objects.requireNonNull(value);
this.value = value;
this.timestamp = timestamp;
}
@ -50,6 +49,19 @@ public final class ValueAndTimestamp<V> { @@ -50,6 +49,19 @@ public final class ValueAndTimestamp<V> {
return value == null ? null : new ValueAndTimestamp<>(value, timestamp);
}
/**
* Create a new {@link ValueAndTimestamp} instance. The provided {@code value} may be {@code null}.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
* @return a new {@link ValueAndTimestamp} instance
*/
public static <V> ValueAndTimestamp<V> makeAllowNullable(
final V value, final long timestamp) {
return new ValueAndTimestamp<>(value, timestamp);
}
/**
* Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
* if the parameter is not {@code null}.

105
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java

@ -0,0 +1,105 @@ @@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
/**
* See {@link NullableValueAndTimestampSerde}.
*/
public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
public final Deserializer<V> valueDeserializer;
private final Deserializer<Long> timestampDeserializer;
private final Deserializer<Boolean> booleanDeserializer;
NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
timestampDeserializer = new LongDeserializer();
booleanDeserializer = new BooleanDeserializer();
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueDeserializer.configure(configs, isKey);
timestampDeserializer.configure(configs, isKey);
booleanDeserializer.configure(configs, isKey);
}
@Override
public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
if (rawValueAndTimestamp == null) {
return null;
}
final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
if (isTombstone) {
return ValueAndTimestamp.makeAllowNullable(null, timestamp);
} else {
final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
if (value == null) {
throw new SerializationException("Deserializer cannot deserialize non-null bytes as null");
}
return ValueAndTimestamp.make(value, timestamp);
}
}
@Override
public void close() {
valueDeserializer.close();
timestampDeserializer.close();
booleanDeserializer.close();
}
@Override
public void setIfUnset(final SerdeGetter getter) {
// NullableValueAndTimestampDeserializer never wraps a null deserializer (or configure would throw),
// but it may wrap a deserializer that itself wraps a null deserializer.
initNullableDeserializer(valueDeserializer, getter);
}
private static byte[] rawTimestamp(final byte[] rawValueAndTimestamp) {
final byte[] rawTimestamp = new byte[RAW_TIMESTAMP_LENGTH];
System.arraycopy(rawValueAndTimestamp, 0, rawTimestamp, 0, RAW_TIMESTAMP_LENGTH);
return rawTimestamp;
}
private static byte[] rawIsTombstone(final byte[] rawValueAndTimestamp) {
final byte[] rawIsTombstone = new byte[RAW_BOOLEAN_LENGTH];
System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH, rawIsTombstone, 0, RAW_BOOLEAN_LENGTH);
return rawIsTombstone;
}
private static byte[] rawValue(final byte[] rawValueAndTimestamp) {
final int rawValueLength = rawValueAndTimestamp.length - RAW_TIMESTAMP_LENGTH - RAW_BOOLEAN_LENGTH;
final byte[] rawValue = new byte[rawValueLength];
System.arraycopy(rawValueAndTimestamp, RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH, rawValue, 0, rawValueLength);
return rawValue;
}
}

89
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java

@ -0,0 +1,89 @@ @@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import static java.util.Objects.requireNonNull;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
import org.apache.kafka.streams.state.ValueAndTimestamp;
/**
* Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
* {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
* <p>
* The serialized format is:
* <pre>
* <timestamp> + <bool indicating whether value is null> + <raw value>
* </pre>
* where the boolean is needed in order to distinguish between null and empty values (i.e., between
* tombstones and {@code byte[0]} values).
*/
public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
static final int RAW_TIMESTAMP_LENGTH = 8;
static final int RAW_BOOLEAN_LENGTH = 1;
public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
super(
new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
);
}
static final class BooleanSerde {
private static final byte TRUE = 0x01;
private static final byte FALSE = 0x00;
static class BooleanSerializer implements Serializer<Boolean> {
@Override
public byte[] serialize(final String topic, final Boolean data) {
if (data == null) {
return null;
}
return new byte[] {
data ? TRUE : FALSE
};
}
}
static class BooleanDeserializer implements Deserializer<Boolean> {
@Override
public Boolean deserialize(final String topic, final byte[] data) {
if (data == null) {
return null;
}
if (data.length != 1) {
throw new SerializationException("Size of data received by BooleanDeserializer is not 1");
}
if (data[0] == TRUE) {
return true;
} else if (data[0] == FALSE) {
return false;
} else {
throw new SerializationException("Unexpected byte received by BooleanDeserializer: " + data[0]);
}
}
}
}
}

92
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java

@ -0,0 +1,92 @@ @@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_BOOLEAN_LENGTH;
import static org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.RAW_TIMESTAMP_LENGTH;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
/**
* See {@link NullableValueAndTimestampSerde}.
*/
public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
public final Serializer<V> valueSerializer;
private final Serializer<Long> timestampSerializer;
private final Serializer<Boolean> booleanSerializer;
NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
this.valueSerializer = Objects.requireNonNull(valueSerializer);
timestampSerializer = new LongSerializer();
booleanSerializer = new BooleanSerializer();
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueSerializer.configure(configs, isKey);
timestampSerializer.configure(configs, isKey);
booleanSerializer.configure(configs, isKey);
}
@Override
public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
if (data == null) {
return null;
}
final byte[] rawValue = valueSerializer.serialize(topic, data.value());
final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
if (rawIsTombstone.length != RAW_BOOLEAN_LENGTH) {
throw new SerializationException("Unexpected length for serialized boolean: " + rawIsTombstone.length);
}
if (rawTimestamp.length != RAW_TIMESTAMP_LENGTH) {
throw new SerializationException("Unexpected length for serialized timestamp: " + rawTimestamp.length);
}
final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
return ByteBuffer
.allocate(RAW_TIMESTAMP_LENGTH + RAW_BOOLEAN_LENGTH + nonNullRawValue.length)
.put(rawTimestamp)
.put(rawIsTombstone)
.put(nonNullRawValue)
.array();
}
@Override
public void close() {
valueSerializer.close();
timestampSerializer.close();
booleanSerializer.close();
}
@Override
public void setIfUnset(final SerdeGetter getter) {
// NullableValueAndTimestampSerializer never wraps a null serializer (or configure would throw),
// but it may wrap a serializer that itself wraps a null serializer.
initNullableSerializer(valueSerializer, getter);
}
}

72
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java

@ -0,0 +1,72 @@ @@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class NullableValueAndTimestampSerdeTest {
private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
@Test
public void shouldSerdeNull() {
assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
}
@Test
public void shouldSerdeNonNull() {
final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
assertThat(rawValueAndTimestamp, is(notNullValue()));
assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
}
@Test
public void shouldSerdeNonNullWithNullValue() {
final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
assertThat(rawValueAndTimestamp, is(notNullValue()));
assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
}
@Test
public void shouldSerializeNonNullWithEmptyBytes() {
// empty string serializes to empty bytes
final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("", 10L);
final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
assertThat(rawValueAndTimestamp, is(notNullValue()));
assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
}
}
Loading…
Cancel
Save