Browse Source

KAFKA-8452: Compressed BufferValue (#6848)

De-duplicate the common case in which the prior value is the same as the old value.

Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/6934/head
John Roesler 6 years ago committed by Bill Bejeck
parent
commit
e54ab292e7
  1. 2
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
  2. 117
      streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
  3. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
  4. 135
      streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
  5. 4
      streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
  6. 69
      streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
  7. 2
      streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
  8. 132
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
  9. 15
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
  10. 3
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
  11. 12
      streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
  12. 10
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java
  13. 203
      streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java
  14. 74
      streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java

2
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java

@@ -30,7 +30,7 @@ public class Change<T> {
@Override
public String toString() {
return "(" + newValue + "<-" + oldValue + ")";
return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")";
}
@Override

117
streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java

@@ -21,19 +21,15 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
import static java.util.Objects.requireNonNull;
public final class FullChangeSerde<T> implements Serde<Change<T>> {
public final class FullChangeSerde<T> {
private final Serde<T> inner;
@SuppressWarnings("unchecked")
public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) {
public static <T> FullChangeSerde<T> wrap(final Serde<T> serde) {
if (serde == null) {
return null;
} else if (serde instanceof FullChangeSerde) {
return (FullChangeSerde<T>) serde;
} else {
return new FullChangeSerde<>(serde);
}
@@ -47,98 +43,81 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
return inner;
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
inner.configure(configs, isKey);
public Change<byte[]> serializeParts(final String topic, final Change<T> data) {
if (data == null) {
return null;
}
final Serializer<T> innerSerializer = innerSerde().serializer();
final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
return new Change<>(newBytes, oldBytes);
}
@Override
public void close() {
inner.close();
public Change<T> deserializeParts(final String topic, final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final Deserializer<T> innerDeserializer = innerSerde().deserializer();
@Override
public Serializer<Change<T>> serializer() {
final Serializer<T> innerSerializer = inner.serializer();
final T oldValue =
serialChange.oldValue == null ? null : innerDeserializer.deserialize(topic, serialChange.oldValue);
final T newValue =
serialChange.newValue == null ? null : innerDeserializer.deserialize(topic, serialChange.newValue);
return new Serializer<Change<T>>() {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
innerSerializer.configure(configs, isKey);
return new Change<>(newValue, oldValue);
}
@Override
public byte[] serialize(final String topic, final Change<T> data) {
if (data == null) {
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) {
if (serialChange == null) {
return null;
}
final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue);
final int oldSize = oldBytes == null ? -1 : oldBytes.length;
final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
final int newSize = newBytes == null ? -1 : newBytes.length;
final ByteBuffer buffer = ByteBuffer.wrap(
new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
);
final int oldSize = serialChange.oldValue == null ? -1 : serialChange.oldValue.length;
final int newSize = serialChange.newValue == null ? -1 : serialChange.newValue.length;
final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2 + Math.max(0, oldSize) + Math.max(0, newSize));
buffer.putInt(oldSize);
if (oldBytes != null) {
buffer.put(oldBytes);
if (serialChange.oldValue != null) {
buffer.put(serialChange.oldValue);
}
buffer.putInt(newSize);
if (newBytes != null) {
buffer.put(newBytes);
if (serialChange.newValue != null) {
buffer.put(serialChange.newValue);
}
return buffer.array();
}
@Override
public void close() {
innerSerializer.close();
}
};
}
@Override
public Deserializer<Change<T>> deserializer() {
final Deserializer<T> innerDeserializer = inner.deserializer();
return new Deserializer<Change<T>>() {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
innerDeserializer.configure(configs, isKey);
}
@Override
public Change<T> deserialize(final String topic, final byte[] data) {
/**
* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still
* need to be able to read it (so that we can load the state store from previously-written changelog records).
*/
public static Change<byte[]> decomposeLegacyFormat(final byte[] data) {
if (data == null) {
return null;
}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final byte[] oldBytes = extractOldValuePart(buffer);
final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes);
final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}
final int newSize = buffer.getInt();
final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
if (newBytes != null) {
buffer.get(newBytes);
}
final T newValue = newBytes == null ? null : innerDeserializer.deserialize(topic, newBytes);
return new Change<>(newValue, oldValue);
}
@Override
public void close() {
innerDeserializer.close();
}
};
return new Change<>(newBytes, oldBytes);
}
public static byte[] extractOldValuePart(final ByteBuffer buffer) {
final int oldSize = buffer.getInt();
final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
if (oldBytes != null) {
buffer.get(oldBytes);
}
return oldBytes;
}
}

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java

@@ -73,7 +73,7 @@ public class ProcessorRecordContext implements RecordContext {
return headers;
}
public long sizeBytes() {
public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
size += Long.BYTES; // value.context.offset

135
streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java

@@ -16,60 +16,133 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
public final class BufferValue {
private final ContextualRecord record;
private static final int NULL_VALUE_SENTINEL = -1;
private static final int OLD_PREV_DUPLICATE_VALUE_SENTINEL = -2;
private final byte[] priorValue;
BufferValue(final ContextualRecord record,
final byte[] priorValue) {
this.record = record;
private final byte[] oldValue;
private final byte[] newValue;
private final ProcessorRecordContext recordContext;
BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) {
this.oldValue = oldValue;
this.newValue = newValue;
this.recordContext = recordContext;
// This de-duplicates the prior and old references.
// If they were already the same reference, the comparison is trivially fast, so we don't specifically check
// for that case.
if (Arrays.equals(priorValue, oldValue)) {
this.priorValue = oldValue;
} else {
this.priorValue = priorValue;
}
ContextualRecord record() {
return record;
}
byte[] priorValue() {
return priorValue;
}
byte[] oldValue() {
return oldValue;
}
byte[] newValue() {
return newValue;
}
ProcessorRecordContext context() {
return recordContext;
}
static BufferValue deserialize(final ByteBuffer buffer) {
final ContextualRecord record = ContextualRecord.deserialize(buffer);
final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);
final byte[] priorValue = extractValue(buffer);
final byte[] oldValue;
final int oldValueLength = buffer.getInt();
if (oldValueLength == NULL_VALUE_SENTINEL) {
oldValue = null;
} else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
oldValue = priorValue;
} else {
oldValue = new byte[oldValueLength];
buffer.get(oldValue);
}
final byte[] newValue = extractValue(buffer);
return new BufferValue(priorValue, oldValue, newValue, context);
}
final int priorValueLength = buffer.getInt();
if (priorValueLength == -1) {
return new BufferValue(record, null);
private static byte[] extractValue(final ByteBuffer buffer) {
final int valueLength = buffer.getInt();
if (valueLength == NULL_VALUE_SENTINEL) {
return null;
} else {
final byte[] priorValue = new byte[priorValueLength];
buffer.get(priorValue);
return new BufferValue(record, priorValue);
final byte[] value = new byte[valueLength];
buffer.get(value);
return value;
}
}
ByteBuffer serialize(final int endPadding) {
final int sizeOfPriorValueLength = Integer.BYTES;
final int sizeOfValueLength = Integer.BYTES;
final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length;
final int sizeOfOldValue = oldValue == null || priorValue == oldValue ? 0 : oldValue.length;
final int sizeOfNewValue = newValue == null ? 0 : newValue.length;
final byte[] serializedContext = recordContext.serialize();
final ByteBuffer buffer = ByteBuffer.allocate(
serializedContext.length
+ sizeOfValueLength + sizeOfPriorValue
+ sizeOfValueLength + sizeOfOldValue
+ sizeOfValueLength + sizeOfNewValue
+ endPadding
);
final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding);
buffer.put(serializedContext);
if (priorValue == null) {
buffer.putInt(-1);
addValue(buffer, priorValue);
if (oldValue == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
} else if (priorValue == oldValue) {
buffer.putInt(OLD_PREV_DUPLICATE_VALUE_SENTINEL);
} else {
buffer.putInt(priorValue.length);
buffer.put(priorValue);
buffer.putInt(sizeOfOldValue);
buffer.put(oldValue);
}
addValue(buffer, newValue);
return buffer;
}
long sizeBytes() {
return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes();
private static void addValue(final ByteBuffer buffer, final byte[] value) {
if (value == null) {
buffer.putInt(NULL_VALUE_SENTINEL);
} else {
buffer.putInt(value.length);
buffer.put(value);
}
}
long residentMemorySizeEstimate() {
return (priorValue == null ? 0 : priorValue.length)
+ (oldValue == null || priorValue == oldValue ? 0 : oldValue.length)
+ (newValue == null ? 0 : newValue.length)
+ recordContext.residentMemorySizeEstimate();
}
@Override
@@ -77,22 +150,28 @@ public final class BufferValue {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final BufferValue that = (BufferValue) o;
return Objects.equals(record, that.record) &&
Arrays.equals(priorValue, that.priorValue);
return Arrays.equals(priorValue, that.priorValue) &&
Arrays.equals(oldValue, that.oldValue) &&
Arrays.equals(newValue, that.newValue) &&
Objects.equals(recordContext, that.recordContext);
}
@Override
public int hashCode() {
int result = Objects.hash(record);
int result = Objects.hash(recordContext);
result = 31 * result + Arrays.hashCode(priorValue);
result = 31 * result + Arrays.hashCode(oldValue);
result = 31 * result + Arrays.hashCode(newValue);
return result;
}
@Override
public String toString() {
return "BufferValue{" +
"record=" + record +
", priorValue=" + Arrays.toString(priorValue) +
"priorValue=" + Arrays.toString(priorValue) +
", oldValue=" + Arrays.toString(oldValue) +
", newValue=" + Arrays.toString(newValue) +
", recordContext=" + recordContext +
'}';
}
}

4
streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java

@@ -39,8 +39,8 @@ public class ContextualRecord {
return value;
}
long sizeBytes() {
return (value == null ? 0 : value.length) + recordContext.sizeBytes();
long residentMemorySizeEstimate() {
return (value == null ? 0 : value.length) + recordContext.residentMemorySizeEstimate();
}
ByteBuffer serialize(final int endPadding) {

69
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java

@@ -159,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
this.storeName = storeName;
this.loggingEnabled = loggingEnabled;
this.keySerde = keySerde;
this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
this.valueSerde = FullChangeSerde.wrap(valueSerde);
}
@Override
@@ -176,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
@Override
public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde;
this.valueSerde = this.valueSerde == null ? FullChangeSerde.wrap(valueSerde) : this.valueSerde;
}
@Override
@@ -296,21 +296,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
final byte[] changelogValue = new byte[record.value().length - 8];
timeAndValue.get(changelogValue);
cleanPut(
time,
key,
new BufferValue(
new ContextualRecord(
changelogValue,
new ProcessorRecordContext(
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue));
final ProcessorRecordContext recordContext = new ProcessorRecordContext(
record.timestamp(),
record.offset(),
record.partition(),
record.topic(),
record.headers()
)
),
inferPriorValue(key, changelogValue)
);
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
recordContext
)
);
} else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
@@ -321,7 +326,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
timeAndValue.get(changelogValue);
final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value())));
final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value()));
cleanPut(
time,
key,
new BufferValue(
index.containsKey(key)
? internalPriorValueForBuffered(key)
: change.oldValue,
change.oldValue,
change.newValue,
contextualRecord.recordContext()
)
);
} else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
// in this case, the changelog value is a serialized BufferValue
@@ -346,13 +364,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
updateBufferMetrics();
}
private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) {
return index.containsKey(key)
? internalPriorValueForBuffered(key)
: FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
}
@Override
public void evictWhile(final Supplier<Boolean> predicate,
final Consumer<Eviction<K, V>> callback) {
@@ -375,9 +386,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
}
final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
final BufferValue bufferValue = next.getValue();
final ContextualRecord record = bufferValue.record();
final Change<V> value = valueSerde.deserializer().deserialize(changelogTopic, record.value());
callback.accept(new Eviction<>(key, value, record.recordContext()));
final Change<V> value = valueSerde.deserializeParts(
changelogTopic,
new Change<>(bufferValue.newValue(), bufferValue.oldValue())
);
callback.accept(new Eviction<>(key, value, bufferValue.context()));
delegate.remove();
index.remove(next.getKey().key());
@@ -442,7 +455,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
requireNonNull(recordContext, "recordContext cannot be null");
final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
final Change<byte[]> serialChange = valueSerde.serializeParts(changelogTopic, value);
final BufferValue buffered = getBuffered(serializedKey);
final byte[] serializedPriorValue;
@@ -453,7 +466,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
serializedPriorValue = buffered.priorValue();
}
cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
cleanPut(
time,
serializedKey,
new BufferValue(serializedPriorValue, serialChange.oldValue, serialChange.newValue, recordContext)
);
dirtyKeys.add(serializedKey);
updateBufferMetrics();
}
@@ -504,7 +521,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
size += 8; // buffer time
size += key.get().length;
if (value != null) {
size += value.sizeBytes();
size += value.residentMemorySizeEstimate();
}
return size;
}

