From ed7b67dd1135e213ed7eda8ee2d733de7c965519 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 19 Jan 2019 22:54:26 -0800 Subject: [PATCH] KAFKA-3522: Add internal RecordConverter interface (#6150) Reviewers: Bill Bejeck , Guozhang Wang --- .../internals/DefaultRecordConverter.java | 29 +++ .../internals/GlobalStateManagerImpl.java | 19 +- .../internals/ProcessorStateManager.java | 24 +- .../processor/internals/StateRestorer.java | 13 +- .../kafka/streams/state/RecordConverter.java | 35 +++ .../state/internals/WrappedStateStore.java | 10 +- .../internals/GlobalStateManagerImplTest.java | 216 +++++++++++++++--- .../internals/ProcessorStateManagerTest.java | 42 ++++ .../internals/StateRestorerTest.java | 8 +- .../internals/StoreChangelogReaderTest.java | 100 +++++--- 10 files changed, 416 insertions(+), 80 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java new file mode 100644 index 00000000000..752a4a279f7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.state.RecordConverter; + +public class DefaultRecordConverter implements RecordConverter { + + @Override + public ConsumerRecord convert(final ConsumerRecord record) { + return record; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index a20f3b0a7c9..e1963ed7a37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -32,6 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; import java.io.File; @@ -195,7 +197,17 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob } } try { - restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name()); + final StateStore stateStore = + store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; + final RecordConverter recordConverter = + stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter(); + + restoreState( + stateRestoreCallback, + topicPartitions, + highWatermarks, + store.name(), + recordConverter); globalStores.put(store.name(), store); } finally { globalConsumer.unsubscribe(); @@ -249,7 +261,8 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob private void restoreState(final StateRestoreCallback stateRestoreCallback, final List topicPartitions, final Map highWatermarks, - final String storeName) { + final String storeName, + final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); @@ -273,7 +286,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final List> restoreRecords = new ArrayList<>(); for (final ConsumerRecord record : records.records(topicPartition)) { if (record.key() != null) { - restoreRecords.add(record); + restoreRecords.add(recordConverter.convert(record)); } } offset = globalConsumer.position(topicPartition); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index cf830fbba0d..669eb152712 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -24,6 +24,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; import java.io.File; @@ -48,6 +50,7 @@ public class ProcessorStateManager extends AbstractStateManager { private final Map offsetLimits; private final Map standbyRestoredOffsets; private final Map restoreCallbacks; // used for standby tasks, keyed by state topic name + private final Map recordConverters; // used for standby tasks, keyed by state topic name private final Map storeToChangelogTopic; private final List changelogPartitions = new ArrayList<>(); @@ -82,6 +85,7 @@ public class ProcessorStateManager extends AbstractStateManager { standbyRestoredOffsets = new HashMap<>(); this.isStandby = isStandby; restoreCallbacks = isStandby ? new HashMap<>() : null; + recordConverters = isStandby ? new HashMap<>() : null; this.storeToChangelogTopic = storeToChangelogTopic; // load the checkpoint information @@ -129,18 +133,28 @@ public class ProcessorStateManager extends AbstractStateManager { final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); + final StateStore stateStore = + store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; + final RecordConverter recordConverter = + stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter(); + if (isStandby) { log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic); + restoreCallbacks.put(topic, stateRestoreCallback); + recordConverters.put(topic, recordConverter); } else { log.trace("Restoring state store {} from changelog topic {}", storeName, topic); + final StateRestorer restorer = new StateRestorer( storePartition, new CompositeRestoreListener(stateRestoreCallback), checkpointableOffsets.get(storePartition), offsetLimit(storePartition), store.persistent(), - storeName); + storeName, + recordConverter + ); changelogReader.register(restorer); } @@ -181,8 +195,14 @@ public class ProcessorStateManager extends AbstractStateManager { final RecordBatchingStateRestoreCallback restoreCallback = adapt(restoreCallbacks.get(storePartition.topic())); if (!restoreRecords.isEmpty()) { + final RecordConverter converter = recordConverters.get(storePartition.topic()); + final List> convertedRecords = new ArrayList<>(restoreRecords.size()); + for (final ConsumerRecord record : restoreRecords) { + convertedRecords.add(converter.convert(record)); + } + try { - restoreCallback.restoreBatch(restoreRecords); + restoreCallback.restoreBatch(convertedRecords); } catch (final Exception e) { throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 6a2076e1ccd..3d937db95b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.state.RecordConverter; +import java.util.ArrayList; import java.util.Collection; public class StateRestorer { @@ -31,6 +33,7 @@ public class StateRestorer { private final String storeName; private final TopicPartition partition; private final CompositeRestoreListener compositeRestoreListener; + private final RecordConverter recordConverter; private long checkpointOffset; private long restoredOffset; @@ -42,13 +45,15 @@ public class StateRestorer { final Long checkpoint, final long offsetLimit, final boolean persistent, - final String storeName) { + final String storeName, + final RecordConverter recordConverter) { this.partition = partition; this.compositeRestoreListener = compositeRestoreListener; this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint; this.offsetLimit = offsetLimit; this.persistent = persistent; this.storeName = storeName; + this.recordConverter = recordConverter; } public TopicPartition partition() { @@ -80,7 +85,11 @@ public class StateRestorer { } void restore(final Collection> records) { - compositeRestoreListener.restoreBatch(records); + final Collection> convertedRecords = new ArrayList<>(records.size()); + for (final ConsumerRecord record : records) { + convertedRecords.add(recordConverter.convert(record)); + } + compositeRestoreListener.restoreBatch(convertedRecords); } boolean isPersistent() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java b/streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java new file mode 100644 index 00000000000..5cdaddfa20f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.KeyValue; + +/** + * {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair. + */ +public interface RecordConverter { + + /** + * Convert a given record into a key-value pair. + * + * @param record the consumer record + * @return the record as key-value pair + */ + ConsumerRecord convert(final ConsumerRecord record); + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 570c465f77f..e45db07653b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.RecordConverter; /** * A storage engine wrapper for utilities like logging, caching, and metering. @@ -38,7 +40,7 @@ public interface WrappedStateStore extends StateStore { */ StateStore wrappedStore(); - abstract class AbstractStateStore implements WrappedStateStore { + abstract class AbstractStateStore implements WrappedStateStore, RecordConverter { final StateStore innerState; protected AbstractStateStore(final StateStore inner) { @@ -94,5 +96,11 @@ public interface WrappedStateStore extends StateStore { public StateStore wrappedStore() { return innerState; } + + @Override + public ConsumerRecord convert(final ConsumerRecord record) { + return ((RecordConverter) innerState).convert(record); + } + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 8b02b3e66d2..4f160f5dfd2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -31,8 +31,12 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.RecordConverter; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.NoOpReadOnlyStore; @@ -90,7 +94,7 @@ public class GlobalStateManagerImplTest { private InternalMockProcessorContext processorContext; @Before - public void before() throws IOException { + public void before() { final Map storeToTopic = new HashMap<>(); storeToTopic.put(storeName1, t1.topic()); @@ -99,7 +103,7 @@ public class GlobalStateManagerImplTest { storeToTopic.put(storeName4, t4.topic()); store1 = new NoOpReadOnlyStore<>(storeName1, true); - store2 = new NoOpReadOnlyStore<>(storeName2, true); + store2 = new ConverterStore<>(storeName2, true); store3 = new NoOpReadOnlyStore<>(storeName3); store4 = new NoOpReadOnlyStore<>(storeName4); @@ -198,7 +202,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() { stateManager.initialize(); - initializeConsumer(2, 1, t1); + initializeConsumer(2, 0, t1); stateManager.register(store1, stateRestoreCallback); try { stateManager.register(store1, stateRestoreCallback); @@ -219,9 +223,138 @@ public class GlobalStateManagerImplTest { } } + @Test + public void shouldUseDefaultRecordConverterIfStoreDoesNotImplementRecordConverter() { + initializeConsumer(1, 0, t1); + + stateManager.initialize(); + stateManager.register(store1, stateRestoreCallback); + + final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); + assertEquals(3, restoredRecord.key.length); + assertEquals(5, restoredRecord.value.length); + } + + @Test + public void shouldUseDefaultRecordConverterIfInnerStoreDoesNotImplementRecordConverter() { + initializeConsumer(1, 0, t1); + + stateManager.initialize(); + stateManager.register(new WrappedStateStore() { + @Override + public StateStore inner() { + return store1; + } + + @Override + public StateStore wrappedStore() { + return store1; + } + + @Override + public String name() { + return store1.name(); + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + store1.init(context, root); + } + + @Override + public void flush() { + store1.flush(); + } + + @Override + public void close() { + store1.close(); + } + + @Override + public boolean persistent() { + return store1.persistent(); + } + + @Override + public boolean isOpen() { + return store1.isOpen(); + } + }, stateRestoreCallback); + + final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); + assertEquals(3, restoredRecord.key.length); + assertEquals(5, restoredRecord.value.length); + } + + @Test + public void shouldUseStoreAsRecordConverterIfStoreImplementsRecordConverter() { + initializeConsumer(1, 0, t2); + + stateManager.initialize(); + stateManager.register(store2, stateRestoreCallback); + + final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); + assertEquals(0, restoredRecord.key.length); + assertEquals(0, restoredRecord.value.length); + + } + + @Test + public void shouldUseStoreAsRecordConverterIfInnerStoreImplementsRecordConverter() { + initializeConsumer(1, 0, t2); + + stateManager.initialize(); + stateManager.register(new WrappedStateStore() { + @Override + public StateStore inner() { + return store2; + } + + @Override + public StateStore wrappedStore() { + return store2; + } + + @Override + public String name() { + return store2.name(); + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + store2.init(context, root); + } + + @Override + public void flush() { + store2.flush(); + } + + @Override + public void close() { + store2.close(); + } + + @Override + public boolean persistent() { + return store2.persistent(); + } + + @Override + public boolean isOpen() { + return store2.isOpen(); + } + }, stateRestoreCallback); + + final KeyValue restoredRecord = stateRestoreCallback.restored.get(0); + assertEquals(0, restoredRecord.key.length); + assertEquals(0, restoredRecord.value.length); + } + @Test public void shouldRestoreRecordsUpToHighwatermark() { - initializeConsumer(2, 1, t1); + initializeConsumer(2, 0, t1); stateManager.initialize(); @@ -231,7 +364,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() { - initializeConsumer(2, 1, t1); + initializeConsumer(2, 0, t1); consumer.setException(new InvalidOffsetException("Try Again!") { public Set partitions() { return Collections.singleton(t1); @@ -252,7 +385,7 @@ public class GlobalStateManagerImplTest { stateManager.register(store1, stateRestoreCallback); assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L)); - assertThat(stateRestoreListener.restoreEndOffset, equalTo(5L)); + assertThat(stateRestoreListener.restoreEndOffset, equalTo(6L)); assertThat(stateRestoreListener.totalNumRestored, equalTo(5L)); @@ -263,11 +396,11 @@ public class GlobalStateManagerImplTest { @Test public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException { - initializeConsumer(5, 6, t1); + initializeConsumer(5, 5, t1); final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - offsetCheckpoint.write(Collections.singletonMap(t1, 6L)); + offsetCheckpoint.write(Collections.singletonMap(t1, 5L)); stateManager.initialize(); stateManager.register(store1, stateRestoreCallback); @@ -279,9 +412,9 @@ public class GlobalStateManagerImplTest { public void shouldFlushStateStores() { stateManager.initialize(); // register the stores - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(store1, stateRestoreCallback); - initializeConsumer(1, 1, t2); + initializeConsumer(1, 0, t2); stateManager.register(store2, stateRestoreCallback); stateManager.flush(); @@ -293,7 +426,7 @@ public class GlobalStateManagerImplTest { public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() { stateManager.initialize(); // register the stores - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(new NoOpReadOnlyStore(store1.name()) { @Override public void flush() { @@ -308,12 +441,12 @@ public class GlobalStateManagerImplTest { public void shouldCloseStateStores() throws IOException { stateManager.initialize(); // register the stores - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(store1, stateRestoreCallback); - initializeConsumer(1, 1, t2); + initializeConsumer(1, 0, t2); stateManager.register(store2, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(Collections.emptyMap()); assertFalse(store1.isOpen()); assertFalse(store2.isOpen()); } @@ -321,7 +454,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldWriteCheckpointsOnClose() throws IOException { stateManager.initialize(); - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(store1, stateRestoreCallback); final Map expected = Collections.singletonMap(t1, 25L); stateManager.close(expected); @@ -332,7 +465,7 @@ public class GlobalStateManagerImplTest { @Test(expected = ProcessorStateException.class) public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException { stateManager.initialize(); - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(new NoOpReadOnlyStore(store1.name()) { @Override public void close() { @@ -340,7 +473,7 @@ public class GlobalStateManagerImplTest { } }, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(Collections.emptyMap()); } @Test @@ -370,7 +503,7 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException { stateManager.initialize(); - initializeConsumer(1, 1, t1); + initializeConsumer(1, 0, t1); stateManager.register(new NoOpReadOnlyStore("t1-store") { @Override public void close() { @@ -380,17 +513,15 @@ public class GlobalStateManagerImplTest { super.close(); } }, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); - + stateManager.close(Collections.emptyMap()); - stateManager.close(Collections.emptyMap()); + stateManager.close(Collections.emptyMap()); } @Test public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException { stateManager.initialize(); - initializeConsumer(1, 1, t1); - initializeConsumer(1, 1, t2); + initializeConsumer(1, 0, t1); final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") { @Override public void close() { @@ -400,10 +531,11 @@ public class GlobalStateManagerImplTest { }; stateManager.register(store, stateRestoreCallback); + initializeConsumer(1, 0, t2); stateManager.register(store2, stateRestoreCallback); try { - stateManager.close(Collections.emptyMap()); + stateManager.close(Collections.emptyMap()); } catch (final ProcessorStateException e) { // expected } @@ -443,9 +575,9 @@ public class GlobalStateManagerImplTest { @Test public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() { stateManager.initialize(); - initializeConsumer(10, 1, t1); + initializeConsumer(10, 0, t1); stateManager.register(store1, stateRestoreCallback); - initializeConsumer(20, 1, t2); + initializeConsumer(20, 0, t2); stateManager.register(store2, stateRestoreCallback); final Map initialCheckpoint = stateManager.checkpointed(); @@ -461,12 +593,12 @@ public class GlobalStateManagerImplTest { final HashMap startOffsets = new HashMap<>(); startOffsets.put(t1, 1L); final HashMap endOffsets = new HashMap<>(); - endOffsets.put(t1, 2L); + endOffsets.put(t1, 3L); consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null))); consumer.assign(Collections.singletonList(t1)); consumer.updateEndOffsets(endOffsets); consumer.updateBeginningOffsets(startOffsets); - consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1, (byte[]) null, "null".getBytes())); + consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1, null, "null".getBytes())); final byte[] expectedKey = "key".getBytes(); final byte[] expectedValue = "value".getBytes(); consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2, expectedKey, expectedValue)); @@ -480,19 +612,19 @@ public class GlobalStateManagerImplTest { @Test public void shouldCheckpointRestoredOffsetsToFile() throws IOException { stateManager.initialize(); - initializeConsumer(10, 1, t1); + initializeConsumer(10, 0, t1); stateManager.register(store1, stateRestoreCallback); - stateManager.close(Collections.emptyMap()); + stateManager.close(Collections.emptyMap()); final Map checkpointMap = stateManager.checkpointed(); - assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L))); + assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L))); assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } @Test public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { stateManager.initialize(); - initializeConsumer(10, 1, t3); + initializeConsumer(10, 0, t3); stateManager.register(store3, stateRestoreCallback); stateManager.close(Collections.emptyMap()); @@ -666,9 +798,9 @@ public class GlobalStateManagerImplTest { private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) { final HashMap startOffsets = new HashMap<>(); - startOffsets.put(topicPartition, 1L); + startOffsets.put(topicPartition, startOffset); final HashMap endOffsets = new HashMap<>(); - endOffsets.put(topicPartition, startOffset + numRecords - 1); + endOffsets.put(topicPartition, startOffset + numRecords); consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null))); consumer.assign(Collections.singletonList(topicPartition)); consumer.updateEndOffsets(endOffsets); @@ -694,4 +826,20 @@ public class GlobalStateManagerImplTest { restored.add(KeyValue.pair(key, value)); } } + + + + private class ConverterStore extends NoOpReadOnlyStore implements RecordConverter { + + ConverterStore(final String name, + final boolean rocksdbStore) { + super(name, rocksdbStore); + } + + @Override + public ConsumerRecord convert(final ConsumerRecord record) { + return new ConsumerRecord<>("", 0, 0L, "".getBytes(), "".getBytes()); + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 90f24201931..dee948f3df5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.RecordConverter; import org.apache.kafka.test.MockBatchingStateRestoreListener; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.NoOpProcessorContext; @@ -157,6 +158,28 @@ public class ProcessorStateManagerTest { } } + @Test + public void shouldConvertDataOnRestoreIfStoreImplementsRecordConverter() throws Exception { + final TaskId taskId = new TaskId(0, 2); + final Integer intKey = 2; + + final MockKeyValueStore persistentStore = getConverterStore(); + final ProcessorStateManager stateMgr = getStandByStateManager(taskId); + + try { + stateMgr.register(persistentStore, persistentStore.stateRestoreCallback); + stateMgr.updateStandbyStates( + persistentStorePartition, + singletonList(consumerRecord), + consumerRecord.offset() + ); + assertThat(persistentStore.keys.size(), is(1)); + assertTrue(persistentStore.keys.contains(intKey)); + } finally { + stateMgr.close(Collections.emptyMap()); + } + } + @Test public void testRegisterPersistentStore() throws IOException { final TaskId taskId = new TaskId(0, 2); @@ -770,4 +793,23 @@ public class ProcessorStateManagerTest { return new MockKeyValueStore("persistentStore", true); } + private MockKeyValueStore getConverterStore() { + return new ConverterStore("persistentStore", true); + } + + + + private class ConverterStore extends MockKeyValueStore implements RecordConverter { + + ConverterStore(final String name, + final boolean persistent) { + super(name, persistent); + } + + @Override + public ConsumerRecord convert(final ConsumerRecord record) { + return new ConsumerRecord<>("", 0, 0L, new byte[]{0x0, 0x0, 0x0, 0x2}, "".getBytes()); + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java index 3fa4b1e3745..d2eda4bd8c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java @@ -41,7 +41,8 @@ public class StateRestorerTest { null, OFFSET_LIMIT, true, - "storeName"); + "storeName", + new DefaultRecordConverter()); @Before public void setUp() { @@ -77,7 +78,8 @@ public class StateRestorerTest { null, 0, true, - "storeName"); + "storeName", + new DefaultRecordConverter()); assertTrue(restorer.hasCompleted(0, 10)); } @@ -105,4 +107,4 @@ public class StateRestorerTest { restorer.setRestoredOffset(100); assertThat(restorer.restoredNumRecords(), equalTo(OFFSET_LIMIT - 20)); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index ebe50c026a5..974e62da797 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -106,7 +106,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); changelogReader.restore(active); assertTrue(functionCalled.get()); } @@ -143,7 +144,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -167,7 +169,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); EasyMock.replay(active, task); @@ -181,7 +184,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); // retry restore should succeed assertEquals(1, changelogReader.restore(active).size()); assertThat(callback.restored.size(), equalTo(messages)); @@ -200,12 +204,13 @@ public class StoreChangelogReaderTest { consumer.assign(Collections.emptyList()); final StateRestorer stateRestorer = new StateRestorer( - topicPartition, - restoreListener, - expiredCheckpoint, - Long.MAX_VALUE, - true, - "storeName"); + topicPartition, + restoreListener, + expiredCheckpoint, + Long.MAX_VALUE, + true, + "storeName", + new DefaultRecordConverter()); changelogReader.register(stateRestorer); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -234,7 +239,8 @@ public class StoreChangelogReaderTest { 5L, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(5)); @@ -250,7 +256,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -266,7 +273,8 @@ public class StoreChangelogReaderTest { null, 3, true, - "storeName"); + "storeName", + new DefaultRecordConverter()); changelogReader.register(restorer); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -293,21 +301,24 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName1")); + "storeName1", + new DefaultRecordConverter())); changelogReader.register(new StateRestorer( one, restoreListener1, null, Long.MAX_VALUE, true, - "storeName2")); + "storeName2", + new DefaultRecordConverter())); changelogReader.register(new StateRestorer( two, restoreListener2, null, Long.MAX_VALUE, true, - "storeName3")); + "storeName3", + new DefaultRecordConverter())); expect(active.restoringTaskFor(one)).andStubReturn(task); expect(active.restoringTaskFor(two)).andStubReturn(task); @@ -338,21 +349,24 @@ public class StoreChangelogReaderTest { 0L, Long.MAX_VALUE, true, - "storeName1")); + "storeName1", + new DefaultRecordConverter())); changelogReader.register(new StateRestorer( one, restoreListener1, 0L, Long.MAX_VALUE, true, - "storeName2")); + "storeName2", + new DefaultRecordConverter())); changelogReader.register(new StateRestorer( two, restoreListener2, 0L, Long.MAX_VALUE, true, - "storeName3")); + "storeName3", + new DefaultRecordConverter())); expect(active.restoringTaskFor(one)).andReturn(task); expect(active.restoringTaskFor(two)).andReturn(task); @@ -386,7 +400,8 @@ public class StoreChangelogReaderTest { 0L, 5, true, - "storeName1")); + "storeName1", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -421,7 +436,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName"); + "storeName", + new DefaultRecordConverter()); setupConsumer(0, topicPartition); changelogReader.register(restorer); @@ -440,7 +456,8 @@ public class StoreChangelogReaderTest { endOffset, Long.MAX_VALUE, true, - "storeName"); + "storeName", + new DefaultRecordConverter()); changelogReader.register(restorer); @@ -458,7 +475,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -476,7 +494,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, false, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -498,7 +517,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, false, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -516,7 +536,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "store")); + "store", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -537,7 +558,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, false, - "storeName")); + "storeName", + new DefaultRecordConverter())); final TopicPartition postInitialization = new TopicPartition("other", 0); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -558,7 +580,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, false, - "otherStore")); + "otherStore", + new DefaultRecordConverter())); final Collection expected = Utils.mkSet(topicPartition, postInitialization); consumer.assign(expected); @@ -581,7 +604,8 @@ public class StoreChangelogReaderTest { null, 9L, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -602,7 +626,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -622,7 +647,8 @@ public class StoreChangelogReaderTest { null, Long.MAX_VALUE, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -641,7 +667,8 @@ public class StoreChangelogReaderTest { null, 5, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -661,7 +688,8 @@ public class StoreChangelogReaderTest { null, 10, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -688,7 +716,8 @@ public class StoreChangelogReaderTest { null, 6, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -710,7 +739,8 @@ public class StoreChangelogReaderTest { null, 11, true, - "storeName")); + "storeName", + new DefaultRecordConverter())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active);