Browse Source

KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (#6204)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
pull/6135/merge
Matthias J. Sax 6 years ago committed by GitHub
parent
commit
201022d19e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
  3. 2
      streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
  4. 18
      streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java
  5. 42
      streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java
  6. 9
      streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
  7. 29
      streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
  8. 19
      streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
  9. 4
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
  10. 60
      streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
  11. 2
      streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

@ -32,7 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException; @@ -32,7 +32,8 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@ -200,7 +201,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob @@ -200,7 +201,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;
restoreState(
stateRestoreCallback,

5
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

@ -23,8 +23,9 @@ import org.apache.kafka.streams.errors.ProcessorStateException; @@ -23,8 +23,9 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@ -136,7 +137,7 @@ public class ProcessorStateManager extends AbstractStateManager { @@ -136,7 +137,7 @@ public class ProcessorStateManager extends AbstractStateManager {
final StateStore stateStore =
store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store;
final RecordConverter recordConverter =
stateStore instanceof RecordConverter ? (RecordConverter) stateStore : new DefaultRecordConverter();
stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record;
if (isStandby) {
log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic);

2
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java

@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals; @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.RecordConverter;
import java.util.ArrayList;
import java.util.Collection;

18
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultRecordConverter.java → streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java

@ -14,16 +14,18 @@ @@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
package org.apache.kafka.streams.state;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.state.RecordConverter;
import java.nio.ByteBuffer;
public class DefaultRecordConverter implements RecordConverter {
import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return record;
public interface TimestampedBytesStore {
static byte[] convertToTimestampedFormat(final byte[] plainValue) {
return ByteBuffer
.allocate(8 + plainValue.length)
.putLong(NO_TIMESTAMP)
.put(plainValue)
.array();
}
}

42
streams/src/main/java/org/apache/kafka/streams/state/RecordConverter.java → streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java

@ -14,22 +14,38 @@ @@ -14,22 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
/**
* {@code RecordConverter} translates a {@link ConsumerRecord} into a {@link KeyValue} pair.
*/
public interface RecordConverter {
import java.nio.ByteBuffer;
/**
* Convert a given record into a key-value pair.
*
* @param record the consumer record
* @return the record as key-value pair
*/
public interface RecordConverter {
ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record);
}
@SuppressWarnings("deprecation")
static RecordConverter converter() {
return record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
return new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
timestamp,
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
ByteBuffer
.allocate(8 + rawValue.length)
.putLong(timestamp)
.put(rawValue)
.array(),
record.headers(),
record.leaderEpoch()
);
};
}
}

9
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java

@ -16,11 +16,9 @@ @@ -16,11 +16,9 @@
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.RecordConverter;
/**
* A storage engine wrapper for utilities like logging, caching, and metering.
@ -40,7 +38,7 @@ public interface WrappedStateStore extends StateStore { @@ -40,7 +38,7 @@ public interface WrappedStateStore extends StateStore {
*/
StateStore wrappedStore();
abstract class AbstractStateStore implements WrappedStateStore, RecordConverter {
abstract class AbstractStateStore implements WrappedStateStore {
final StateStore innerState;
protected AbstractStateStore(final StateStore inner) {
@ -97,10 +95,5 @@ public interface WrappedStateStore extends StateStore { @@ -97,10 +95,5 @@ public interface WrappedStateStore extends StateStore {
return innerState;
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return ((RecordConverter) innerState).convert(record);
}
}
}

29
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java

@ -34,8 +34,8 @@ import org.apache.kafka.streams.errors.StreamsException; @@ -34,8 +34,8 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockStateRestoreListener;
@ -224,7 +224,7 @@ public class GlobalStateManagerImplTest { @@ -224,7 +224,7 @@ public class GlobalStateManagerImplTest {
}
@Test
public void shouldUseDefaultRecordConverterIfStoreDoesNotImplementRecordConverter() {
public void shouldNotConvertValuesIfStoreDoesNotImplementTimestampedBytesStore() {
initializeConsumer(1, 0, t1);
stateManager.initialize();
@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest { @@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest {
}
@Test
public void shouldUseDefaultRecordConverterIfInnerStoreDoesNotImplementRecordConverter() {
public void shouldNotConvertValuesIfInnerStoreDoesNotImplementTimestampedBytesStore() {
initializeConsumer(1, 0, t1);
stateManager.initialize();
@ -288,20 +288,19 @@ public class GlobalStateManagerImplTest { @@ -288,20 +288,19 @@ public class GlobalStateManagerImplTest {
}
@Test
public void shouldUseStoreAsRecordConverterIfStoreImplementsRecordConverter() {
public void shouldConvertValuesIfStoreImplementsTimestampedBytesStore() {
initializeConsumer(1, 0, t2);
stateManager.initialize();
stateManager.register(store2, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(0, restoredRecord.key.length);
assertEquals(0, restoredRecord.value.length);
assertEquals(3, restoredRecord.key.length);
assertEquals(13, restoredRecord.value.length);
}
@Test
public void shouldUseStoreAsRecordConverterIfInnerStoreImplementsRecordConverter() {
public void shouldConvertValuesIfInnerStoreImplementsTimestampedBytesStore() {
initializeConsumer(1, 0, t2);
stateManager.initialize();
@ -348,8 +347,8 @@ public class GlobalStateManagerImplTest { @@ -348,8 +347,8 @@ public class GlobalStateManagerImplTest {
}, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0);
assertEquals(0, restoredRecord.key.length);
assertEquals(0, restoredRecord.value.length);
assertEquals(3, restoredRecord.key.length);
assertEquals(13, restoredRecord.value.length);
}
@Test
@ -827,19 +826,11 @@ public class GlobalStateManagerImplTest { @@ -827,19 +826,11 @@ public class GlobalStateManagerImplTest {
}
}
private class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements RecordConverter {
private class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements TimestampedBytesStore {
ConverterStore(final String name,
final boolean rocksdbStore) {
super(name, rocksdbStore);
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return new ConsumerRecord<>("", 0, 0L, "".getBytes(), "".getBytes());
}
}
}

19
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java

@ -29,8 +29,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -29,8 +29,8 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.RecordConverter;
import org.apache.kafka.test.MockBatchingStateRestoreListener;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.NoOpProcessorContext;
@ -153,15 +153,16 @@ public class ProcessorStateManagerTest { @@ -153,15 +153,16 @@ public class ProcessorStateManagerTest {
);
assertThat(persistentStore.keys.size(), is(1));
assertTrue(persistentStore.keys.contains(intKey));
assertEquals(9, persistentStore.values.get(0).length);
} finally {
stateMgr.close(Collections.emptyMap());
}
}
@Test
public void shouldConvertDataOnRestoreIfStoreImplementsRecordConverter() throws Exception {
public void shouldConvertDataOnRestoreIfStoreImplementsTimestampedBytesStore() throws Exception {
final TaskId taskId = new TaskId(0, 2);
final Integer intKey = 2;
final Integer intKey = 1;
final MockKeyValueStore persistentStore = getConverterStore();
final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
@ -175,6 +176,7 @@ public class ProcessorStateManagerTest { @@ -175,6 +176,7 @@ public class ProcessorStateManagerTest {
);
assertThat(persistentStore.keys.size(), is(1));
assertTrue(persistentStore.keys.contains(intKey));
assertEquals(17, persistentStore.values.get(0).length);
} finally {
stateMgr.close(Collections.emptyMap());
}
@ -797,19 +799,10 @@ public class ProcessorStateManagerTest { @@ -797,19 +799,10 @@ public class ProcessorStateManagerTest {
return new ConverterStore("persistentStore", true);
}
private class ConverterStore extends MockKeyValueStore implements RecordConverter {
private class ConverterStore extends MockKeyValueStore implements TimestampedBytesStore {
ConverterStore(final String name,
final boolean persistent) {
super(name, persistent);
}
@Override
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return new ConsumerRecord<>("", 0, 0L, new byte[]{0x0, 0x0, 0x0, 0x2}, "".getBytes());
}
}
}

4
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java

@ -42,7 +42,7 @@ public class StateRestorerTest { @@ -42,7 +42,7 @@ public class StateRestorerTest {
OFFSET_LIMIT,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
@Before
public void setUp() {
@ -79,7 +79,7 @@ public class StateRestorerTest { @@ -79,7 +79,7 @@ public class StateRestorerTest {
0,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
assertTrue(restorer.hasCompleted(0, 10));
}

60
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java

@ -107,7 +107,7 @@ public class StoreChangelogReaderTest { @@ -107,7 +107,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
changelogReader.restore(active);
assertTrue(functionCalled.get());
}
@ -145,7 +145,7 @@ public class StoreChangelogReaderTest { @@ -145,7 +145,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -170,7 +170,7 @@ public class StoreChangelogReaderTest { @@ -170,7 +170,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
EasyMock.replay(active, task);
@ -185,7 +185,7 @@ public class StoreChangelogReaderTest { @@ -185,7 +185,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
// retry restore should succeed
assertEquals(1, changelogReader.restore(active).size());
assertThat(callback.restored.size(), equalTo(messages));
@ -210,7 +210,7 @@ public class StoreChangelogReaderTest { @@ -210,7 +210,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
changelogReader.register(stateRestorer);
EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@ -240,7 +240,7 @@ public class StoreChangelogReaderTest { @@ -240,7 +240,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
changelogReader.restore(active);
assertThat(callback.restored.size(), equalTo(5));
@ -257,7 +257,7 @@ public class StoreChangelogReaderTest { @@ -257,7 +257,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -274,7 +274,7 @@ public class StoreChangelogReaderTest { @@ -274,7 +274,7 @@ public class StoreChangelogReaderTest {
3,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
changelogReader.register(restorer);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@ -302,7 +302,7 @@ public class StoreChangelogReaderTest { @@ -302,7 +302,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName1",
new DefaultRecordConverter()));
record -> record));
changelogReader.register(new StateRestorer(
one,
restoreListener1,
@ -310,7 +310,7 @@ public class StoreChangelogReaderTest { @@ -310,7 +310,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName2",
new DefaultRecordConverter()));
record -> record));
changelogReader.register(new StateRestorer(
two,
restoreListener2,
@ -318,7 +318,7 @@ public class StoreChangelogReaderTest { @@ -318,7 +318,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName3",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(one)).andStubReturn(task);
expect(active.restoringTaskFor(two)).andStubReturn(task);
@ -350,7 +350,7 @@ public class StoreChangelogReaderTest { @@ -350,7 +350,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName1",
new DefaultRecordConverter()));
record -> record));
changelogReader.register(new StateRestorer(
one,
restoreListener1,
@ -358,7 +358,7 @@ public class StoreChangelogReaderTest { @@ -358,7 +358,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName2",
new DefaultRecordConverter()));
record -> record));
changelogReader.register(new StateRestorer(
two,
restoreListener2,
@ -366,7 +366,7 @@ public class StoreChangelogReaderTest { @@ -366,7 +366,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName3",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(one)).andReturn(task);
expect(active.restoringTaskFor(two)).andReturn(task);
@ -401,7 +401,7 @@ public class StoreChangelogReaderTest { @@ -401,7 +401,7 @@ public class StoreChangelogReaderTest {
5,
true,
"storeName1",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -437,7 +437,7 @@ public class StoreChangelogReaderTest { @@ -437,7 +437,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
setupConsumer(0, topicPartition);
changelogReader.register(restorer);
@ -457,7 +457,7 @@ public class StoreChangelogReaderTest { @@ -457,7 +457,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter());
record -> record);
changelogReader.register(restorer);
@ -476,7 +476,7 @@ public class StoreChangelogReaderTest { @@ -476,7 +476,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -495,7 +495,7 @@ public class StoreChangelogReaderTest { @@ -495,7 +495,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
false,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -518,7 +518,7 @@ public class StoreChangelogReaderTest { @@ -518,7 +518,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
false,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
changelogReader.restore(active);
@ -537,7 +537,7 @@ public class StoreChangelogReaderTest { @@ -537,7 +537,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"store",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
replay(active, task);
@ -559,7 +559,7 @@ public class StoreChangelogReaderTest { @@ -559,7 +559,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
false,
"storeName",
new DefaultRecordConverter()));
record -> record));
final TopicPartition postInitialization = new TopicPartition("other", 0);
expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@ -581,7 +581,7 @@ public class StoreChangelogReaderTest { @@ -581,7 +581,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
false,
"otherStore",
new DefaultRecordConverter()));
record -> record));
final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
consumer.assign(expected);
@ -605,7 +605,7 @@ public class StoreChangelogReaderTest { @@ -605,7 +605,7 @@ public class StoreChangelogReaderTest {
9L,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -627,7 +627,7 @@ public class StoreChangelogReaderTest { @@ -627,7 +627,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -648,7 +648,7 @@ public class StoreChangelogReaderTest { @@ -648,7 +648,7 @@ public class StoreChangelogReaderTest {
Long.MAX_VALUE,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -668,7 +668,7 @@ public class StoreChangelogReaderTest { @@ -668,7 +668,7 @@ public class StoreChangelogReaderTest {
5,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -689,7 +689,7 @@ public class StoreChangelogReaderTest { @@ -689,7 +689,7 @@ public class StoreChangelogReaderTest {
10,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -717,7 +717,7 @@ public class StoreChangelogReaderTest { @@ -717,7 +717,7 @@ public class StoreChangelogReaderTest {
6,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);
@ -740,7 +740,7 @@ public class StoreChangelogReaderTest { @@ -740,7 +740,7 @@ public class StoreChangelogReaderTest {
11,
true,
"storeName",
new DefaultRecordConverter()));
record -> record));
expect(active.restoringTaskFor(topicPartition)).andReturn(task);
replay(active);

2
streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java

@ -35,6 +35,7 @@ public class MockKeyValueStore implements KeyValueStore { @@ -35,6 +35,7 @@ public class MockKeyValueStore implements KeyValueStore {
public boolean flushed = false;
public boolean closed = true;
public final ArrayList<Integer> keys = new ArrayList<>();
public final ArrayList<byte[]> values = new ArrayList<>();
public MockKeyValueStore(final String name,
final boolean persistent) {
@ -82,6 +83,7 @@ public class MockKeyValueStore implements KeyValueStore { @@ -82,6 +83,7 @@ public class MockKeyValueStore implements KeyValueStore {
public void restore(final byte[] key,
final byte[] value) {
keys.add(deserializer.deserialize("", key));
values.add(value);
}
};

Loading…
Cancel
Save