2
streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java

@@ -50,7 +50,7 @@ class LRUCacheEntry {
this.isDirty = isDirty;
this.sizeBytes = 1 + // isDirty
record.sizeBytes();
record.residentMemorySizeEstimate();
}
void markClean() {

132
streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java

@@ -16,146 +16,74 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.easymock.EasyMock;
import org.junit.Test;
import static java.util.Collections.emptyMap;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
public class FullChangeSerdeTest {
private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String());
private final FullChangeSerde<String> serde = FullChangeSerde.wrap(Serdes.String());
@Test
public void shouldRoundTripNull() {
final byte[] serialized = serde.serializer().serialize(null, null);
assertThat(
serde.deserializer().deserialize(null, serialized),
nullValue()
);
assertThat(serde.serializeParts(null, null), nullValue());
assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue());
assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue());
assertThat(serde.deserializeParts(null, null), nullValue());
}
@Test
public void shouldRoundTripNullChange() {
final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, null));
assertThat(
serde.deserializer().deserialize(null, serialized),
is(new Change<>(null, null))
serde.serializeParts(null, new Change<>(null, null)),
is(new Change<byte[]>(null, null))
);
assertThat(
serde.deserializeParts(null, new Change<>(null, null)),
is(new Change<String>(null, null))
);
final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null));
assertThat(
FullChangeSerde.decomposeLegacyFormat(legacyFormat),
is(new Change<byte[]>(null, null))
);
}
@Test
public void shouldRoundTripOldNull() {
final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", null));
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null));
final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
serde.deserializer().deserialize(null, serialized),
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", null))
);
}
@Test
public void shouldRoundTripNewNull() {
final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, "old"));
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old"));
final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
serde.deserializer().deserialize(null, serialized),
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>(null, "old"))
);
}
@Test
public void shouldRoundTripChange() {
final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", "old"));
final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old"));
final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized);
final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat);
assertThat(
serde.deserializer().deserialize(null, serialized),
serde.deserializeParts(null, decomposedLegacyFormat),
is(new Change<>("new", "old"))
);
}
@Test
public void shouldConfigureSerde() {
final Serde<Void> mock = EasyMock.mock(Serde.class);
mock.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mock);
final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
serde.configure(emptyMap(), false);
EasyMock.verify(mock);
}
@Test
public void shouldCloseSerde() {
final Serde<Void> mock = EasyMock.mock(Serde.class);
mock.close();
EasyMock.expectLastCall();
EasyMock.replay(mock);
final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock);
serde.close();
EasyMock.verify(mock);
}
@Test
public void shouldConfigureSerializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
EasyMock.replay(mockSerde);
mockSerializer.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mockSerializer);
final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
serializer.configure(emptyMap(), false);
EasyMock.verify(mockSerde);
EasyMock.verify(mockSerializer);
}
@Test
public void shouldCloseSerializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class);
EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
EasyMock.replay(mockSerde);
mockSerializer.close();
EasyMock.expectLastCall();
EasyMock.replay(mockSerializer);
final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer();
serializer.close();
EasyMock.verify(mockSerde);
EasyMock.verify(mockSerializer);
}
@Test
public void shouldConfigureDeserializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
EasyMock.replay(mockSerde);
mockDeserializer.configure(emptyMap(), false);
EasyMock.expectLastCall();
EasyMock.replay(mockDeserializer);
final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
serializer.configure(emptyMap(), false);
EasyMock.verify(mockSerde);
EasyMock.verify(mockDeserializer);
}
@Test
public void shouldCloseDeserializer() {
final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class);
EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
EasyMock.replay(mockSerde);
mockDeserializer.close();
EasyMock.expectLastCall();
EasyMock.replay(mockDeserializer);
final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer();
serializer.close();
EasyMock.verify(mockSerde);
EasyMock.verify(mockDeserializer);
}
}

