From e54ab292e7e2fd1d18e82387a586969ba57eb7ea Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 13 Jun 2019 08:48:15 -0500 Subject: [PATCH] KAFKA-8452: Compressed BufferValue (#6848) De-duplicate the common case in which the prior value is the same as the old value. Reviewers: Sophie Blee-Goldman , Bill Bejeck --- .../streams/kstream/internals/Change.java | 2 +- .../kstream/internals/FullChangeSerde.java | 153 ++++++------- .../internals/ProcessorRecordContext.java | 2 +- .../streams/state/internals/BufferValue.java | 137 +++++++++--- .../state/internals/ContextualRecord.java | 4 +- .../InMemoryTimeOrderedKeyValueBuffer.java | 71 +++--- .../state/internals/LRUCacheEntry.java | 2 +- .../internals/FullChangeSerdeTest.java | 132 +++--------- .../KTableSuppressProcessorMetricsTest.java | 15 +- .../suppress/KTableSuppressProcessorTest.java | 3 +- .../internals/suppress/SuppressSuite.java | 14 +- .../internals/ProcessorRecordContextTest.java | 10 +- .../state/internals/BufferValueTest.java | 203 ++++++++++++++++++ .../TimeOrderedKeyValueBufferTest.java | 78 +++---- 14 files changed, 518 insertions(+), 308 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java index c9a18de0de7..f28a16d964b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java @@ -30,7 +30,7 @@ public class Change { @Override public String toString() { - return "(" + newValue + "<-" + oldValue + ")"; + return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")"; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java index 30d55beacaa..f28f9e7ef39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -21,19 +21,15 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import java.nio.ByteBuffer; -import java.util.Map; import static java.util.Objects.requireNonNull; -public final class FullChangeSerde implements Serde> { +public final class FullChangeSerde { private final Serde inner; - @SuppressWarnings("unchecked") - public static FullChangeSerde castOrWrap(final Serde serde) { + public static FullChangeSerde wrap(final Serde serde) { if (serde == null) { return null; - } else if (serde instanceof FullChangeSerde) { - return (FullChangeSerde) serde; } else { return new FullChangeSerde<>(serde); } @@ -47,98 +43,81 @@ public final class FullChangeSerde implements Serde> { return inner; } - @Override - public void configure(final Map configs, final boolean isKey) { - inner.configure(configs, isKey); + public Change serializeParts(final String topic, final Change data) { + if (data == null) { + return null; + } + final Serializer innerSerializer = innerSerde().serializer(); + final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue); + final byte[] newBytes = data.newValue == null ? 
null : innerSerializer.serialize(topic, data.newValue); + return new Change<>(newBytes, oldBytes); } - @Override - public void close() { - inner.close(); - } - @Override - public Serializer> serializer() { - final Serializer innerSerializer = inner.serializer(); - - return new Serializer>() { - @Override - public void configure(final Map configs, final boolean isKey) { - innerSerializer.configure(configs, isKey); - } - - @Override - public byte[] serialize(final String topic, final Change data) { - if (data == null) { - return null; - } - final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue); - final int oldSize = oldBytes == null ? -1 : oldBytes.length; - final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue); - final int newSize = newBytes == null ? -1 : newBytes.length; - - final ByteBuffer buffer = ByteBuffer.wrap( - new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)] - ); - buffer.putInt(oldSize); - if (oldBytes != null) { - buffer.put(oldBytes); - } - buffer.putInt(newSize); - if (newBytes != null) { - buffer.put(newBytes); - } - return buffer.array(); - } - - @Override - public void close() { - innerSerializer.close(); - } - }; + public Change deserializeParts(final String topic, final Change serialChange) { + if (serialChange == null) { + return null; + } + final Deserializer innerDeserializer = innerSerde().deserializer(); + + final T oldValue = + serialChange.oldValue == null ? null : innerDeserializer.deserialize(topic, serialChange.oldValue); + final T newValue = + serialChange.newValue == null ? null : innerDeserializer.deserialize(topic, serialChange.newValue); + + return new Change<>(newValue, oldValue); } - @Override - public Deserializer> deserializer() { - final Deserializer innerDeserializer = inner.deserializer(); - return new Deserializer>() { - @Override - public void configure(final Map configs, final boolean isKey) { - innerDeserializer.configure(configs, isKey); - } - - @Override - public Change deserialize(final String topic, final byte[] data) { - if (data == null) { - return null; - } - final ByteBuffer buffer = ByteBuffer.wrap(data); - - final byte[] oldBytes = extractOldValuePart(buffer); - final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes); - - final int newSize = buffer.getInt(); - final byte[] newBytes = newSize == -1 ? null : new byte[newSize]; - if (newBytes != null) { - buffer.get(newBytes); - } - final T newValue = newBytes == null ? null : innerDeserializer.deserialize(topic, newBytes); - return new Change<>(newValue, oldValue); - } - - @Override - public void close() { - innerDeserializer.close(); - } - }; + /** + * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here + * so that we can produce the legacy format to test that we can still deserialize it. + */ + public static byte[] composeLegacyFormat(final Change serialChange) { + if (serialChange == null) { + return null; + } + + final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length; + final int newSize = serialChange.newValue == null ? 
-1 : serialChange.newValue.length; + + final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize)); + + + buffer.putInt(oldSize); + if (serialChange.oldValue != null) { + buffer.put(serialChange.oldValue); + } + + buffer.putInt(newSize); + if (serialChange.newValue != null) { + buffer.put(serialChange.newValue); + } + return buffer.array(); } - public static byte[] extractOldValuePart(final ByteBuffer buffer) { + /** + * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still + * need to be able to read it (so that we can load the state store from previously-written changelog records). + */ + public static Change decomposeLegacyFormat(final byte[] data) { + if (data == null) { + return null; + } + final ByteBuffer buffer = ByteBuffer.wrap(data); + final int oldSize = buffer.getInt(); final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize]; if (oldBytes != null) { buffer.get(oldBytes); } - return oldBytes; + + final int newSize = buffer.getInt(); + final byte[] newBytes = newSize == -1 ? null : new byte[newSize]; + if (newBytes != null) { + buffer.get(newBytes); + } + + return new Change<>(newBytes, oldBytes); } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 1b2248274b6..5662417a214 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -73,7 +73,7 @@ public class ProcessorRecordContext implements RecordContext { return headers; } - public long sizeBytes() { + public long residentMemorySizeEstimate() { long size = 0; size += Long.BYTES; // value.context.timestamp size += Long.BYTES; // value.context.offset diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java index 816894ec169..f1990c71256 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java @@ -16,60 +16,133 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Objects; public final class BufferValue { - private final ContextualRecord record; + private static final int NULL_VALUE_SENTINEL = -1; + private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2; private final byte[] priorValue; + private final byte[] oldValue; + private final byte[] newValue; + private final ProcessorRecordContext recordContext; + + BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) { + this.oldValue = oldValue; + this.newValue = newValue; + this.recordContext = recordContext; + + // This de-duplicates the prior and old references. + // If they were already the same reference, the comparison is trivially fast, so we don't specifically check + // for that case. 
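+ // (Collapsing a value-equal priorValue onto the oldValue reference here is what
+ // lets serialize() and residentMemorySizeEstimate() detect the duplicate later
+ // with a cheap identity check, priorValue == oldValue, instead of re-comparing
+ // the arrays.)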
+ if (Arrays.equals(priorValue, oldValue)) { + this.priorValue = oldValue; + } else { + this.priorValue = priorValue; + } + } - BufferValue(final ContextualRecord record, - final byte[] priorValue) { - this.record = record; - this.priorValue = priorValue; + byte[] priorValue() { + return priorValue; } - ContextualRecord record() { - return record; + byte[] oldValue() { + return oldValue; } - byte[] priorValue() { - return priorValue; + byte[] newValue() { + return newValue; + } + + ProcessorRecordContext context() { + return recordContext; } static BufferValue deserialize(final ByteBuffer buffer) { - final ContextualRecord record = ContextualRecord.deserialize(buffer); + final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer); + + final byte[] priorValue = extractValue(buffer); + + final byte[] oldValue; + final int oldValueLength = buffer.getInt(); + if (oldValueLength == NULL_VALUE_SENTINEL) { + oldValue = null; + } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) { + oldValue = priorValue; + } else { + oldValue = new byte[oldValueLength]; + buffer.get(oldValue); + } + + final byte[] newValue = extractValue(buffer); + + return new BufferValue(priorValue, oldValue, newValue, context); + } - final int priorValueLength = buffer.getInt(); - if (priorValueLength == -1) { - return new BufferValue(record, null); + private static byte[] extractValue(final ByteBuffer buffer) { + final int valueLength = buffer.getInt(); + if (valueLength == NULL_VALUE_SENTINEL) { + return null; } else { - final byte[] priorValue = new byte[priorValueLength]; - buffer.get(priorValue); - return new BufferValue(record, priorValue); + final byte[] value = new byte[valueLength]; + buffer.get(value); + return value; } } ByteBuffer serialize(final int endPadding) { - final int sizeOfPriorValueLength = Integer.BYTES; + final int sizeOfValueLength = Integer.BYTES; + final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length; + final int sizeOfOldValue = oldValue == null || priorValue == oldValue ? 0 : oldValue.length; + final int sizeOfNewValue = newValue == null ? 0 : newValue.length; + + final byte[] serializedContext = recordContext.serialize(); + + final ByteBuffer buffer = ByteBuffer.allocate( + serializedContext.length + + sizeOfValueLength + sizeOfPriorValue + + sizeOfValueLength + sizeOfOldValue + + sizeOfValueLength + sizeOfNewValue + + endPadding + ); - final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding); + buffer.put(serializedContext); - if (priorValue == null) { - buffer.putInt(-1); + addValue(buffer, priorValue); + + if (oldValue == null) { + buffer.putInt(NULL_VALUE_SENTINEL); + } else if (priorValue == oldValue) { + buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL); } else { - buffer.putInt(priorValue.length); - buffer.put(priorValue); + buffer.putInt(sizeOfOldValue); + buffer.put(oldValue); } + addValue(buffer, newValue); + return buffer; } - long sizeBytes() { - return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes(); + private static void addValue(final ByteBuffer buffer, final byte[] value) { + if (value == null) { + buffer.putInt(NULL_VALUE_SENTINEL); + } else { + buffer.putInt(value.length); + buffer.put(value); + } + } + + long residentMemorySizeEstimate() { + return (priorValue == null ? 0 : priorValue.length) + + (oldValue == null || priorValue == oldValue ? 0 : oldValue.length) + + (newValue == null ? 
0 : newValue.length) + + recordContext.residentMemorySizeEstimate(); } @Override @@ -77,22 +150,28 @@ public final class BufferValue { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final BufferValue that = (BufferValue) o; - return Objects.equals(record, that.record) && - Arrays.equals(priorValue, that.priorValue); + return Arrays.equals(priorValue, that.priorValue) && + Arrays.equals(oldValue, that.oldValue) && + Arrays.equals(newValue, that.newValue) && + Objects.equals(recordContext, that.recordContext); } @Override public int hashCode() { - int result = Objects.hash(record); + int result = Objects.hash(recordContext); result = 31 * result + Arrays.hashCode(priorValue); + result = 31 * result + Arrays.hashCode(oldValue); + result = 31 * result + Arrays.hashCode(newValue); return result; } @Override public String toString() { return "BufferValue{" + - "record=" + record + - ", priorValue=" + Arrays.toString(priorValue) + + "priorValue=" + Arrays.toString(priorValue) + + ", oldValue=" + Arrays.toString(oldValue) + + ", newValue=" + Arrays.toString(newValue) + + ", recordContext=" + recordContext + '}'; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java index 3cd2c373584..3c24f521c23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java @@ -39,8 +39,8 @@ public class ContextualRecord { return value; } - long sizeBytes() { - return (value == null ? 0 : value.length) + recordContext.sizeBytes(); + long residentMemorySizeEstimate() { + return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate(); } ByteBuffer serialize(final int endPadding) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 6c5022f080a..6c6ef367922 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -159,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere this.storeName = storeName; this.loggingEnabled = loggingEnabled; this.keySerde = keySerde; - this.valueSerde = FullChangeSerde.castOrWrap(valueSerde); + this.valueSerde = FullChangeSerde.wrap(valueSerde); } @Override @@ -176,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere @Override public void setSerdesIfNull(final Serde keySerde, final Serde valueSerde) { this.keySerde = this.keySerde == null ? keySerde : this.keySerde; - this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde; + this.valueSerde = this.valueSerde == null ? 
FullChangeSerde.wrap(valueSerde) : this.valueSerde; } @Override @@ -296,21 +296,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere final byte[] changelogValue = new byte[record.value().length - 8]; timeAndValue.get(changelogValue); + final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue)); + + final ProcessorRecordContext recordContext = new ProcessorRecordContext( + record.timestamp(), + record.offset(), + record.partition(), + record.topic(), + record.headers() + ); + cleanPut( time, key, new BufferValue( - new ContextualRecord( - changelogValue, - new ProcessorRecordContext( - record.timestamp(), - record.offset(), - record.partition(), - record.topic(), - record.headers() - ) - ), - inferPriorValue(key, changelogValue) + index.containsKey(key) + ? internalPriorValueForBuffered(key) + : change.oldValue, + change.oldValue, + change.newValue, + recordContext ) ); } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { @@ -321,7 +326,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere timeAndValue.get(changelogValue); final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue)); - cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value()))); + final Change change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value())); + + cleanPut( + time, + key, + new BufferValue( + index.containsKey(key) + ? internalPriorValueForBuffered(key) + : change.oldValue, + change.oldValue, + change.newValue, + contextualRecord.recordContext() + ) + ); } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) { // in this case, the changelog value is a serialized BufferValue @@ -346,13 +364,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere updateBufferMetrics(); } - private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) { - return index.containsKey(key) - ? 
internalPriorValueForBuffered(key) - : FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange)); - } - - @Override public void evictWhile(final Supplier predicate, final Consumer> callback) { @@ -375,9 +386,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere } final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get()); final BufferValue bufferValue = next.getValue(); - final ContextualRecord record = bufferValue.record(); - final Change value = valueSerde.deserializer().deserialize(changelogTopic, record.value()); - callback.accept(new Eviction<>(key, value, record.recordContext())); + final Change value = valueSerde.deserializeParts( + changelogTopic, + new Change<>(bufferValue.newValue(), bufferValue.oldValue()) + ); + callback.accept(new Eviction<>(key, value, bufferValue.context())); delegate.remove(); index.remove(next.getKey().key()); @@ -442,7 +455,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere requireNonNull(recordContext, "recordContext cannot be null"); final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key)); - final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value); + final Change serialChange = valueSerde.serializeParts(changelogTopic, value); final BufferValue buffered = getBuffered(serializedKey); final byte[] serializedPriorValue; @@ -453,7 +466,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere serializedPriorValue = buffered.priorValue(); } - cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue)); + cleanPut( + time, + serializedKey, + new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext) + ); dirtyKeys.add(serializedKey); updateBufferMetrics(); } @@ -504,7 +521,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrdere size += 8; // buffer time size += key.get().length; if (value != null) { - size += value.sizeBytes(); + size += value.residentMemorySizeEstimate(); } return size; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java index f454862f2a8..0f1a1acc3ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java @@ -50,7 +50,7 @@ class LRUCacheEntry { this.isDirty = isDirty; this.sizeBytes = 1 + // isDirty - record.sizeBytes(); + record.residentMemorySizeEstimate(); } void markClean() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java index ddba05e0641..97e6c0697ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -16,146 +16,74 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.easymock.EasyMock; import org.junit.Test; -import static 
java.util.Collections.emptyMap; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; public class FullChangeSerdeTest { - private final FullChangeSerde serde = FullChangeSerde.castOrWrap(Serdes.String()); + private final FullChangeSerde serde = FullChangeSerde.wrap(Serdes.String()); @Test public void shouldRoundTripNull() { - final byte[] serialized = serde.serializer().serialize(null, null); - assertThat( - serde.deserializer().deserialize(null, serialized), - nullValue() - ); + assertThat(serde.serializeParts(null, null), nullValue()); + assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue()); + assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue()); + assertThat(serde.deserializeParts(null, null), nullValue()); } @Test public void shouldRoundTripNullChange() { - final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, null)); assertThat( - serde.deserializer().deserialize(null, serialized), - is(new Change<>(null, null)) + serde.serializeParts(null, new Change<>(null, null)), + is(new Change(null, null)) + ); + + assertThat( + serde.deserializeParts(null, new Change<>(null, null)), + is(new Change(null, null)) + ); + + final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null)); + assertThat( + FullChangeSerde.decomposeLegacyFormat(legacyFormat), + is(new Change(null, null)) ); } @Test public void shouldRoundTripOldNull() { - final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", null)); + final Change serialized = serde.serializeParts(null, new Change<>("new", null)); + final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); + final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); assertThat( - serde.deserializer().deserialize(null, serialized), + serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>("new", null)) ); } @Test public void shouldRoundTripNewNull() { - final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, "old")); + final Change serialized = serde.serializeParts(null, new Change<>(null, "old")); + final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); + final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); assertThat( - serde.deserializer().deserialize(null, serialized), + serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>(null, "old")) ); } @Test public void shouldRoundTripChange() { - final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", "old")); + final Change serialized = serde.serializeParts(null, new Change<>("new", "old")); + final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); + final Change decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); assertThat( - serde.deserializer().deserialize(null, serialized), + serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>("new", "old")) ); } - - @Test - public void shouldConfigureSerde() { - final Serde mock = EasyMock.mock(Serde.class); - mock.configure(emptyMap(), false); - EasyMock.expectLastCall(); - EasyMock.replay(mock); - final FullChangeSerde serde = FullChangeSerde.castOrWrap(mock); - serde.configure(emptyMap(), false); - EasyMock.verify(mock); - } - - @Test - public void shouldCloseSerde() { - final Serde mock = EasyMock.mock(Serde.class); - mock.close(); - 
EasyMock.expectLastCall(); - EasyMock.replay(mock); - final FullChangeSerde serde = FullChangeSerde.castOrWrap(mock); - serde.close(); - EasyMock.verify(mock); - } - - @Test - public void shouldConfigureSerializer() { - final Serde mockSerde = EasyMock.mock(Serde.class); - final Serializer mockSerializer = EasyMock.mock(Serializer.class); - EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer); - EasyMock.replay(mockSerde); - mockSerializer.configure(emptyMap(), false); - EasyMock.expectLastCall(); - EasyMock.replay(mockSerializer); - final Serializer> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); - serializer.configure(emptyMap(), false); - EasyMock.verify(mockSerde); - EasyMock.verify(mockSerializer); - } - - @Test - public void shouldCloseSerializer() { - final Serde mockSerde = EasyMock.mock(Serde.class); - final Serializer mockSerializer = EasyMock.mock(Serializer.class); - EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer); - EasyMock.replay(mockSerde); - mockSerializer.close(); - EasyMock.expectLastCall(); - EasyMock.replay(mockSerializer); - final Serializer> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); - serializer.close(); - EasyMock.verify(mockSerde); - EasyMock.verify(mockSerializer); - } - - @Test - public void shouldConfigureDeserializer() { - final Serde mockSerde = EasyMock.mock(Serde.class); - final Deserializer mockDeserializer = EasyMock.mock(Deserializer.class); - EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer); - EasyMock.replay(mockSerde); - mockDeserializer.configure(emptyMap(), false); - EasyMock.expectLastCall(); - EasyMock.replay(mockDeserializer); - final Deserializer> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); - serializer.configure(emptyMap(), false); - EasyMock.verify(mockSerde); - EasyMock.verify(mockDeserializer); - } - - @Test - public void shouldCloseDeserializer() { - final Serde mockSerde = EasyMock.mock(Serde.class); - final Deserializer mockDeserializer = EasyMock.mock(Deserializer.class); - EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer); - EasyMock.replay(mockSerde); - mockDeserializer.close(); - EasyMock.expectLastCall(); - EasyMock.replay(mockDeserializer); - final Deserializer> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); - serializer.close(); - EasyMock.verify(mockSerde); - EasyMock.verify(mockDeserializer); - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index 96ee73553a4..ef466633090 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.StateStore; @@ -139,7 +138,7 @@ public class KTableSuppressProcessorMetricsTest { final StateStore buffer = new 
InMemoryTimeOrderedKeyValueBuffer.Builder<>( storeName, Serdes.String(), - FullChangeSerde.castOrWrap(Serdes.Long()) + Serdes.Long() ) .withLoggingDisabled() .build(); @@ -169,9 +168,9 @@ public class KTableSuppressProcessorMetricsTest { verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0)); verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0)); - verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5)); - verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0)); - verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0)); + verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(21.5)); + verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(43.0)); + verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(43.0)); verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5)); verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0)); verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0)); @@ -185,9 +184,9 @@ public class KTableSuppressProcessorMetricsTest { verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0)); verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0)); - verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0)); - verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0)); - verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0)); + verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(41.0)); + verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(39.0)); + verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(82.0)); verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0)); verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0)); verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0)); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index d8cb858f5f9..1d1d6fb2ab0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; -import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -75,7 +74,7 @@ public class KTableSuppressProcessorTest { final String storeName = "test-store"; - final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde)) + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valueSerde) .withLoggingDisabled() .build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java index 3aef6d09b09..a323b9b25bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java @@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream.internals.suppress; import 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest; import org.apache.kafka.streams.integration.SuppressionIntegrationTest; import org.apache.kafka.streams.kstream.SuppressedTest; +import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest; import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest; import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest; +import org.apache.kafka.streams.state.internals.BufferValueTest; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest; import org.junit.runner.RunWith; @@ -30,21 +32,25 @@ import org.junit.runners.Suite; * This suite runs all the tests related to the Suppression feature. * * It can be used from an IDE to selectively just run these tests when developing code related to Suppress. - * + * * If desired, it can also be added to a Gradle build task, although this isn't strictly necessary, since all * these tests are already included in the `:streams:test` task. */ @RunWith(Suite.class) @Suite.SuiteClasses({ + BufferValueTest.class, KTableSuppressProcessorMetricsTest.class, KTableSuppressProcessorTest.class, SuppressScenarioTest.class, SuppressTopologyTest.class, SuppressedTest.class, - SuppressionIntegrationTest.class, - SuppressionDurabilityIntegrationTest.class, InMemoryTimeOrderedKeyValueBufferTest.class, - TimeOrderedKeyValueBufferTest.class + TimeOrderedKeyValueBufferTest.class, + FullChangeSerdeTest.class, + SuppressionIntegrationTest.class, + SuppressionDurabilityIntegrationTest.class }) public class SuppressSuite { } + + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java index 1ea646fce2f..83ab1279d0d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java @@ -37,7 +37,7 @@ public class ProcessorRecordContextTest { null ); - assertEquals(MIN_SIZE, context.sizeBytes()); + assertEquals(MIN_SIZE, context.residentMemorySizeEstimate()); } @Test @@ -50,7 +50,7 @@ public class ProcessorRecordContextTest { new RecordHeaders() ); - assertEquals(MIN_SIZE, context.sizeBytes()); + assertEquals(MIN_SIZE, context.residentMemorySizeEstimate()); } @Test @@ -63,7 +63,7 @@ public class ProcessorRecordContextTest { null ); - assertEquals(MIN_SIZE + 5L, context.sizeBytes()); + assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate()); } @Test @@ -78,7 +78,7 @@ public class ProcessorRecordContextTest { headers ); - assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes()); + assertEquals(MIN_SIZE + 10L + 12L, context.residentMemorySizeEstimate()); } @Test @@ -93,6 +93,6 @@ public class ProcessorRecordContextTest { headers ); - assertEquals(MIN_SIZE + 10L, context.sizeBytes()); + assertEquals(MIN_SIZE + 10L, context.residentMemorySizeEstimate()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java new file mode 100644 index 00000000000..d663461a8cf --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +public class BufferValueTest { + @Test + public void shouldDeduplicateNullValues() { + final BufferValue bufferValue = new BufferValue(null, null, null, null); + assertSame(bufferValue.priorValue(), bufferValue.oldValue()); + } + + @Test + public void shouldDeduplicateIndenticalValues() { + final byte[] bytes = {(byte) 0}; + final BufferValue bufferValue = new BufferValue(bytes, bytes, null, null); + assertSame(bufferValue.priorValue(), bufferValue.oldValue()); + } + + @Test + public void shouldDeduplicateEqualValues() { + final BufferValue bufferValue = new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, null); + assertSame(bufferValue.priorValue(), bufferValue.oldValue()); + } + + @Test + public void shouldStoreDifferentValues() { + final byte[] priorValue = {(byte) 0}; + final byte[] oldValue = {(byte) 1}; + final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); + assertSame(priorValue, bufferValue.priorValue()); + assertSame(oldValue, bufferValue.oldValue()); + } + + @Test + public void shouldStoreDifferentValuesWithPriorNull() { + final byte[] priorValue = null; + final byte[] oldValue = {(byte) 1}; + final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); + assertNull(bufferValue.priorValue()); + assertSame(oldValue, bufferValue.oldValue()); + } + + @Test + public void shouldStoreDifferentValuesWithOldNull() { + final byte[] priorValue = {(byte) 0}; + final byte[] oldValue = null; + final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); + assertSame(priorValue, bufferValue.priorValue()); + assertNull(bufferValue.oldValue()); + } + + @Test + public void shouldAccountForDeduplicationInSizeEstimate() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate()); + assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate()); + assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate()); + assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate()); + assertEquals(27L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, null, 
context).residentMemorySizeEstimate()); + + // new value should get counted, but doesn't get deduplicated + assertEquals(28L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, new byte[] {(byte) 0}, context).residentMemorySizeEstimate()); + } + + @Test + public void shouldSerializeNulls() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array(); + final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length); + + assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3).putInt(-1).putInt(-1).putInt(-1).array())); + } + + @Test + public void shouldSerializePrior() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] priorValue = {(byte) 5}; + final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array(); + final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length); + + assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(priorValue).putInt(-1).putInt(-1).array())); + } + + @Test + public void shouldSerializeOld() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] oldValue = {(byte) 5}; + final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array(); + final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length); + + assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(1).put(oldValue).putInt(-1).array())); + } + + @Test + public void shouldSerializeNew() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] newValue = {(byte) 5}; + final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array(); + final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length); + + assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(-1).putInt(1).put(newValue).array())); + } + + @Test + public void shouldCompactDuplicates() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] duplicate = {(byte) 5}; + final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array(); + final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length); + + assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(duplicate).putInt(-2).putInt(-1).array())); + } + + @Test + public void shouldDeserializePrior() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] priorValue = {(byte) 5}; + final ByteBuffer serialValue = + ByteBuffer + .allocate(serializedContext.length + Integer.BYTES * 3 + priorValue.length) + .put(serializedContext).putInt(1).put(priorValue).putInt(-1).putInt(-1); + serialValue.position(0); + + final BufferValue deserialize = 
BufferValue.deserialize(serialValue); + assertThat(deserialize, is(new BufferValue(priorValue, null, null, context))); + } + + @Test + public void shouldDeserializeOld() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] oldValue = {(byte) 5}; + final ByteBuffer serialValue = + ByteBuffer + .allocate(serializedContext.length + Integer.BYTES * 3 + oldValue.length) + .put(serializedContext).putInt(-1).putInt(1).put(oldValue).putInt(-1); + serialValue.position(0); + + assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, oldValue, null, context))); + } + + @Test + public void shouldDeserializeNew() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] newValue = {(byte) 5}; + final ByteBuffer serialValue = + ByteBuffer + .allocate(serializedContext.length + Integer.BYTES * 3 + newValue.length) + .put(serializedContext).putInt(-1).putInt(-1).putInt(1).put(newValue); + serialValue.position(0); + + assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, null, newValue, context))); + } + + @Test + public void shouldDeserializeCompactedDuplicates() { + final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null); + final byte[] serializedContext = context.serialize(); + final byte[] duplicate = {(byte) 5}; + final ByteBuffer serialValue = + ByteBuffer + .allocate(serializedContext.length + Integer.BYTES * 3 + duplicate.length) + .put(serializedContext).putInt(1).put(duplicate).putInt(-2).putInt(-1); + serialValue.position(0); + + final BufferValue bufferValue = BufferValue.deserialize(serialValue); + assertThat(bufferValue, is(new BufferValue(duplicate, duplicate, null, context))); + assertSame(bufferValue.priorValue(), bufferValue.oldValue()); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 941832bff3a..5c9cbf91769 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -190,11 +189,11 @@ public class TimeOrderedKeyValueBufferTest> serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer(); + final FullChangeSerde serializer = FullChangeSerde.wrap(Serdes.String()); - final byte[] todeleteValue = serializer.serialize(null, new Change<>("doomed", null)); - final byte[] asdfValue = serializer.serialize(null, new Change<>("qwer", null)); - final byte[] zxcvValue1 = serializer.serialize(null, new Change<>("eo4im", "previous")); - final byte[] zxcvValue2 = serializer.serialize(null, new Change<>("next", "eo4im")); + final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", 
null))); + final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null))); + final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous"))); + final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im"))); stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", 0, @@ -412,7 +411,7 @@ public class TimeOrderedKeyValueBufferTest("changelog-topic", @@ -429,7 +428,7 @@ public class TimeOrderedKeyValueBufferTest fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String()); + final FullChangeSerde fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); final byte[] zxcvValue1 = new ContextualRecord( - fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")), + FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))), getContext(2L) ).serialize(0).array(); - final FullChangeSerde fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String()); final byte[] zxcvValue2 = new ContextualRecord( - fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")), + FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))), getContext(3L) ).serialize(0).array(); stateRestoreCallback.restoreBatch(asList( @@ -536,7 +534,7 @@ public class TimeOrderedKeyValueBufferTest("changelog-topic", @@ -553,7 +551,7 @@ public class TimeOrderedKeyValueBufferTest fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String()); + final FullChangeSerde fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); final byte[] zxcvValue1 = new BufferValue( - new ContextualRecord( - fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")), - getContext(2L) - ), - Serdes.String().serializer().serialize(null, "previous") + Serdes.String().serializer().serialize(null, "previous"), + Serdes.String().serializer().serialize(null, "IGNORED"), + Serdes.String().serializer().serialize(null, "3o4im"), + getContext(2L) ).serialize(0).array(); - final FullChangeSerde fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String()); + final FullChangeSerde fullChangeSerde1 = FullChangeSerde.wrap(Serdes.String()); final byte[] zxcvValue2 = new BufferValue( - new ContextualRecord( - fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")), - getContext(3L) - ), - Serdes.String().serializer().serialize(null, "previous") + Serdes.String().serializer().serialize(null, "previous"), + Serdes.String().serializer().serialize(null, "3o4im"), + Serdes.String().serializer().serialize(null, "next"), + getContext(3L) ).serialize(0).array(); stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", @@ -668,7 +664,7 @@ public class TimeOrderedKeyValueBufferTest("changelog-topic", @@ -685,7 +681,7 @@ public class TimeOrderedKeyValueBufferTest fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String()); + final FullChangeSerde fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); return new ContextualRecord( - fullChangeSerde.serializer().serialize(null, new Change<>(value, null)), + FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))), getContext(timestamp) ); }
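
For reviewers who want the new FullChangeSerde surface in one place, here is a minimal, self-contained sketch of the part-wise API together with the legacy-format helpers that FullChangeSerdeTest exercises. The wrapper class, main method, and the topic name "topic" are scaffolding for illustration; only the FullChangeSerde and Change calls are the API from this patch.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.kstream.internals.FullChangeSerde;

    public class FullChangeSerdeSketch {
        public static void main(final String[] args) {
            final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());

            // Serialize the two sides of the Change independently, so the suppression
            // buffer can keep (and de-duplicate) the raw byte arrays.
            final Change<byte[]> parts = serde.serializeParts("topic", new Change<>("new", "old"));

            // The legacy helpers reproduce the old single-byte[] changelog encoding,
            // [oldLen][oldBytes][newLen][newBytes] with length -1 marking a null side,
            // purely so tests can prove the old on-disk format still deserializes.
            final byte[] legacy = FullChangeSerde.composeLegacyFormat(parts);
            final Change<byte[]> decomposed = FullChangeSerde.decomposeLegacyFormat(legacy);

            final Change<String> roundTripped = serde.deserializeParts("topic", decomposed);
            System.out.println(roundTripped); // prints (new<-old)
        }
    }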
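
The compressed BufferValue wire format, written out for one concrete fixture (shouldCompactDuplicates: priorValue and oldValue are the same one-byte array {5}, newValue is null). After the serialized ProcessorRecordContext, three length-prefixed values follow, big-endian:

    00 00 00 01   priorValue length = 1
    05            priorValue payload
    FF FF FF FE   oldValue length = -2 (OLD_PREV_DUPLICATE_VALUE_SENTINEL: same as priorValue, payload omitted)
    FF FF FF FF   newValue length = -1 (NULL_VALUE_SENTINEL)

On the way back in, the -2 sentinel makes deserialize() alias oldValue to the very array it just read as priorValue, which shouldDeserializeCompactedDuplicates pins down with assertSame.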
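
The expectations in shouldAccountForDeduplicationInSizeEstimate are easier to audit with the arithmetic spelled out: the context for topic "topic" with no headers estimates to 8 (timestamp) + 8 (offset) + 4 (partition) + 5 (topic bytes) = 25, and each distinct value array then adds its length:

    prior / old / new
    null  / null / null                   -> 25
    {0}   / null / null                   -> 26
    null  / {0}  / null                   -> 26
    {0}   / {0}  / null  (de-duplicated)  -> 26
    {0}   / {1}  / null  (distinct)       -> 27
    {0}   / {1}  / {0}                    -> 28  (the new value is counted, but never de-duplicated)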
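
The put() hunk leaves part of the prior-value bookkeeping in diff context, so to make the invariant explicit: the prior value is captured once, the first time a key enters the buffer, and survives later overwrites of the same key. A sketch under that reading; the unbuffered branch is inferred from the symmetric restore logic (index.containsKey(key) ? internalPriorValueForBuffered(key) : change.oldValue), not quoted from the patch:

    final BufferValue buffered = getBuffered(serializedKey);
    final byte[] serializedPriorValue;
    if (buffered == null) {
        // First sighting of this key: the incoming change's old value is the prior value (assumed).
        serializedPriorValue = serialChange.oldValue;
    } else {
        // Key already buffered: keep the prior value captured when it first entered.
        serializedPriorValue = buffered.priorValue();
    }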
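
Finally, a summary of the three changelog record formats the restore path now distinguishes, assembled from the restoreBatch branches above (the v0 condition and parts of the v2 branch body are hidden by hunk context, so the "absent header" reading and the v2 time-prefix detail are assumptions):

    "v" header   payload after the 8-byte buffer-time prefix    prior value on restore
    absent       legacy Change encoding                         buffered prior if the key is live, else change.oldValue
    1            ContextualRecord wrapping the legacy Change    buffered prior if the key is live, else change.oldValue
    2            serialized BufferValue                         carried inside the BufferValue itself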