From 201022d19eac50a4bce8ccfa12c8e233f2b5cad6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 30 Jan 2019 15:57:56 -0800 Subject: [PATCH] KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (#6204) Reviewers: John Roesler , Bill Bejeck , Guozhang Wang --- .../internals/GlobalStateManagerImpl.java | 5 +- .../internals/ProcessorStateManager.java | 5 +- .../processor/internals/StateRestorer.java | 2 +- .../TimestampedBytesStore.java} | 18 +++--- .../{ => internals}/RecordConverter.java | 42 +++++++++---- .../state/internals/WrappedStateStore.java | 9 +-- .../internals/GlobalStateManagerImplTest.java | 29 ++++----- .../internals/ProcessorStateManagerTest.java | 19 ++---- .../internals/StateRestorerTest.java | 4 +- .../internals/StoreChangelogReaderTest.java | 60 +++++++++---------- .../apache/kafka/test/MockKeyValueStore.java | 2 + 11 files changed, 97 insertions(+), 98 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/{processor/internals/DefaultRecordConverter.java => state/TimestampedBytesStore.java} (66%) rename streams/src/main/java/org/apache/kafka/streams/state/{ => internals}/RecordConverter.java (51%) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index e1963ed7a37..ca4a49f7aa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -32,7 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.TimestampedBytesStore; +import org.apache.kafka.streams.state.internals.RecordConverter; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; @@ -200,7 +201,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final StateStore stateStore = store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; final RecordConverter recordConverter = - stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter(); + stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record; restoreState( stateRestoreCallback, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 669eb152712..8c9eb081abe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -23,8 +23,9 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; -import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.internals.RecordConverter; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; @@ -136,7 +137,7 @@ public class ProcessorStateManager extends AbstractStateManager { final StateStore stateStore = store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; final RecordConverter recordConverter = - stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter(); + stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record; if (isStandby) { log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 3d937db95b5..4736a3318ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateRestoreListener; -import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.internals.RecordConverter; import java.util.ArrayList; import java.util.Collection; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java similarity index 66% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java rename to streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java index 752a4a279f7..5b5fbc521af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java @@ -14,16 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.processor.internals; +package org.apache.kafka.streams.state; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.state.RecordConverter; +import java.nio.ByteBuffer; -public class DefaultRecordConverter implements RecordConverter { +import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP; - @Override - public ConsumerRecord convert(final ConsumerRecord record) { - return record; +public interface TimestampedBytesStore { + static byte[] convertToTimestampedFormat(final byte[] plainValue) { + return ByteBuffer + .allocate(8 + plainValue.length) + .putLong(NO_TIMESTAMP) + .put(plainValue) + .array(); } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java similarity index 51% rename from streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java index 5cdaddfa20f..64733b2b1f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java @@ -14,22 +14,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.state; +package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.KeyValue; -/** - * {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair. - */ -public interface RecordConverter { +import java.nio.ByteBuffer; - /** - * Convert a given record into a key-value pair. - * - * @param record the consumer record - * @return the record as key-value pair - */ +public interface RecordConverter { ConsumerRecord convert(final ConsumerRecord record); -} \ No newline at end of file + @SuppressWarnings("deprecation") + static RecordConverter converter() { + return record -> { + final byte[] rawValue = record.value(); + final long timestamp = record.timestamp(); + return new ConsumerRecord<>( + record.topic(), + record.partition(), + record.offset(), + timestamp, + record.timestampType(), + record.checksum(), + record.serializedKeySize(), + record.serializedValueSize(), + record.key(), + ByteBuffer + .allocate(8 + rawValue.length) + .putLong(timestamp) + .put(rawValue) + .array(), + record.headers(), + record.leaderEpoch() + ); + }; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index e45db07653b..2bb60bc47df 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -16,11 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.RecordConverter; /** * A storage engine wrapper for utilities like logging, caching, and metering. @@ -40,7 +38,7 @@ public interface WrappedStateStore extends StateStore { */ StateStore wrappedStore(); - abstract class AbstractStateStore implements WrappedStateStore, RecordConverter { + abstract class AbstractStateStore implements WrappedStateStore { final StateStore innerState; protected AbstractStateStore(final StateStore inner) { @@ -97,10 +95,5 @@ public interface WrappedStateStore extends StateStore { return innerState; } - @Override - public ConsumerRecord convert(final ConsumerRecord record) { - return ((RecordConverter) innerState).convert(record); - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 4f160f5dfd2..03a431d1729 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -34,8 +34,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; -import org.apache.kafka.streams.state.RecordConverter; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockStateRestoreListener; @@ -224,7 +224,7 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldUseDefaultRecordConverterIfStoreDoesNotImplementRecordConverter() { + public void shouldNotConvertValuesIfStoreDoesNotImplementTimestampedBytesStore() { initializeConsumer(1, 0, t1); stateManager.initialize(); @@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldUseDefaultRecordConverterIfInnerStoreDoesNotImplementRecordConverter() { + public void shouldNotConvertValuesIfInnerStoreDoesNotImplementTimestampedBytesStore() { initializeConsumer(1, 0, t1); stateManager.initialize(); @@ -288,20 +288,19 @@ public class GlobalStateManagerImplTest { } @Test - public void shouldUseStoreAsRecordConverterIfStoreImplementsRecordConverter() { + public void shouldConvertValuesIfStoreImplementsTimestampedBytesStore() { initializeConsumer(1, 0, t2); stateManager.initialize(); stateManager.register(store2, stateRestoreCallback); final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); - assertEquals(0, restoredRecord.key.length); - assertEquals(0, restoredRecord.value.length); - + assertEquals(3, restoredRecord.key.length); + assertEquals(13, restoredRecord.value.length); } @Test - public void shouldUseStoreAsRecordConverterIfInnerStoreImplementsRecordConverter() { + public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() { initializeConsumer(1, 0, t2); stateManager.initialize(); @@ -348,8 +347,8 @@ public class GlobalStateManagerImplTest { }, stateRestoreCallback); final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); - assertEquals(0, restoredRecord.key.length); - assertEquals(0, restoredRecord.value.length); + assertEquals(3, restoredRecord.key.length); + assertEquals(13, restoredRecord.value.length); } @Test @@ -827,19 +826,11 @@ public class GlobalStateManagerImplTest { } } - - - private class ConverterStore extends NoOpReadOnlyStore implements RecordConverter { - + private class ConverterStore extends NoOpReadOnlyStore implements TimestampedBytesStore { ConverterStore(final String name, final boolean rocksdbStore) { super(name, rocksdbStore); } - - @Override - public ConsumerRecord convert(final ConsumerRecord record) { - return new ConsumerRecord<>("", 0, 0L, "".getBytes(), "".getBytes()); - } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index dee948f3df5..ce76749d50d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -29,8 +29,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; -import org.apache.kafka.streams.state.RecordConverter; import org.apache.kafka.test.MockBatchingStateRestoreListener; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.NoOpProcessorContext; @@ -153,15 +153,16 @@ public class ProcessorStateManagerTest { ); assertThat(persistentStore.keys.size(), is(1)); assertTrue(persistentStore.keys.contains(intKey)); + assertEquals(9, persistentStore.values.get(0).length); } finally { stateMgr.close(Collections.emptyMap()); } } @Test - public void shouldConvertDataOnRestoreIfStoreImplementsRecordConverter() throws Exception { + public void shouldConvertDataOnRestoreIfStoreImplementsTimestampedBytesStore() throws Exception { final TaskId taskId = new TaskId(0, 2); - final Integer intKey = 2; + final Integer intKey = 1; final MockKeyValueStore persistentStore = getConverterStore(); final ProcessorStateManager stateMgr = getStandByStateManager(taskId); @@ -175,6 +176,7 @@ public class ProcessorStateManagerTest { ); assertThat(persistentStore.keys.size(), is(1)); assertTrue(persistentStore.keys.contains(intKey)); + assertEquals(17, persistentStore.values.get(0).length); } finally { stateMgr.close(Collections.emptyMap()); } @@ -797,19 +799,10 @@ public class ProcessorStateManagerTest { return new ConverterStore("persistentStore", true); } - - - private class ConverterStore extends MockKeyValueStore implements RecordConverter { - + private class ConverterStore extends MockKeyValueStore implements TimestampedBytesStore { ConverterStore(final String name, final boolean persistent) { super(name, persistent); } - - @Override - public ConsumerRecord convert(final ConsumerRecord record) { - return new ConsumerRecord<>("", 0, 0L, new byte[]{0x0, 0x0, 0x0, 0x2}, "".getBytes()); - } } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java index d2eda4bd8c5..8ed23a6bc72 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java @@ -42,7 +42,7 @@ public class StateRestorerTest { OFFSET_LIMIT, true, "storeName", - new DefaultRecordConverter()); + record -> record); @Before public void setUp() { @@ -79,7 +79,7 @@ public class StateRestorerTest { 0, true, "storeName", - new DefaultRecordConverter()); + record -> record); assertTrue(restorer.hasCompleted(0, 10)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 974e62da797..ed82421f2ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -107,7 +107,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); changelogReader.restore(active); assertTrue(functionCalled.get()); } @@ -145,7 +145,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -170,7 +170,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); EasyMock.replay(active, task); @@ -185,7 +185,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); // retry restore should succeed assertEquals(1, changelogReader.restore(active).size()); assertThat(callback.restored.size(), equalTo(messages)); @@ -210,7 +210,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter()); + record -> record); changelogReader.register(stateRestorer); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -240,7 +240,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(5)); @@ -257,7 +257,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -274,7 +274,7 @@ public class StoreChangelogReaderTest { 3, true, "storeName", - new DefaultRecordConverter()); + record -> record); changelogReader.register(restorer); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -302,7 +302,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName1", - new DefaultRecordConverter())); + record -> record)); changelogReader.register(new StateRestorer( one, restoreListener1, @@ -310,7 +310,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName2", - new DefaultRecordConverter())); + record -> record)); changelogReader.register(new StateRestorer( two, restoreListener2, @@ -318,7 +318,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName3", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(one)).andStubReturn(task); expect(active.restoringTaskFor(two)).andStubReturn(task); @@ -350,7 +350,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName1", - new DefaultRecordConverter())); + record -> record)); changelogReader.register(new StateRestorer( one, restoreListener1, @@ -358,7 +358,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName2", - new DefaultRecordConverter())); + record -> record)); changelogReader.register(new StateRestorer( two, restoreListener2, @@ -366,7 +366,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName3", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(one)).andReturn(task); expect(active.restoringTaskFor(two)).andReturn(task); @@ -401,7 +401,7 @@ public class StoreChangelogReaderTest { 5, true, "storeName1", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -437,7 +437,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter()); + record -> record); setupConsumer(0, topicPartition); changelogReader.register(restorer); @@ -457,7 +457,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter()); + record -> record); changelogReader.register(restorer); @@ -476,7 +476,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -495,7 +495,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -518,7 +518,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -537,7 +537,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "store", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -559,7 +559,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - new DefaultRecordConverter())); + record -> record)); final TopicPartition postInitialization = new TopicPartition("other", 0); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -581,7 +581,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "otherStore", - new DefaultRecordConverter())); + record -> record)); final Collection expected = Utils.mkSet(topicPartition, postInitialization); consumer.assign(expected); @@ -605,7 +605,7 @@ public class StoreChangelogReaderTest { 9L, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -627,7 +627,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -648,7 +648,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -668,7 +668,7 @@ public class StoreChangelogReaderTest { 5, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -689,7 +689,7 @@ public class StoreChangelogReaderTest { 10, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -717,7 +717,7 @@ public class StoreChangelogReaderTest { 6, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -740,7 +740,7 @@ public class StoreChangelogReaderTest { 11, true, "storeName", - new DefaultRecordConverter())); + record -> record)); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index 8fd1f74b550..1729b2433b5 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -35,6 +35,7 @@ public class MockKeyValueStore implements KeyValueStore { public boolean flushed = false; public boolean closed = true; public final ArrayList keys = new ArrayList<>(); + public final ArrayList values = new ArrayList<>(); public MockKeyValueStore(final String name, final boolean persistent) { @@ -82,6 +83,7 @@ public class MockKeyValueStore implements KeyValueStore { public void restore(final byte[] key, final byte[] value) { keys.add(deserializer.deserialize("", key)); + values.add(value); } };