15
streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java

@@ -21,7 +21,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
@@ -139,7 +138,7 @@ public class KTableSuppressProcessorMetricsTest {
final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName, Serdes.String(),
FullChangeSerde.castOrWrap(Serdes.Long())
Serdes.Long()
)
.withLoggingDisabled()
.build();
@@ -169,9 +168,9 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5));
verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0));
verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0));
verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(21.5));
verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(43.0));
verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(43.0));
verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
@@ -185,9 +184,9 @@ public class KTableSuppressProcessorMetricsTest {
verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0));
verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0));
verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0));
verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(41.0));
verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(39.0));
verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(82.0));
verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));

3
streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java

@@ -25,7 +25,6 @@ import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -75,7 +74,7 @@ public class KTableSuppressProcessorTest {
final String storeName = "test-store";
final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde))
final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valueSerde)
.withLoggingDisabled()
.build();

12
streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java

@@ -19,8 +19,10 @@ package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
import org.apache.kafka.streams.kstream.SuppressedTest;
import org.apache.kafka.streams.kstream.internals.FullChangeSerdeTest;
import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest;
import org.apache.kafka.streams.state.internals.BufferValueTest;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest;
import org.junit.runner.RunWith;
@@ -36,15 +38,19 @@ import org.junit.runners.Suite;
*/
@RunWith(Suite.class)
@Suite.SuiteClasses({
BufferValueTest.class,
KTableSuppressProcessorMetricsTest.class,
KTableSuppressProcessorTest.class,
SuppressScenarioTest.class,
SuppressTopologyTest.class,
SuppressedTest.class,
SuppressionIntegrationTest.class,
SuppressionDurabilityIntegrationTest.class,
InMemoryTimeOrderedKeyValueBufferTest.class,
TimeOrderedKeyValueBufferTest.class
TimeOrderedKeyValueBufferTest.class,
FullChangeSerdeTest.class,
SuppressionIntegrationTest.class,
SuppressionDurabilityIntegrationTest.class
})
public class SuppressSuite {
}

