Browse Source

KAFKA-3522: Add internal RecordConverter interface (#6150)

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/1894/merge
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
ed7b67dd11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java
  2. 19
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
  3. 24
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
  4. 13
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
  5. 35
      streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java
  6. 10
      streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
  7. 216
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
  8. 42
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
  9. 8
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
  10. 100
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

29
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.state.RecordConverter;
public class DefaultRecordConverter implements RecordConverter {
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return record;
}
}

19
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@ -32,6 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException; @@ -32,6 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import java.io.File;
@ -195,7 +197,17 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -195,7 +197,17 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
}
}
try {
restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
restoreState(
stateRestoreCallback,
topicPartitions,
highWatermarks,
store.name(),
recordConverter);
globalStores.put(store.name(), store);
} finally {
globalConsumer.unsubscribe();
@ -249,7 +261,8 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -249,7 +261,8 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
private void restoreState(final StateRestoreCallback stateRestoreCallback,
final List<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> highWatermarks,
final String storeName) {
final String storeName,
final RecordConverter recordConverter) {
for (final TopicPartition topicPartition : topicPartitions) {
globalConsumer.assign(Collections.singletonList(topicPartition));
final Long checkpoint = checkpointableOffsets.get(topicPartition);
@ -273,7 +286,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -273,7 +286,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
if (record.key() != null) {
restoreRecords.add(record);
restoreRecords.add(recordConverter.convert(record));
}
}
offset = globalConsumer.position(topicPartition);

24
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -24,6 +24,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -24,6 +24,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
import java.io.File;
@ -48,6 +50,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -48,6 +50,7 @@ public class ProcessorStateManager extends AbstractStateManager {
private final Map<TopicPartition, Long> offsetLimits;
private final Map<TopicPartition, Long> standbyRestoredOffsets;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
private final Map<String, RecordConverter> recordConverters; // used for standby tasks, keyed by state topic name
private final Map<String, String> storeToChangelogTopic;
private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@ -82,6 +85,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -82,6 +85,7 @@ public class ProcessorStateManager extends AbstractStateManager {
standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
restoreCallbacks = isStandby ? new HashMap<>() : null;
recordConverters = isStandby ? new HashMap<>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
// load the checkpoint information
@ -129,18 +133,28 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -129,18 +133,28 @@ public class ProcessorStateManager extends AbstractStateManager {
final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
if (isStandby) {
log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic);
restoreCallbacks.put(topic, stateRestoreCallback);
recordConverters.put(topic, recordConverter);
} else {
log.trace("Restoring state store {} from changelog topic {}", storeName, topic);
final StateRestorer restorer = new StateRestorer(
storePartition,
new CompositeRestoreListener(stateRestoreCallback),
checkpointableOffsets.get(storePartition),
offsetLimit(storePartition),
store.persistent(),
storeName);
storeName,
recordConverter
);
changelogReader.register(restorer);
}
@ -181,8 +195,14 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -181,8 +195,14 @@ public class ProcessorStateManager extends AbstractStateManager {
final RecordBatchingStateRestoreCallback restoreCallback = adapt(restoreCallbacks.get(storePartition.topic()));
if (!restoreRecords.isEmpty()) {
final RecordConverter converter = recordConverters.get(storePartition.topic());
final List<ConsumerRecord<byte[], byte[]>> convertedRecords = new ArrayList<>(restoreRecords.size());
for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
convertedRecords.add(converter.convert(record));
}
try {
restoreCallback.restoreBatch(restoreRecords);
restoreCallback.restoreBatch(convertedRecords);
} catch (final Exception e) {
throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e);
}

13
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java

@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor.internals; @@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.RecordConverter;
import java.util.ArrayList;
import java.util.Collection;
public class StateRestorer {
@ -31,6 +33,7 @@ public class StateRestorer { @@ -31,6 +33,7 @@ public class StateRestorer {
private final String storeName;
private final TopicPartition partition;
private final CompositeRestoreListener compositeRestoreListener;
private final RecordConverter recordConverter;
private long checkpointOffset;
private long restoredOffset;
@ -42,13 +45,15 @@ public class StateRestorer { @@ -42,13 +45,15 @@ public class StateRestorer {
final Long checkpoint,
final long offsetLimit,
final boolean persistent,
final String storeName) {
final String storeName,
final RecordConverter recordConverter) {
this.partition = partition;
this.compositeRestoreListener = compositeRestoreListener;
this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
this.offsetLimit = offsetLimit;
this.persistent = persistent;
this.storeName = storeName;
this.recordConverter = recordConverter;
}
public TopicPartition partition() {
@ -80,7 +85,11 @@ public class StateRestorer { @@ -80,7 +85,11 @@ public class StateRestorer {
}
void restore(final Collection<ConsumerRecord<byte[], byte[]>> records) {
compositeRestoreListener.restoreBatch(records);
final Collection<ConsumerRecord<byte[], byte[]>> convertedRecords = new ArrayList<>(records.size());
for (final ConsumerRecord<byte[], byte[]> record : records) {
convertedRecords.add(recordConverter.convert(record));
}
compositeRestoreListener.restoreBatch(convertedRecords);
}
boolean isPersistent() {

35
streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java

@ -0,0 +1,35 @@ @@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
/**
* {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair.
*/
public interface RecordConverter {
/**
* Convert a given record into a key-value pair.
*
* @param record the consumer record
* @return the record as key-value pair
*/
ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}

10
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java

@ -16,9 +16,11 @@ @@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.RecordConverter;
/**
* A storage engine wrapper for utilities like logging, caching, and metering.
@ -38,7 +40,7 @@ public interface WrappedStateStore extends StateStore { @@ -38,7 +40,7 @@ public interface WrappedStateStore extends StateStore {
*/
StateStore wrappedStore();
abstract class AbstractStateStore implements WrappedStateStore {
abstract class AbstractStateStore implements WrappedStateStore, RecordConverter {
final StateStore innerState;
protected AbstractStateStore(final StateStore inner) {
@ -94,5 +96,11 @@ public interface WrappedStateStore extends StateStore { @@ -94,5 +96,11 @@ public interface WrappedStateStore extends StateStore {
public StateStore wrappedStore() {
return innerState;
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return ((RecordConverter) innerState).convert(record);
}
}
}

216
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

@ -31,8 +31,12 @@ import org.apache.kafka.streams.StreamsConfig; @@ -31,8 +31,12 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.NoOpReadOnlyStore;
@ -90,7 +94,7 @@ public class GlobalStateManagerImplTest { @@ -90,7 +94,7 @@ public class GlobalStateManagerImplTest {
private InternalMockProcessorContext processorContext;
@Before
public void before() throws IOException {
public void before() {
final Map<String, String> storeToTopic = new HashMap<>();
storeToTopic.put(storeName1, t1.topic());
@ -99,7 +103,7 @@ public class GlobalStateManagerImplTest { @@ -99,7 +103,7 @@ public class GlobalStateManagerImplTest {
storeToTopic.put(storeName4, t4.topic());
store1 = new NoOpReadOnlyStore<>(storeName1, true);
store2 = new NoOpReadOnlyStore<>(storeName2, true);
store2 = new ConverterStore<>(storeName2, true);
store3 = new NoOpReadOnlyStore<>(storeName3);
store4 = new NoOpReadOnlyStore<>(storeName4);
@ -198,7 +202,7 @@ public class GlobalStateManagerImplTest { @@ -198,7 +202,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
stateManager.initialize();
initializeConsumer(2, 1, t1);
initializeConsumer(2, 0, t1);
stateManager.register(store1, stateRestoreCallback);
try {
stateManager.register(store1, stateRestoreCallback);
@ -219,9 +223,138 @@ public class GlobalStateManagerImplTest { @@ -219,9 +223,138 @@ public class GlobalStateManagerImplTest {
}
}
@Test
public void shouldUseDefaultRecordConverterIfStoreDoesNotImplementRecordConverter() {
initializeConsumer(1, 0, t1);
stateManager.initialize();
stateManager.register(store1, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(3, restoredRecord.key.length);
assertEquals(5, restoredRecord.value.length);
}
@Test
public void shouldUseDefaultRecordConverterIfInnerStoreDoesNotImplementRecordConverter() {
initializeConsumer(1, 0, t1);
stateManager.initialize();
stateManager.register(new WrappedStateStore() {
@Override
public StateStore inner() {
return store1;
}
@Override
public StateStore wrappedStore() {
return store1;
}
@Override
public String name() {
return store1.name();
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
store1.init(context, root);
}
@Override
public void flush() {
store1.flush();
}
@Override
public void close() {
store1.close();
}
@Override
public boolean persistent() {
return store1.persistent();
}
@Override
public boolean isOpen() {
return store1.isOpen();
}
}, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(3, restoredRecord.key.length);
assertEquals(5, restoredRecord.value.length);
}
@Test
public void shouldUseStoreAsRecordConverterIfStoreImplementsRecordConverter() {
initializeConsumer(1, 0, t2);
stateManager.initialize();
stateManager.register(store2, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(0, restoredRecord.key.length);
assertEquals(0, restoredRecord.value.length);
}
@Test
public void shouldUseStoreAsRecordConverterIfInnerStoreImplementsRecordConverter() {
initializeConsumer(1, 0, t2);
stateManager.initialize();
stateManager.register(new WrappedStateStore() {
@Override
public StateStore inner() {
return store2;
}
@Override
public StateStore wrappedStore() {
return store2;
}
@Override
public String name() {
return store2.name();
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
store2.init(context, root);
}
@Override
public void flush() {
store2.flush();
}
@Override
public void close() {
store2.close();
}
@Override
public boolean persistent() {
return store2.persistent();
}
@Override
public boolean isOpen() {
return store2.isOpen();
}
}, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(0, restoredRecord.key.length);
assertEquals(0, restoredRecord.value.length);
}
@Test
public void shouldRestoreRecordsUpToHighwatermark() {
initializeConsumer(2, 1, t1);
initializeConsumer(2, 0, t1);
stateManager.initialize();
@ -231,7 +364,7 @@ public class GlobalStateManagerImplTest { @@ -231,7 +364,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
initializeConsumer(2, 1, t1);
initializeConsumer(2, 0, t1);
consumer.setException(new InvalidOffsetException("Try Again!") {
public Set<TopicPartition> partitions() {
return Collections.singleton(t1);
@ -252,7 +385,7 @@ public class GlobalStateManagerImplTest { @@ -252,7 +385,7 @@ public class GlobalStateManagerImplTest {
stateManager.register(store1, stateRestoreCallback);
assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
assertThat(stateRestoreListener.restoreEndOffset, equalTo(5L));
assertThat(stateRestoreListener.restoreEndOffset, equalTo(6L));
assertThat(stateRestoreListener.totalNumRestored, equalTo(5L));
@ -263,11 +396,11 @@ public class GlobalStateManagerImplTest { @@ -263,11 +396,11 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
initializeConsumer(5, 6, t1);
initializeConsumer(5, 5, t1);
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
offsetCheckpoint.write(Collections.singletonMap(t1, 6L));
offsetCheckpoint.write(Collections.singletonMap(t1, 5L));
stateManager.initialize();
stateManager.register(store1, stateRestoreCallback);
@ -279,9 +412,9 @@ public class GlobalStateManagerImplTest { @@ -279,9 +412,9 @@ public class GlobalStateManagerImplTest {
public void shouldFlushStateStores() {
stateManager.initialize();
// register the stores
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(store1, stateRestoreCallback);
initializeConsumer(1, 1, t2);
initializeConsumer(1, 0, t2);
stateManager.register(store2, stateRestoreCallback);
stateManager.flush();
@ -293,7 +426,7 @@ public class GlobalStateManagerImplTest { @@ -293,7 +426,7 @@ public class GlobalStateManagerImplTest {
public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
stateManager.initialize();
// register the stores
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(new NoOpReadOnlyStore(store1.name()) {
@Override
public void flush() {
@ -308,12 +441,12 @@ public class GlobalStateManagerImplTest { @@ -308,12 +441,12 @@ public class GlobalStateManagerImplTest {
public void shouldCloseStateStores() throws IOException {
stateManager.initialize();
// register the stores
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(store1, stateRestoreCallback);
initializeConsumer(1, 1, t2);
initializeConsumer(1, 0, t2);
stateManager.register(store2, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
assertFalse(store1.isOpen());
assertFalse(store2.isOpen());
}
@ -321,7 +454,7 @@ public class GlobalStateManagerImplTest { @@ -321,7 +454,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldWriteCheckpointsOnClose() throws IOException {
stateManager.initialize();
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(store1, stateRestoreCallback);
final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
stateManager.close(expected);
@ -332,7 +465,7 @@ public class GlobalStateManagerImplTest { @@ -332,7 +465,7 @@ public class GlobalStateManagerImplTest {
@Test(expected = ProcessorStateException.class)
public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
stateManager.initialize();
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(new NoOpReadOnlyStore(store1.name()) {
@Override
public void close() {
@ -340,7 +473,7 @@ public class GlobalStateManagerImplTest { @@ -340,7 +473,7 @@ public class GlobalStateManagerImplTest {
}
}, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
}
@Test
@ -370,7 +503,7 @@ public class GlobalStateManagerImplTest { @@ -370,7 +503,7 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
stateManager.initialize();
initializeConsumer(1, 1, t1);
initializeConsumer(1, 0, t1);
stateManager.register(new NoOpReadOnlyStore("t1-store") {
@Override
public void close() {
@ -380,17 +513,15 @@ public class GlobalStateManagerImplTest { @@ -380,17 +513,15 @@ public class GlobalStateManagerImplTest {
super.close();
}
}, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
}
@Test
public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
stateManager.initialize();
initializeConsumer(1, 1, t1);
initializeConsumer(1, 1, t2);
initializeConsumer(1, 0, t1);
final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") {
@Override
public void close() {
@ -400,10 +531,11 @@ public class GlobalStateManagerImplTest { @@ -400,10 +531,11 @@ public class GlobalStateManagerImplTest {
};
stateManager.register(store, stateRestoreCallback);
initializeConsumer(1, 0, t2);
stateManager.register(store2, stateRestoreCallback);
try {
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
} catch (final ProcessorStateException e) {
// expected
}
@ -443,9 +575,9 @@ public class GlobalStateManagerImplTest { @@ -443,9 +575,9 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
stateManager.initialize();
initializeConsumer(10, 1, t1);
initializeConsumer(10, 0, t1);
stateManager.register(store1, stateRestoreCallback);
initializeConsumer(20, 1, t2);
initializeConsumer(20, 0, t2);
stateManager.register(store2, stateRestoreCallback);
final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
@ -461,12 +593,12 @@ public class GlobalStateManagerImplTest { @@ -461,12 +593,12 @@ public class GlobalStateManagerImplTest {
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(t1, 1L);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(t1, 2L);
endOffsets.put(t1, 3L);
consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
consumer.assign(Collections.singletonList(t1));
consumer.updateEndOffsets(endOffsets);
consumer.updateBeginningOffsets(startOffsets);
consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1, (byte[]) null, "null".getBytes()));
consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 1, null, "null".getBytes()));
final byte[] expectedKey = "key".getBytes();
final byte[] expectedValue = "value".getBytes();
consumer.addRecord(new ConsumerRecord<>(t1.topic(), t1.partition(), 2, expectedKey, expectedValue));
@ -480,19 +612,19 @@ public class GlobalStateManagerImplTest { @@ -480,19 +612,19 @@ public class GlobalStateManagerImplTest {
@Test
public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
stateManager.initialize();
initializeConsumer(10, 1, t1);
initializeConsumer(10, 0, t1);
stateManager.register(store1, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
stateManager.close(Collections.emptyMap());
final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L)));
assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 10L)));
assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
}
@Test
public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
stateManager.initialize();
initializeConsumer(10, 1, t3);
initializeConsumer(10, 0, t3);
stateManager.register(store3, stateRestoreCallback);
stateManager.close(Collections.emptyMap());
@ -666,9 +798,9 @@ public class GlobalStateManagerImplTest { @@ -666,9 +798,9 @@ public class GlobalStateManagerImplTest {
private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(topicPartition, 1L);
startOffsets.put(topicPartition, startOffset);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, startOffset + numRecords - 1);
endOffsets.put(topicPartition, startOffset + numRecords);
consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
consumer.assign(Collections.singletonList(topicPartition));
consumer.updateEndOffsets(endOffsets);
@ -694,4 +826,20 @@ public class GlobalStateManagerImplTest { @@ -694,4 +826,20 @@ public class GlobalStateManagerImplTest {
restored.add(KeyValue.pair(key, value));
}
}
private class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements RecordConverter {
ConverterStore(final String name,
final boolean rocksdbStore) {
super(name, rocksdbStore);
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return new ConsumerRecord<>("", 0, 0L, "".getBytes(), "".getBytes());
}
}
}

42
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStore; @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.NoOpProcessorContext;
@ -157,6 +158,28 @@ public class ProcessorStateManagerTest { @@ -157,6 +158,28 @@ public class ProcessorStateManagerTest {
}
}
@Test
public void shouldConvertDataOnRestoreIfStoreImplementsRecordConverter() throws Exception {
final TaskId taskId = new TaskId(0, 2);
final Integer intKey = 2;
final MockKeyValueStore persistentStore = getConverterStore();
final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
try {
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
stateMgr.updateStandbyStates(
persistentStorePartition,
singletonList(consumerRecord),
consumerRecord.offset()
);
assertThat(persistentStore.keys.size(), is(1));
assertTrue(persistentStore.keys.contains(intKey));
} finally {
stateMgr.close(Collections.emptyMap());
}
}
@Test
public void testRegisterPersistentStore() throws IOException {
final TaskId taskId = new TaskId(0, 2);
@ -770,4 +793,23 @@ public class ProcessorStateManagerTest { @@ -770,4 +793,23 @@ public class ProcessorStateManagerTest {
return new MockKeyValueStore("persistentStore", true);
}
private MockKeyValueStore getConverterStore() {
return new ConverterStore("persistentStore", true);
}
private class ConverterStore extends MockKeyValueStore implements RecordConverter {
ConverterStore(final String name,
final boolean persistent) {
super(name, persistent);
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return new ConsumerRecord<>("", 0, 0L, new byte[]{0x0, 0x0, 0x0, 0x2}, "".getBytes());
}
}
}

8
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java

@ -41,7 +41,8 @@ public class StateRestorerTest { @@ -41,7 +41,8 @@ public class StateRestorerTest {
null,
OFFSET_LIMIT,
true,
"storeName");
"storeName",
new DefaultRecordConverter());
@Before
public void setUp() {
@ -77,7 +78,8 @@ public class StateRestorerTest { @@ -77,7 +78,8 @@ public class StateRestorerTest {
null,
0,
true,
"storeName");
"storeName",
new DefaultRecordConverter());
assertTrue(restorer.hasCompleted(0, 10));
}
@ -105,4 +107,4 @@ public class StateRestorerTest { @@ -105,4 +107,4 @@ public class StateRestorerTest {
restorer.setRestoredOffset(100);
assertThat(restorer.restoredNumRecords(), equalTo(OFFSET_LIMIT - 20));
}
}
}

100
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

@ -106,7 +106,8 @@ public class StoreChangelogReaderTest { @@ -106,7 +106,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
changelogReader.restore(active);
assertTrue(functionCalled.get());
}
@ -143,7 +144,8 @@ public class StoreChangelogReaderTest { @@ -143,7 +144,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -167,7 +169,8 @@ public class StoreChangelogReaderTest { @@ -167,7 +169,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
EasyMock.replay(active, task);
@ -181,7 +184,8 @@ public class StoreChangelogReaderTest { @@ -181,7 +184,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@ -200,12 +204,13 @@ public class StoreChangelogReaderTest { @@ -200,12 +204,13 @@ public class StoreChangelogReaderTest {
consumer.assign(Collections.emptyList());
final StateRestorer stateRestorer = new StateRestorer(
topicPartition,
restoreListener,
expiredCheckpoint,
Long.MAX_VALUE,
true,
"storeName");
topicPartition,
restoreListener,
expiredCheckpoint,
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter());
changelogReader.register(stateRestorer);
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@ -234,7 +239,8 @@ public class StoreChangelogReaderTest { @@ -234,7 +239,8 @@ public class StoreChangelogReaderTest {
5L,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(5));
@ -250,7 +256,8 @@ public class StoreChangelogReaderTest { @@ -250,7 +256,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -266,7 +273,8 @@ public class StoreChangelogReaderTest { @@ -266,7 +273,8 @@ public class StoreChangelogReaderTest {
null,
3,
true,
"storeName");
"storeName",
new DefaultRecordConverter());
changelogReader.register(restorer);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@ -293,21 +301,24 @@ public class StoreChangelogReaderTest { @@ -293,21 +301,24 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName1"));
"storeName1",
new DefaultRecordConverter()));
changelogReader.register(new StateRestorer(
one,
restoreListener1,
null,
Long.MAX_VALUE,
true,
"storeName2"));
"storeName2",
new DefaultRecordConverter()));
changelogReader.register(new StateRestorer(
two,
restoreListener2,
null,
Long.MAX_VALUE,
true,
"storeName3"));
"storeName3",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(one)).andStubReturn(task);
expect(active.restoringTaskFor(two)).andStubReturn(task);
@ -338,21 +349,24 @@ public class StoreChangelogReaderTest { @@ -338,21 +349,24 @@ public class StoreChangelogReaderTest {
0L,
Long.MAX_VALUE,
true,
"storeName1"));
"storeName1",
new DefaultRecordConverter()));
changelogReader.register(new StateRestorer(
one,
restoreListener1,
0L,
Long.MAX_VALUE,
true,
"storeName2"));
"storeName2",
new DefaultRecordConverter()));
changelogReader.register(new StateRestorer(
two,
restoreListener2,
0L,
Long.MAX_VALUE,
true,
"storeName3"));
"storeName3",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@ -386,7 +400,8 @@ public class StoreChangelogReaderTest { @@ -386,7 +400,8 @@ public class StoreChangelogReaderTest {
0L,
5,
true,
"storeName1"));
"storeName1",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -421,7 +436,8 @@ public class StoreChangelogReaderTest { @@ -421,7 +436,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName");
"storeName",
new DefaultRecordConverter());
setupConsumer(0, topicPartition);
changelogReader.register(restorer);
@ -440,7 +456,8 @@ public class StoreChangelogReaderTest { @@ -440,7 +456,8 @@ public class StoreChangelogReaderTest {
endOffset,
Long.MAX_VALUE,
true,
"storeName");
"storeName",
new DefaultRecordConverter());
changelogReader.register(restorer);
@ -458,7 +475,8 @@ public class StoreChangelogReaderTest { @@ -458,7 +475,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -476,7 +494,8 @@ public class StoreChangelogReaderTest { @@ -476,7 +494,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
false,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -498,7 +517,8 @@ public class StoreChangelogReaderTest { @@ -498,7 +517,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
false,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -516,7 +536,8 @@ public class StoreChangelogReaderTest { @@ -516,7 +536,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"store"));
"store",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@ -537,7 +558,8 @@ public class StoreChangelogReaderTest { @@ -537,7 +558,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
false,
"storeName"));
"storeName",
new DefaultRecordConverter()));
final TopicPartition postInitialization = new TopicPartition("other", 0);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@ -558,7 +580,8 @@ public class StoreChangelogReaderTest { @@ -558,7 +580,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
false,
"otherStore"));
"otherStore",
new DefaultRecordConverter()));
final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
consumer.assign(expected);
@ -581,7 +604,8 @@ public class StoreChangelogReaderTest { @@ -581,7 +604,8 @@ public class StoreChangelogReaderTest {
null,
9L,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -602,7 +626,8 @@ public class StoreChangelogReaderTest { @@ -602,7 +626,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -622,7 +647,8 @@ public class StoreChangelogReaderTest { @@ -622,7 +647,8 @@ public class StoreChangelogReaderTest {
null,
Long.MAX_VALUE,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -641,7 +667,8 @@ public class StoreChangelogReaderTest { @@ -641,7 +667,8 @@ public class StoreChangelogReaderTest {
null,
5,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -661,7 +688,8 @@ public class StoreChangelogReaderTest { @@ -661,7 +688,8 @@ public class StoreChangelogReaderTest {
null,
10,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -688,7 +716,8 @@ public class StoreChangelogReaderTest { @@ -688,7 +716,8 @@ public class StoreChangelogReaderTest {
null,
6,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -710,7 +739,8 @@ public class StoreChangelogReaderTest { @@ -710,7 +739,8 @@ public class StoreChangelogReaderTest {
null,
11,
true,
"storeName"));
"storeName",
new DefaultRecordConverter()));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);

Loading…
Cancel
Save