
KAFKA-10173: Fix suppress changelog binary schema compatibility (#8905)

We inadvertently changed the binary schema of the suppress buffer changelog
in 2.4.0 without bumping the schema version number. As a result, it is impossible
to upgrade from 2.3.x to 2.4+ if you are using suppression.

* Refactor the schema compatibility test to use serialized data from older versions
as a more foolproof compatibility test.
* Refactor the upgrade system test to use the smoke test application so that we
actually exercise a significant portion of the Streams API during upgrade testing
* Add more recent versions to the upgrade system test matrix
* Fix the compatibility bug by bumping the schema version to 3

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
pull/8939/head
John Roesler, 4 years ago, committed by GitHub
commit 831938952e
  1. clients/src/main/java/org/apache/kafka/common/utils/Utils.java (32 changed lines)
  2. clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java (72 changed lines)
  3. streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java (43 changed lines)
  4. streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java (23 changed lines)
  5. streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java (27 changed lines)
  6. streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java (32 changed lines)
  7. streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java (186 changed lines)
  8. streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java (158 changed lines)
  9. streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java (39 changed lines)
  10. streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java (366 changed lines)

clients/src/main/java/org/apache/kafka/common/utils/Utils.java (32 changed lines)

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
import java.nio.BufferUnderflowException;
import java.util.EnumSet;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -284,6 +285,37 @@ public final class Utils {
return dest;
}
/**
* Starting from the current position, read an integer indicating the size of the byte array to read,
* then read the array. Consumes the buffer: upon returning, the buffer's position is after the array
* that is returned.
* @param buffer The buffer to read a size-prefixed array from
* @return The array
*/
public static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
final int size = buffer.getInt();
return getNullableArray(buffer, size);
}
/**
* Read a byte array of the given size. Consumes the buffer: upon returning, the buffer's position
* is after the array that is returned.
* @param buffer The buffer to read the array from
* @param size The number of bytes to read out of the buffer
* @return The array
*/
public static byte[] getNullableArray(final ByteBuffer buffer, final int size) {
if (size > buffer.remaining()) {
// preemptively throw this when the read is doomed to fail, so we don't have to allocate the array.
throw new BufferUnderflowException();
}
final byte[] oldBytes = size == -1 ? null : new byte[size];
if (oldBytes != null) {
buffer.get(oldBytes);
}
return oldBytes;
}
/**
* Returns a copy of src byte array
* @param src The byte array to copy
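
The new helpers above cover only the read side of the size-prefix convention: a four-byte length, with -1 standing in for null, followed by the raw payload. For reference, here is a minimal sketch of the matching write side (the putNullableSizePrefixedArray helper is hypothetical and not part of this change):

import java.nio.ByteBuffer;

final class SizePrefixSketch {
    // Hypothetical write-side counterpart to Utils.getNullableSizePrefixedArray:
    // write a four-byte length (-1 for null), then the bytes themselves.
    static void putNullableSizePrefixedArray(final ByteBuffer buffer, final byte[] array) {
        if (array == null) {
            buffer.putInt(-1);
        } else {
            buffer.putInt(array.length);
            buffer.put(array);
        }
    }
}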

clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java (72 changed lines)

@@ -26,6 +26,7 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
@@ -63,6 +64,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -83,7 +85,7 @@ public class UtilsTest {
cases.put("a-little-bit-long-string".getBytes(), -985981536);
cases.put("a-little-bit-longer-string".getBytes(), -1486304829);
cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
cases.put(new byte[]{'a', 'b', 'c'}, 479470107);
cases.put(new byte[] {'a', 'b', 'c'}, 479470107);
for (Map.Entry<byte[], Integer> c : cases.entrySet()) {
assertEquals(c.getValue().intValue(), murmur2(c.getKey()));
@@ -216,6 +218,65 @@ public class UtilsTest {
assertEquals(2, buffer.position());
}
@Test
public void getNullableSizePrefixedArrayExact() {
byte[] input = {0, 0, 0, 2, 1, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayExactEmpty() {
byte[] input = {0, 0, 0, 0};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {}, array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayRemainder() {
byte[] input = {0, 0, 0, 2, 1, 0, 9};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertArrayEquals(new byte[] {1, 0}, array);
assertEquals(6, buffer.position());
assertTrue(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayNull() {
// -1
byte[] input = {-1, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
final byte[] array = Utils.getNullableSizePrefixedArray(buffer);
assertNull(array);
assertEquals(4, buffer.position());
assertFalse(buffer.hasRemaining());
}
@Test
public void getNullableSizePrefixedArrayInvalid() {
// -2
byte[] input = {-1, -1, -1, -2};
final ByteBuffer buffer = ByteBuffer.wrap(input);
assertThrows(NegativeArraySizeException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}
@Test
public void getNullableSizePrefixedArrayUnderflow() {
// Integer.MAX_VALUE
byte[] input = {127, -1, -1, -1};
final ByteBuffer buffer = ByteBuffer.wrap(input);
// note, we get a buffer underflow exception instead of an OOME, even though the encoded size
// would be 2,147,483,647 aka 2.1 GB, probably larger than the available heap
assertThrows(BufferUnderflowException.class, () -> Utils.getNullableSizePrefixedArray(buffer));
}
@Test
public void utf8ByteArraySerde() {
String utf8String = "A\u00ea\u00f1\u00fcC";
@@ -427,7 +488,7 @@ public class UtilsTest {
String expectedBufferContent = fileChannelMockExpectReadWithRandomBytes(channelMock, bufferSize);
Utils.readFullyOrFail(channelMock, buffer, 0L, "test");
assertEquals("The buffer should be populated correctly", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -444,7 +505,7 @@ public class UtilsTest {
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
Utils.readFully(channelMock, buffer, 0L);
assertEquals("The buffer should be populated correctly.", expectedBufferContent,
new String(buffer.array()));
new String(buffer.array()));
assertFalse("The buffer should be filled", buffer.hasRemaining());
verify(channelMock, atLeastOnce()).read(any(), anyLong());
}
@@ -493,7 +554,7 @@ public class UtilsTest {
*
* @param channelMock The mocked FileChannel object
* @param bufferSize The buffer size
* @return Expected buffer string
* @return Expected buffer string
* @throws IOException If an I/O error occurs
*/
private String fileChannelMockExpectReadWithRandomBytes(final FileChannel channelMock,
@@ -530,8 +591,9 @@ public class UtilsTest {
@Override
public void close() throws IOException {
closed = true;
if (closeException != null)
if (closeException != null) {
throw closeException;
}
}
static TestCloseable[] createCloseables(boolean... exceptionOnClose) {

streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java (43 changed lines)

@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public final class FullChangeSerde<T> {
private final Serde<T> inner;
@@ -68,33 +69,6 @@ public final class FullChangeSerde<T> {
return new Change<>(newValue, oldValue);
}
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
buffer.putInt(oldSize);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}
buffer.putInt(newSize);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
* need to be able to read it (so that we can load the state store from previously-written changelog records).
@@ -104,19 +78,8 @@ public final class FullChangeSerde<T> {
return null;
}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}
final int newSize = buffer.getInt();
final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
if (newBytes != null) {
buffer.get(newBytes);
}
final byte[] oldBytes = getNullableSizePrefixedArray(buffer);
final byte[] newBytes = getNullableSizePrefixedArray(buffer);
return new Change<>(newBytes, oldBytes);
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java (23 changed lines)

@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public class ProcessorRecordContext implements RecordContext {
@@ -161,12 +163,10 @@ public class ProcessorRecordContext implements RecordContext {
public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
final long timestamp = buffer.getLong();
final long offset = buffer.getLong();
final int topicSize = buffer.getInt();
final String topic;
{
// not handling the null topic condition, because we believe the topic will never be null when we serialize
final byte[] topicBytes = new byte[topicSize];
buffer.get(topicBytes);
// we believe the topic will never be null when we serialize
final byte[] topicBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
topic = new String(topicBytes, UTF_8);
}
final int partition = buffer.getInt();
@@ -177,19 +177,8 @@ public class ProcessorRecordContext implements RecordContext {
} else {
final Header[] headerArr = new Header[headerCount];
for (int i = 0; i < headerCount; i++) {
final int keySize = buffer.getInt();
final byte[] keyBytes = new byte[keySize];
buffer.get(keyBytes);
final int valueSize = buffer.getInt();
final byte[] valueBytes;
if (valueSize == -1) {
valueBytes = null;
} else {
valueBytes = new byte[valueSize];
buffer.get(valueBytes);
}
final byte[] keyBytes = requireNonNull(getNullableSizePrefixedArray(buffer));
final byte[] valueBytes = getNullableSizePrefixedArray(buffer);
headerArr[i] = new RecordHeader(new String(keyBytes, UTF_8), valueBytes);
}
headers = new RecordHeaders(headerArr);

streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java (27 changed lines)

@@ -22,6 +22,9 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.kafka.common.utils.Utils.getNullableArray;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public final class BufferValue {
private static final int NULL_VALUE_SENTINEL = -1;
private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
@@ -67,35 +70,21 @@ public final class BufferValue {
static BufferValue deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
final byte[] priorValue = extractValue(buffer);
final byte[] priorValue = getNullableSizePrefixedArray(buffer);
final byte[] oldValue;
final int oldValueLength = buffer.getInt();
if (oldValueLength == NULL_VALUE_SENTINEL) {
oldValue = null;
} else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
oldValue = priorValue;
} else {
oldValue = new byte[oldValueLength];
buffer.get(oldValue);
oldValue = getNullableArray(buffer, oldValueLength);
}
final byte[] newValue = extractValue(buffer);
final byte[] newValue = getNullableSizePrefixedArray(buffer);
return new BufferValue(priorValue, oldValue, newValue, context);
}
private static byte[] extractValue(final ByteBuffer buffer) {
final int valueLength = buffer.getInt();
if (valueLength == NULL_VALUE_SENTINEL) {
return null;
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return value;
}
}
ByteBuffer serialize(final int endPadding) {
final int sizeOfValueLength = Integer.BYTES;
@@ -120,7 +109,7 @@ public final class BufferValue {
if (oldValue == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
} else if (priorValue == oldValue) {
} else if (Arrays.equals(priorValue, oldValue)) {
buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
buffer.putInt(sizeOfOldValue);
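
To make the two sentinels concrete: the serialized old-value field is either -1 (null), -2 (identical to the prior value, so the bytes are not repeated), or an ordinary length prefix followed by the bytes; the change above also switches the duplicate check from reference equality to Arrays.equals, so content-equal arrays still collapse to the sentinel. A minimal sketch of that rule, using hypothetical local helpers rather than the BufferValue internals:

import java.nio.ByteBuffer;
import java.util.Arrays;

final class OldValueFieldSketch {
    private static final int NULL_VALUE_SENTINEL = -1;
    private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;

    // Writer side: encode the old value relative to the prior value.
    static void putOldValue(final ByteBuffer buffer, final byte[] priorValue, final byte[] oldValue) {
        if (oldValue == null) {
            buffer.putInt(NULL_VALUE_SENTINEL);
        } else if (Arrays.equals(priorValue, oldValue)) {
            buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL); // same bytes as the prior value
        } else {
            buffer.putInt(oldValue.length);
            buffer.put(oldValue);
        }
    }

    // Reader side: mirror of the writer.
    static byte[] getOldValue(final ByteBuffer buffer, final byte[] priorValue) {
        final int length = buffer.getInt();
        if (length == NULL_VALUE_SENTINEL) {
            return null;
        } else if (length == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
            return priorValue;
        } else {
            final byte[] oldValue = new byte[length];
            buffer.get(oldValue);
            return oldValue;
        }
    }
}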

streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java (32 changed lines)

@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
public class ContextualRecord {
private final byte[] value;
private final ProcessorRecordContext recordContext;
@@ -43,36 +45,10 @@ public class ContextualRecord {
return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}
ByteBuffer serialize(final int endPadding) {
final byte[] serializedContext = recordContext.serialize();
final int sizeOfContext = serializedContext.length;
final int sizeOfValueLength = Integer.BYTES;
final int sizeOfValue = value == null ? 0 : value.length;
final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
buffer.put(serializedContext);
if (value == null) {
buffer.putInt(-1);
} else {
buffer.putInt(value.length);
buffer.put(value);
}
return buffer;
}
static ContextualRecord deserialize(final ByteBuffer buffer) {
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
final int valueLength = buffer.getInt();
if (valueLength == -1) {
return new ContextualRecord(null, context);
} else {
final byte[] value = new byte[valueLength];
buffer.get(value);
return new ContextualRecord(value, context);
}
final byte[] value = getNullableSizePrefixedArray(buffer);
return new ContextualRecord(value, context);
}
@Override

streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java (186 changed lines)

@@ -38,9 +38,11 @@ import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -54,14 +56,19 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3;
import static org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2;
public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
private static final RecordHeaders V_1_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
private static final RecordHeaders V_2_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
static final RecordHeaders CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});
private static final String METRIC_SCOPE = "in-memory-suppression";
private final Map<Bytes, BufferKey> index = new HashMap<>();
@@ -258,34 +265,43 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final int sizeOfBufferTime = Long.BYTES;
final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
buffer.putLong(bufferKey.time());
final byte[] array = buffer.array();
((RecordCollector.Supplier) context).recordCollector().send(
changelogTopic,
key,
buffer.array(),
V_2_CHANGELOG_HEADERS,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER
changelogTopic,
key,
array,
CHANGELOG_HEADERS,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER
);
}
private void logTombstone(final Bytes key) {
((RecordCollector.Supplier) context).recordCollector().send(
changelogTopic,
key,
null,
null,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER
changelogTopic,
key,
null,
null,
partition,
null,
KEY_SERIALIZER,
VALUE_SERIALIZER
);
}
private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
for (final ConsumerRecord<byte[], byte[]> record : batch) {
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
final Bytes key = Bytes.wrap(record.key());
if (record.value() == null) {
// This was a tombstone. Delete the record.
@@ -299,92 +315,63 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
}
}
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
} else {
if (record.headers().lastHeader("v") == null) {
// in this case, the changelog value is just the serialized record value
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
record.offset(),
record.partition(),
record.topic(),
record.headers()
);
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
recordContext
)
);
} else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized ContextualRecord
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
contextualRecord.recordContext()
)
);
} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized BufferValue
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
final long time = valueAndTime.getLong();
cleanPut(time, key, bufferValue);
final Header versionHeader = record.headers().lastHeader("v");
if (versionHeader == null) {
// Version 0:
// value:
// - buffer time
// - old value
// - new value
final byte[] previousBufferedValue = index.containsKey(key)
? internalPriorValueForBuffered(key)
: null;
final DeserializationResult deserializationResult = deserializeV0(record, key, previousBufferedValue);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {
// Version 3:
// value:
// - record context
// - prior value
// - old value
// - new value
// - buffer time
final DeserializationResult deserializationResult = deserializeV3(record, key);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
// Version 2:
// value:
// - record context
// - old value
// - new value
// - prior value
// - buffer time
// NOTE: 2.4.0, 2.4.1, and 2.5.0 actually encode Version 3 formatted data,
// but still set the Version 2 flag, so to deserialize, we have to duck type.
final DeserializationResult deserializationResult = duckTypeV2(record, key);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) {
// Version 1:
// value:
// - buffer time
// - record context
// - old value
// - new value
final byte[] previousBufferedValue = index.containsKey(key)
? internalPriorValueForBuffered(key)
: null;
final DeserializationResult deserializationResult = deserializeV1(record, key, previousBufferedValue);
cleanPut(deserializationResult.time(), deserializationResult.key(), deserializationResult.bufferValue());
} else {
throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
}
}
if (record.partition() != partition) {
throw new IllegalStateException(
String.format(
"record partition [%d] is being restored by the wrong suppress partition [%d]",
record.partition(),
partition
)
);
}
}
updateBufferMetrics();
}
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, V>> callback) {
@@ -481,8 +468,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
if (buffered == null) {
final V priorValue = value.oldValue;
serializedPriorValue = (priorValue == null) ? null : valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
serializedPriorValue = serialChange.oldValue;
} else {
serializedPriorValue = buffered.priorValue();
}
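
The restore path above dispatches on the raw bytes of the "v" record header: a missing header means the unversioned pre-header format, and otherwise the single header byte selects the deserializer, with versions 2.4.0, 2.4.1, and 2.5.0 needing duck typing because they wrote V3-shaped data under the V2 flag. A condensed sketch of just that mapping (the declaredVersion helper is hypothetical):

import java.util.Arrays;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

final class ChangelogVersionSketch {
    // Reads the schema version declared by a suppression-buffer changelog record.
    // Callers must still duck-type version 2, since 2.4.0/2.4.1/2.5.0 set the V2 flag
    // on V3-formatted values.
    static int declaredVersion(final Headers headers) {
        final Header versionHeader = headers.lastHeader("v");
        if (versionHeader == null) {
            return 0; // unversioned format, written before the "v" header existed
        }
        final byte[] value = versionHeader.value();
        if (Arrays.equals(value, new byte[] {(byte) 1})) {
            return 1;
        } else if (Arrays.equals(value, new byte[] {(byte) 2})) {
            return 2;
        } else if (Arrays.equals(value, new byte[] {(byte) 3})) {
            return 3;
        }
        throw new IllegalArgumentException("Unrecognized changelog version header");
    }
}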

streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferChangelogDeserializationHelper.java (158 changed lines)

@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import java.nio.ByteBuffer;
import static java.util.Objects.requireNonNull;
final class TimeOrderedKeyValueBufferChangelogDeserializationHelper {
private TimeOrderedKeyValueBufferChangelogDeserializationHelper() {}
static final class DeserializationResult {
private final long time;
private final Bytes key;
private final BufferValue bufferValue;
private DeserializationResult(final long time, final Bytes key, final BufferValue bufferValue) {
this.time = time;
this.key = key;
this.bufferValue = bufferValue;
}
long time() {
return time;
}
Bytes key() {
return key;
}
BufferValue bufferValue() {
return bufferValue;
}
}
static DeserializationResult deserializeV0(final ConsumerRecord<byte[], byte[]> record,
final Bytes key,
final byte[] previousBufferedValue) {
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
record.offset(),
record.partition(),
record.topic(),
record.headers()
);
return new DeserializationResult(
time,
key,
new BufferValue(
previousBufferedValue == null ? change.oldValue : previousBufferedValue,
change.oldValue,
change.newValue,
recordContext
)
);
}
static DeserializationResult deserializeV1(final ConsumerRecord<byte[], byte[]> record,
final Bytes key,
final byte[] previousBufferedValue) {
final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
final long time = timeAndValue.getLong();
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
return new DeserializationResult(
time,
key,
new BufferValue(
previousBufferedValue == null ? change.oldValue : previousBufferedValue,
change.oldValue,
change.newValue,
contextualRecord.recordContext()
)
);
}
static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
DeserializationResult deserializationResult = null;
RuntimeException v2DeserializationException = null;
RuntimeException v3DeserializationException = null;
try {
deserializationResult = deserializeV2(record, key);
} catch (final RuntimeException e) {
v2DeserializationException = e;
}
// versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
// V2 header, so we'll try duck-typing to see if this is decodable as V3
if (deserializationResult == null) {
try {
deserializationResult = deserializeV3(record, key);
} catch (final RuntimeException e) {
v3DeserializationException = e;
}
}
if (deserializationResult == null) {
// ok, it wasn't V3 either. Throw both exceptions:
final RuntimeException exception =
new RuntimeException("Couldn't deserialize record as v2 or v3: " + record,
v2DeserializationException);
exception.addSuppressed(v3DeserializationException);
throw exception;
}
return deserializationResult;
}
private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,
final Bytes key) {
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final ContextualRecord contextualRecord = ContextualRecord.deserialize(valueAndTime);
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value()));
final byte[] priorValue = Utils.getNullableSizePrefixedArray(valueAndTime);
final long time = valueAndTime.getLong();
final BufferValue bufferValue = new BufferValue(priorValue, change.oldValue, change.newValue, contextualRecord.recordContext());
return new DeserializationResult(time, key, bufferValue);
}
static DeserializationResult deserializeV3(final ConsumerRecord<byte[], byte[]> record, final Bytes key) {
final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
final long time = valueAndTime.getLong();
return new DeserializationResult(time, key, bufferValue);
}
}
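
duckTypeV2 is the interesting part of this helper: a record flagged as V2 is parsed optimistically as V2 and, if that throws, re-parsed as V3, keeping the V2 failure as the cause and attaching the V3 failure as a suppressed exception. A stripped-down sketch of the same fallback shape, with generic parse functions standing in for deserializeV2 and deserializeV3:

import java.util.function.Function;

final class FallbackParseSketch {
    // Try the primary parser; if it throws, try the fallback. If both fail, surface
    // both failures so neither diagnosis is lost.
    static <I, O> O parseWithFallback(final I input,
                                      final Function<I, O> primary,
                                      final Function<I, O> fallback) {
        final RuntimeException primaryFailure;
        try {
            return primary.apply(input);
        } catch (final RuntimeException e) {
            primaryFailure = e;
        }
        try {
            return fallback.apply(input);
        } catch (final RuntimeException e) {
            final RuntimeException combined =
                new RuntimeException("Could not parse input with either parser", primaryFailure);
            combined.addSuppressed(e);
            throw combined;
        }
    }
}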

streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java (39 changed lines)

@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -26,10 +28,37 @@ import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
private static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
buffer.putInt(oldSize);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}
buffer.putInt(newSize);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}
@Test
public void shouldRoundTripNull() {
assertThat(serde.serializeParts(null, null), nullValue());
assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
assertThat(mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue());
assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue());
assertThat(serde.deserializeParts(null, null), nullValue());
}
@@ -47,7 +76,7 @@ public class FullChangeSerdeTest {
is(new Change<String>(null, null))
);
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null));
assertThat(
FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat),
is(new Change<byte[]>(null, null))
@@ -57,7 +86,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripOldNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -68,7 +97,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripNewNull() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),
@@ -79,7 +108,7 @@ public class FullChangeSerdeTest {
@Test
public void shouldRoundTripChange() {
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final byte[] legacyFormat = mergeChangeArraysIntoSingleLegacyFormattedArray(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat);
assertThat(
serde.deserializeParts(null, decomposedLegacyFormat),

streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java (366 changed lines)

@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
@@ -56,14 +55,13 @@ import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.CHANGELOG_HEADERS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
private static final RecordHeaders V_2_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
private static final String APP_ID = "test-app";
private final Function<String, B> bufferSupplier;
@@ -73,7 +71,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
@Override
public byte[] serialize(final String topic, final String data) {
if (data == null) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("null data not allowed");
}
return super.serialize(topic, data);
}
@@ -347,14 +345,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
null,
"zxcv",
new KeyValue<>(1L, getBufferValue("3gon4i", 1)),
V_2_CHANGELOG_HEADERS
CHANGELOG_HEADERS
),
new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
0,
null,
"asdf",
new KeyValue<>(2L, getBufferValue("2093j", 0)),
V_2_CHANGELOG_HEADERS
CHANGELOG_HEADERS
)
)));
@@ -362,7 +360,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
}
@Test
public void shouldRestoreOldFormat() {
public void shouldRestoreOldUnversionedFormat() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
@@ -372,12 +370,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
// These serialized formats were captured by running version 2.1 code.
// They verify that an upgrade from 2.1 will work.
// Do not change them.
final String toDeleteBinaryValue = "0000000000000000FFFFFFFF00000006646F6F6D6564";
final String asdfBinaryValue = "0000000000000002FFFFFFFF0000000471776572";
final String zxcvBinaryValue1 = "00000000000000010000000870726576696F757300000005656F34696D";
final String zxcvBinaryValue2 = "000000000000000100000005656F34696D000000046E657874";
final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null)));
final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null)));
final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im")));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -388,7 +388,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array()),
hexStringToByteArray(toDeleteBinaryValue)),
new ConsumerRecord<>("changelog-topic",
0,
1,
@@ -398,7 +398,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array()),
hexStringToByteArray(asdfBinaryValue)),
new ConsumerRecord<>("changelog-topic",
0,
2,
@@ -408,7 +408,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array()),
hexStringToByteArray(zxcvBinaryValue1)),
new ConsumerRecord<>("changelog-topic",
0,
3,
@@ -418,7 +418,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array())
hexStringToByteArray(zxcvBinaryValue2))
));
assertThat(buffer.numRecords(), is(3));
@@ -486,17 +486,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 = new ContextualRecord(
FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
getContext(2L)
).serialize(0).array();
final byte[] zxcvValue2 = new ContextualRecord(
FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
getContext(3L)
).serialize(0).array();
// These serialized formats were captured by running version 2.2 code.
// They verify that an upgrade from 2.2 will work.
// Do not change them.
final String toDeleteBinary = "00000000000000000000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564";
final String asdfBinary = "00000000000000020000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572";
final String zxcvBinary1 = "00000000000000010000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000150000000870726576696F757300000005336F34696D";
final String zxcvBinary2 = "00000000000000010000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E657874";
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -507,7 +504,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
hexStringToByteArray(toDeleteBinary),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -518,7 +515,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(),
hexStringToByteArray(asdfBinary),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -529,7 +526,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(),
hexStringToByteArray(zxcvBinary1),
v1FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -540,7 +537,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(),
hexStringToByteArray(zxcvBinary2),
v1FlagHeaders)
));
@@ -596,6 +593,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
cleanup(context, buffer);
}
@Test
public void shouldRestoreV2Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
@@ -609,22 +607,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
final byte[] zxcvValue1 =
new BufferValue(
Serdes.String().serializer().serialize(null, "previous"),
Serdes.String().serializer().serialize(null, "IGNORED"),
Serdes.String().serializer().serialize(null, "3o4im"),
getContext(2L)
).serialize(0).array();
final byte[] zxcvValue2 =
new BufferValue(
Serdes.String().serializer().serialize(null, "previous"),
Serdes.String().serializer().serialize(null, "3o4im"),
Serdes.String().serializer().serialize(null, "next"),
getContext(3L)
).serialize(0).array();
// These serialized formats were captured by running version 2.3 code.
// They verify that an upgrade from 2.3 will work.
// Do not change them.
final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFF0000000EFFFFFFFF00000006646F6F6D6564FFFFFFFF0000000000000000";
final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFF0000000CFFFFFFFF0000000471776572FFFFFFFF0000000000000002";
final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF000000140000000749474E4F52454400000005336F34696D0000000870726576696F75730000000000000001";
final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000001100000005336F34696D000000046E6578740000000870726576696F75730000000000000001";
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@@ -635,7 +625,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"todelete".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(),
hexStringToByteArray(toDeleteBinary),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -646,7 +636,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"asdf".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(),
hexStringToByteArray(asdfBinary),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -657,7 +647,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(),
hexStringToByteArray(zxcvBinary1),
v2FlagHeaders),
new ConsumerRecord<>("changelog-topic",
0,
@@ -668,7 +658,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
-1,
-1,
"zxcv".getBytes(UTF_8),
ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(),
hexStringToByteArray(zxcvBinary2),
v2FlagHeaders)
));
@@ -724,6 +714,249 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
cleanup(context, buffer);
}
@Test
public void shouldRestoreV3FormatWithV2Header() {
// versions 2.4.0, 2.4.1, and 2.5.0 would have erroneously encoded a V3 record with the
// V2 header, so we need to be sure to handle this case as well.
// Note the data is the same as the V3 test.
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
// These serialized formats were captured by running version 2.4 code.
// They verify that an upgrade from 2.4 will work.
// Do not change them.
final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
0,
999,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
headers),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
headers),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
headers),
new ConsumerRecord<>("changelog-topic",
0,
2,
100,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
headers)
));
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
0,
3,
3,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"todelete".getBytes(UTF_8),
null)
));
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
// flush the buffer into a list in buffer order so we can make assertions about the contents.
final List<Eviction<String, String>> evicted = new LinkedList<>();
buffer.evictWhile(() -> true, evicted::add);
// Several things to note:
// * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
// * The record timestamps are properly restored, and not conflated with the record's buffer time.
// * The keys and values are properly restored
// * The record topic is set to the original input topic, *not* the changelog topic
// * The record offset preserves the original input record's offset, *not* the offset of the changelog record
assertThat(evicted, is(asList(
new Eviction<>(
"zxcv",
new Change<>("next", "3o4im"),
getContext(3L)),
new Eviction<>(
"asdf",
new Change<>("qwer", null),
getContext(1L)
))));
cleanup(context, buffer);
}
@Test
public void shouldRestoreV3Format() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final RecordHeaders headers = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 3})});
// These serialized formats were captured by running version 2.4 code.
// They verify that an upgrade from 2.4 will work.
// Do not change them.
final String toDeleteBinary = "0000000000000000000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000006646F6F6D65640000000000000000";
final String asdfBinary = "0000000000000001000000000000000000000005746F70696300000000FFFFFFFFFFFFFFFFFFFFFFFF00000004717765720000000000000002";
final String zxcvBinary1 = "0000000000000002000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F75730000000749474E4F52454400000005336F34696D0000000000000001";
final String zxcvBinary2 = "0000000000000003000000000000000000000005746F70696300000000FFFFFFFF0000000870726576696F757300000005336F34696D000000046E6578740000000000000001";
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
0,
999,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"todelete".getBytes(UTF_8),
hexStringToByteArray(toDeleteBinary),
headers),
new ConsumerRecord<>("changelog-topic",
0,
1,
9999,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"asdf".getBytes(UTF_8),
hexStringToByteArray(asdfBinary),
headers),
new ConsumerRecord<>("changelog-topic",
0,
2,
99,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary1),
headers),
new ConsumerRecord<>("changelog-topic",
0,
2,
100,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"zxcv".getBytes(UTF_8),
hexStringToByteArray(zxcvBinary2),
headers)
));
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
0,
3,
3,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
"todelete".getBytes(UTF_8),
null)
));
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
// flush the buffer into a list in buffer order so we can make assertions about the contents.
final List<Eviction<String, String>> evicted = new LinkedList<>();
buffer.evictWhile(() -> true, evicted::add);
// Several things to note:
// * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
// * The record timestamps are properly restored, and not conflated with the record's buffer time.
// * The keys and values are properly restored
// * The record topic is set to the original input topic, *not* the changelog topic
// * The record offset preserves the original input record's offset, *not* the offset of the changelog record
assertThat(evicted, is(asList(
new Eviction<>(
"zxcv",
new Change<>("next", "3o4im"),
getContext(3L)),
new Eviction<>(
"asdf",
new Change<>("qwer", null),
getContext(1L)
))));
cleanup(context, buffer);
}
@Test
public void shouldNotRestoreUnrecognizedVersionRecord() {
final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
@@ -780,15 +1013,30 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
);
}
private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
return new ContextualRecord(
FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
getContext(timestamp)
);
}
private static ProcessorRecordContext getContext(final long recordTimestamp) {
return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null);
}
// to be used to generate future hex-encoded values
// private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
// private static String bytesToHex(final byte[] bytes) {
// final char[] hexChars = new char[bytes.length * 2];
// for (int j = 0; j < bytes.length; j++) {
// final int v = bytes[j] & 0xFF;
// hexChars[j * 2] = HEX_ARRAY[v >>> 4];
// hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
// }
// return new String(hexChars);
// }
private static byte[] hexStringToByteArray(final String hexString) {
final int len = hexString.length();
final byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
+ Character.digit(hexString.charAt(i + 1), 16));
}
return data;
}
}