10
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java

@ -37,7 +37,7 @@ public class ProcessorRecordContextTest { @@ -37,7 +37,7 @@ public class ProcessorRecordContextTest {
null
);
assertEquals(MIN_SIZE, context.sizeBytes());
assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
}
@Test
@ -50,7 +50,7 @@ public class ProcessorRecordContextTest { @@ -50,7 +50,7 @@ public class ProcessorRecordContextTest {
new RecordHeaders()
);
assertEquals(MIN_SIZE, context.sizeBytes());
assertEquals(MIN_SIZE, context.residentMemorySizeEstimate());
}
@Test
@ -63,7 +63,7 @@ public class ProcessorRecordContextTest { @@ -63,7 +63,7 @@ public class ProcessorRecordContextTest {
null
);
assertEquals(MIN_SIZE + 5L, context.sizeBytes());
assertEquals(MIN_SIZE + 5L, context.residentMemorySizeEstimate());
}
@Test
@ -78,7 +78,7 @@ public class ProcessorRecordContextTest { @@ -78,7 +78,7 @@ public class ProcessorRecordContextTest {
headers
);
assertEquals(MIN_SIZE + 10L + 12L, context.sizeBytes());
assertEquals(MIN_SIZE + 10L + 12L, context.residentMemorySizeEstimate());
}
@Test
@ -93,6 +93,6 @@ public class ProcessorRecordContextTest { @@ -93,6 +93,6 @@ public class ProcessorRecordContextTest {
headers
);
assertEquals(MIN_SIZE + 10L, context.sizeBytes());
assertEquals(MIN_SIZE + 10L, context.residentMemorySizeEstimate());
}
}

203
streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java

@ -0,0 +1,203 @@ @@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
 * Tests for {@link BufferValue}: de-duplication of equal prior/old values,
 * resident-memory accounting, and the serialized wire format.
 *
 * <p>Serialized layout (after the serialized {@link ProcessorRecordContext}):
 * three length-prefixed byte arrays — priorValue, oldValue, newValue — where a
 * length of {@code -1} encodes {@code null} and a length of {@code -2} on the
 * oldValue slot marks "oldValue is the same array as priorValue" (the
 * compressed/de-duplicated case this commit introduces).
 */
public class BufferValueTest {
    @Test
    public void shouldDeduplicateNullValues() {
        final BufferValue bufferValue = new BufferValue(null, null, null, null);
        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
    }

    @Test
    public void shouldDeduplicateIdenticalValues() {
        // prior and old are literally the same array reference
        final byte[] bytes = {(byte) 0};
        final BufferValue bufferValue = new BufferValue(bytes, bytes, null, null);
        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
    }

    @Test
    public void shouldDeduplicateEqualValues() {
        // distinct arrays with equal contents must still collapse to one reference
        final BufferValue bufferValue = new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, null);
        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
    }

    @Test
    public void shouldStoreDifferentValues() {
        final byte[] priorValue = {(byte) 0};
        final byte[] oldValue = {(byte) 1};
        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
        assertSame(priorValue, bufferValue.priorValue());
        assertSame(oldValue, bufferValue.oldValue());
    }

    @Test
    public void shouldStoreDifferentValuesWithPriorNull() {
        final byte[] priorValue = null;
        final byte[] oldValue = {(byte) 1};
        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
        assertNull(bufferValue.priorValue());
        assertSame(oldValue, bufferValue.oldValue());
    }

    @Test
    public void shouldStoreDifferentValuesWithOldNull() {
        final byte[] priorValue = {(byte) 0};
        final byte[] oldValue = null;
        final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null);
        assertSame(priorValue, bufferValue.priorValue());
        assertNull(bufferValue.oldValue());
    }

    @Test
    public void shouldAccountForDeduplicationInSizeEstimate() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        assertEquals(25L, new BufferValue(null, null, null, context).residentMemorySizeEstimate());
        assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, null, null, context).residentMemorySizeEstimate());
        assertEquals(26L, new BufferValue(null, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
        // equal prior/old are de-duplicated, so the shared byte is counted once
        assertEquals(26L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 0}, null, context).residentMemorySizeEstimate());
        assertEquals(27L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, null, context).residentMemorySizeEstimate());
        // new value should get counted, but doesn't get deduplicated
        assertEquals(28L, new BufferValue(new byte[] {(byte) 0}, new byte[] {(byte) 1}, new byte[] {(byte) 0}, context).residentMemorySizeEstimate());
    }

    @Test
    public void shouldSerializeNulls() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] bytes = new BufferValue(null, null, null, context).serialize(0).array();
        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);

        // -1 length prefix encodes a null array for each of prior/old/new
        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3).putInt(-1).putInt(-1).putInt(-1).array()));
    }

    @Test
    public void shouldSerializePrior() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] priorValue = {(byte) 5};
        final byte[] bytes = new BufferValue(priorValue, null, null, context).serialize(0).array();
        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);

        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(priorValue).putInt(-1).putInt(-1).array()));
    }

    @Test
    public void shouldSerializeOld() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] oldValue = {(byte) 5};
        final byte[] bytes = new BufferValue(null, oldValue, null, context).serialize(0).array();
        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);

        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(1).put(oldValue).putInt(-1).array()));
    }

    @Test
    public void shouldSerializeNew() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] newValue = {(byte) 5};
        final byte[] bytes = new BufferValue(null, null, newValue, context).serialize(0).array();
        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);

        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(-1).putInt(-1).putInt(1).put(newValue).array()));
    }

    @Test
    public void shouldCompactDuplicates() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] duplicate = {(byte) 5};
        final byte[] bytes = new BufferValue(duplicate, duplicate, null, context).serialize(0).array();
        final byte[] withoutContext = Arrays.copyOfRange(bytes, serializedContext.length, bytes.length);

        // -2 on the oldValue slot marks "same as priorValue"; the bytes appear only once
        assertThat(withoutContext, is(ByteBuffer.allocate(Integer.BYTES * 3 + 1).putInt(1).put(duplicate).putInt(-2).putInt(-1).array()));
    }

    @Test
    public void shouldDeserializePrior() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] priorValue = {(byte) 5};
        final ByteBuffer serialValue =
            ByteBuffer
                .allocate(serializedContext.length + Integer.BYTES * 3 + priorValue.length)
                .put(serializedContext).putInt(1).put(priorValue).putInt(-1).putInt(-1);
        serialValue.position(0);

        final BufferValue deserialize = BufferValue.deserialize(serialValue);
        assertThat(deserialize, is(new BufferValue(priorValue, null, null, context)));
    }

    @Test
    public void shouldDeserializeOld() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] oldValue = {(byte) 5};
        final ByteBuffer serialValue =
            ByteBuffer
                .allocate(serializedContext.length + Integer.BYTES * 3 + oldValue.length)
                .put(serializedContext).putInt(-1).putInt(1).put(oldValue).putInt(-1);
        serialValue.position(0);

        assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, oldValue, null, context)));
    }

    @Test
    public void shouldDeserializeNew() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] newValue = {(byte) 5};
        final ByteBuffer serialValue =
            ByteBuffer
                .allocate(serializedContext.length + Integer.BYTES * 3 + newValue.length)
                .put(serializedContext).putInt(-1).putInt(-1).putInt(1).put(newValue);
        serialValue.position(0);

        assertThat(BufferValue.deserialize(serialValue), is(new BufferValue(null, null, newValue, context)));
    }

    @Test
    public void shouldDeserializeCompactedDuplicates() {
        final ProcessorRecordContext context = new ProcessorRecordContext(0L, 0L, 0, "topic", null);
        final byte[] serializedContext = context.serialize();

        final byte[] duplicate = {(byte) 5};
        final ByteBuffer serialValue =
            ByteBuffer
                .allocate(serializedContext.length + Integer.BYTES * 3 + duplicate.length)
                .put(serializedContext).putInt(1).put(duplicate).putInt(-2).putInt(-1);
        serialValue.position(0);

        final BufferValue bufferValue = BufferValue.deserialize(serialValue);
        assertThat(bufferValue, is(new BufferValue(duplicate, duplicate, null, context)));
        // the -2 sentinel must round-trip back to a shared reference, not a copy
        assertSame(bufferValue.priorValue(), bufferValue.oldValue());
    }
}

74
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java

@ -23,7 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader; @@ -23,7 +23,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@ -190,11 +189,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -190,11 +189,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final MockInternalProcessorContext context = makeContext();
buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
assertThat(buffer.bufferSize(), is(51L));
assertThat(buffer.bufferSize(), is(43L));
putRecord(buffer, context, 1L, 0L, "asdf", "3l");
assertThat(buffer.bufferSize(), is(47L));
assertThat(buffer.bufferSize(), is(39L));
putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
assertThat(buffer.bufferSize(), is(98L));
assertThat(buffer.bufferSize(), is(82L));
cleanup(context, buffer);
}
@ -218,12 +217,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -218,12 +217,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
assertThat(buffer.numRecords(), is(1));
assertThat(buffer.bufferSize(), is(50L));
assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.bufferSize(), is(98L));
assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
final AtomicInteger callbackCount = new AtomicInteger(0);
@ -232,14 +231,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -232,14 +231,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
case 1: {
assertThat(kv.key(), is("asdf"));
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.bufferSize(), is(98L));
assertThat(buffer.bufferSize(), is(82L));
assertThat(buffer.minTimestamp(), is(0L));
break;
}
case 2: {
assertThat(kv.key(), is("zxcv"));
assertThat(buffer.numRecords(), is(1));
assertThat(buffer.bufferSize(), is(50L));
assertThat(buffer.bufferSize(), is(42L));
assertThat(buffer.minTimestamp(), is(1L));
break;
}
@ -361,12 +360,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -361,12 +360,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
final Serializer<Change<String>> serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer();
final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
final byte[] todeleteValue = serializer.serialize(null, new Change<>("doomed", null));
final byte[] asdfValue = serializer.serialize(null, new Change<>("qwer", null));
final byte[] zxcvValue1 = serializer.serialize(null, new Change<>("eo4im", "previous"));
final byte[] zxcvValue2 = serializer.serialize(null, new Change<>("next", "eo4im"));
final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null)));
final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null)));
final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous")));
final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im")));
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
0,
@ -412,7 +411,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -412,7 +411,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
assertThat(buffer.bufferSize(), is(196L));
assertThat(buffer.bufferSize(), is(172L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@ -429,7 +428,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -429,7 +428,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
assertThat(buffer.bufferSize(), is(131L));
assertThat(buffer.bufferSize(), is(115L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@ -477,14 +476,13 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -477,14 +476,13 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 = new ContextualRecord(
fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")),
FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
getContext(2L)
).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
final byte[] zxcvValue2 = new ContextualRecord(
fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
getContext(3L)
).serialize(0).array();
stateRestoreCallback.restoreBatch(asList(
@ -536,7 +534,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -536,7 +534,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
assertThat(buffer.bufferSize(), is(166L));
assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@ -553,7 +551,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -553,7 +551,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
assertThat(buffer.bufferSize(), is(111L));
assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@ -601,23 +599,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -601,23 +599,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue1 =
new BufferValue(
new ContextualRecord(
fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")),
Serdes.String().serializer().serialize(null, "previous"),
Serdes.String().serializer().serialize(null, "IGNORED"),
Serdes.String().serializer().serialize(null, "3o4im"),
getContext(2L)
),
Serdes.String().serializer().serialize(null, "previous")
).serialize(0).array();
final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.wrap(Serdes.String());
final byte[] zxcvValue2 =
new BufferValue(
new ContextualRecord(
fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
Serdes.String().serializer().serialize(null, "previous"),
Serdes.String().serializer().serialize(null, "3o4im"),
Serdes.String().serializer().serialize(null, "next"),
getContext(3L)
),
Serdes.String().serializer().serialize(null, "previous")
).serialize(0).array();
stateRestoreCallback.restoreBatch(asList(
new ConsumerRecord<>("changelog-topic",
@ -668,7 +664,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -668,7 +664,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(3));
assertThat(buffer.minTimestamp(), is(0L));
assertThat(buffer.bufferSize(), is(166L));
assertThat(buffer.bufferSize(), is(142L));
stateRestoreCallback.restoreBatch(singletonList(
new ConsumerRecord<>("changelog-topic",
@ -685,7 +681,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -685,7 +681,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
assertThat(buffer.numRecords(), is(2));
assertThat(buffer.minTimestamp(), is(1L));
assertThat(buffer.bufferSize(), is(111L));
assertThat(buffer.bufferSize(), is(95L));
assertThat(buffer.priorValueForBuffered("todelete"), is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"), is(Maybe.defined(null)));
@ -766,14 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S @@ -766,14 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
}
private static BufferValue getBufferValue(final String value, final long timestamp) {
final ContextualRecord contextualRecord = getContextualRecord(value, timestamp);
return new BufferValue(contextualRecord, null);
return new BufferValue(
null,
null,
Serdes.String().serializer().serialize(null, value),
getContext(timestamp)
);
}
private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
return new ContextualRecord(
fullChangeSerde.serializer().serialize(null, new Change<>(value, null)),
FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))),
getContext(timestamp)
);
}

Loading…
Cancel
Save