MINOR: cleanup some state store code (#5656)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Branch: pull/5662/head
Author: Matthias J. Sax, 6 years ago, committed by GitHub
Commit: 88823c6016
Changed files (number of changed lines in parentheses):
  1. streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java (33)
  2. streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java (21)
  3. streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java (5)
  4. streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java (4)
  5. streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java (4)
  6. streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java (14)
  7. streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java (1)
  8. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (79)
  9. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (48)

streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java (33 changed lines)

@@ -30,42 +30,41 @@ import java.util.List;
public interface KeyValueStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> {
/**
* Update the value associated with this key
* Update the value associated with this key.
*
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
*/
void put(K key, V value);
/**
* Update the value associated with this key, unless a value
* is already associated with the key
* Update the value associated with this key, unless a value is already associated with the key.
*
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
* @return The old value or null if there is no such key.
* @throws NullPointerException If null is used for key.
* @param value The value to update, it can be {@code null};
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
*/
V putIfAbsent(K key, V value);
/**
* Update all the given key/value pairs
* Update all the given key/value pairs.
*
* @param entries A list of entries to put into the store;
* if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
* if the serialized bytes are also {@code null} it is interpreted as deletes
* @throws NullPointerException If {@code null} is used for key.
*/
void putAll(List<KeyValue<K, V>> entries);
/**
* Delete the value from the store (if there is one)
* Delete the value from the store (if there is one).
*
* @param key The key
* @return The old value or null if there is no such key.
* @throws NullPointerException If null is used for key.
* @return The old value or {@code null} if there is no such key.
* @throws NullPointerException If {@code null} is used for key.
*/
V delete(K key);
}
}
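To make the documented contract concrete, a minimal usage sketch follows (the store instance and its String/Long typing are assumptions for illustration, not part of this change): a null value passed to put() acts as a delete, putIfAbsent() leaves an existing mapping untouched, and delete() returns the previous value or null.

    import org.apache.kafka.streams.state.KeyValueStore;

    final class KeyValueStoreContractSketch {
        // exercises the contract described by the Javadoc above
        static void exercise(final KeyValueStore<String, Long> store) {
            store.put("alice", 1L);                                // plain insert
            final Long previous = store.putIfAbsent("alice", 99L); // no-op, returns the old value 1L
            store.put("alice", null);                              // null value is interpreted as a delete
            final Long afterDelete = store.delete("alice");        // null, the key was already removed
        }
    }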

streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java (21 changed lines)

@@ -16,33 +16,34 @@
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
* Used to enable querying of custom {@link StateStore} types via the
* {@link org.apache.kafka.streams.KafkaStreams}
* API.
* @see QueryableStoreTypes
* Used to enable querying of custom {@link StateStore} types via the {@link KafkaStreams} API.
*
* @param <T> The store type
* @param <T> The store type
* @see QueryableStoreTypes
*/
public interface QueryableStoreType<T> {
/**
* Called when searching for {@link StateStore}s to see if they
* match the type expected by implementors of this interface
* match the type expected by implementors of this interface.
*
* @param stateStore The stateStore
* @return true if it is a match
*/
boolean accepts(final StateStore stateStore);
/**
* Create an instance of T (usually a facade) that developers can use
* to query the underlying {@link StateStore}s
* Create an instance of {@code T} (usually a facade) that developers can use
* to query the underlying {@link StateStore}s.
*
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
* @return T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType}
* @return a read-only interface over a {@code StateStore} (cf. {@link QueryableStoreTypes.KeyValueStoreType})
*/
T create(final StateStoreProvider storeProvider, final String storeName);
}
}
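For reference, a custom QueryableStoreType could look roughly like the sketch below (hypothetical class; a production implementation would normally wrap all instances returned by the provider in a composite facade instead of picking the first one):

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.state.QueryableStoreType;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.internals.StateStoreProvider;

    public class MyKeyValueStoreType<K, V> implements QueryableStoreType<ReadOnlyKeyValueStore<K, V>> {

        @Override
        public boolean accepts(final StateStore stateStore) {
            // match any store that exposes the read-only key-value interface
            return stateStore instanceof ReadOnlyKeyValueStore;
        }

        @Override
        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
            // naive: return the first matching local instance (assumes at least one exists)
            return storeProvider.stores(storeName, this).get(0);
        }
    }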

streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java (5 changed lines)

@@ -19,9 +19,8 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
/**
* A key value store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes
* are expected.
* A key-value store that only supports read operations.
* Implementations should be thread-safe as concurrent reads and writes are expected.
*
* Please note that this contract defines the thread-safe read functionality only; it does not
* guarantee anything about whether the actual instance is writable by another thread, or

streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java (4 changed lines)

@@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Windowed;
/**
* A window store that only supports read operations
* Implementations should be thread-safe as concurrent reads and writes
* are expected.
* Implementations should be thread-safe as concurrent reads and writes are expected.
*
* @param <K> Type of keys
* @param <V> Type of values
*/
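For orientation, querying a ReadOnlyWindowStore through the interactive-query API looks roughly like this (store name, key, and the one-hour range are illustrative assumptions):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class WindowStoreQuerySketch {

        public static void printLastHour(final KafkaStreams streams) {
            final ReadOnlyWindowStore<String, Long> store =
                streams.store("windowed-counts", QueryableStoreTypes.<String, Long>windowStore());
            final long now = System.currentTimeMillis();
            // fetch(key, timeFrom, timeTo) yields KeyValue<windowStartTimestamp, value> pairs
            try (final WindowStoreIterator<Long> iter = store.fetch("alice", now - 3_600_000L, now)) {
                while (iter.hasNext()) {
                    System.out.println(iter.next());
                }
            }
        }
    }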

streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java (4 changed lines)

@@ -21,8 +21,8 @@ import org.rocksdb.Options;
import java.util.Map;
/**
* An interface to that allows developers to customize the RocksDB settings
* for a given Store. Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
* An interface to that allows developers to customize the RocksDB settings for a given Store.
* Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
*/
public interface RocksDBConfigSetter {
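For context, a minimal config setter might look like the sketch below (the class name and the 100 MB block-cache value are illustrative; the RocksDB Tuning Guide referenced above should inform real values). It would be registered via the rocksdb.config.setter property in StreamsConfig.

    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    import java.util.Map;

    public class CustomRocksDBConfig implements RocksDBConfigSetter {

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            // example only: enlarge the block cache for every store
            final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(100 * 1024 * 1024L);
            options.setTableFormatConfig(tableConfig);
        }
    }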

streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java (14 changed lines)

@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.utils.Bytes;
/**
* A store supplier that can be used to create one or more {@link SessionStore SessionStore<Bytes, byte[]>} instances of type &lt;Byte, byte[]&gt;.
* A store supplier that can be used to create one or more {@link SessionStore SessionStore&lt;Byte, byte[]&gt;} instances.
*
* For any stores implementing the {@link SessionStore SessionStore<Bytes, byte[]>} interface, null value bytes are considered as "not exist". This means:
*
* 1. Null value bytes in put operations should be treated as delete.
* 2. Null value bytes should never be returned in range query results.
* For any stores implementing the {@link SessionStore SessionStore&lt;Byte, byte[]&gt;} interface, {@code null} value
* bytes are considered as "not exist". This means:
* <ol>
* <li>{@code null} value bytes in put operations should be treated as delete.</li>
* <li>{@code null} value bytes should never be returned in range query results.</li>
* </ol>
*/
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
@@ -42,4 +44,4 @@ public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<By
* @return retentionPeriod
*/
long retentionPeriod();
}
}
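As a usage note, suppliers of this type are normally obtained from the Stores factory rather than implemented by hand; a rough wiring sketch (store name, retention period, and serdes are assumptions) is:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
    import org.apache.kafka.streams.state.SessionStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class SessionStoreWiringSketch {

        public static StoreBuilder<SessionStore<String, Long>> sessionCounts() {
            // persistent session store with one hour of retention (illustrative values)
            final SessionBytesStoreSupplier supplier =
                Stores.persistentSessionStore("session-counts", 60 * 60 * 1000L);
            return Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long());
        }
    }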

streams/src/main/java/org/apache/kafka/streams/state/StoreBuilder.java (1 changed line)

@@ -61,7 +61,6 @@ public interface StoreBuilder<T extends StateStore> {
*/
T build();
/**
* Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}.
* <p>

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (79 changed lines)

@@ -44,7 +44,6 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collection;
@@ -69,14 +68,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
private static final int TTL_NOT_USED = -1;
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final int TTL_SECONDS = TTL_NOT_USED;
private static final int MAX_WRITE_BUFFERS = 3;
private static final String DB_FILE_DIR = "rocksdb";
@@ -152,10 +148,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
options.prepareForBulkLoad();
}
this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
try {
Files.createDirectories(dbDir.getParentFile().toPath());
db = RocksDB.open(options, dbDir.getAbsolutePath());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
} catch (final IOException e) {
throw new ProcessorStateException(e);
}
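Taken out of context, the new open path reduces to the standalone sketch below (exception types are simplified to keep the snippet self-contained; the real code wraps failures in ProcessorStateException): create the parent directory first, then open the database and surface RocksDB errors.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    public class RocksDbOpenSketch {

        public static RocksDB open(final File dbDir, final Options options) {
            try {
                // make sure the state directory exists before asking RocksDB to open it
                Files.createDirectories(dbDir.getParentFile().toPath());
                return RocksDB.open(options, dbDir.getAbsolutePath());
            } catch (final RocksDBException e) {
                throw new RuntimeException("Error opening store at location " + dbDir, e);
            } catch (final IOException e) {
                throw new RuntimeException(e);
            }
        }
    }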
@@ -166,30 +167,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
public void init(final ProcessorContext context,
final StateStore root) {
// open the DB dir
this.internalProcessorContext = context;
internalProcessorContext = context;
openDB(context);
this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this);
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
context.register(root, this.batchingStateRestoreCallback);
}
private RocksDB openDB(final File dir,
final Options options,
final int ttl) throws IOException {
try {
if (ttl == TTL_NOT_USED) {
Files.createDirectories(dir.getParentFile().toPath());
return RocksDB.open(options, dir.getAbsolutePath());
} else {
throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
context.register(root, batchingStateRestoreCallback);
}
// visible for testing
@@ -199,7 +183,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@Override
public String name() {
return this.name;
return name;
}
@Override
@@ -220,16 +204,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private void validateStoreOpen() {
if (!open) {
throw new InvalidStateStoreException("Store " + this.name + " is currently closed");
throw new InvalidStateStoreException("Store " + name + " is currently closed");
}
}
private byte[] getInternal(final byte[] rawKey) {
try {
return this.db.get(rawKey);
return db.get(rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e);
throw new ProcessorStateException("Error while getting value for key %s from store " + name, e);
}
}
@@ -238,18 +222,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
final String[] sstFileNames = dbDir.list(new FilenameFilter() {
@Override
public boolean accept(final File dir, final String name) {
return SST_FILE_EXTENSION.matcher(name).matches();
}
});
final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
if (sstFileNames != null && sstFileNames.length > 0) {
try {
this.db.compactRange(true, 1, 0);
db.compactRange(true, 1, 0);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e);
throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
}
}
@@ -279,7 +258,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
return originalValue;
}
void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
try (final WriteBatch batch = new WriteBatch()) {
for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
@@ -290,7 +269,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
write(batch);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
throw new ProcessorStateException("Error restoring batch to store " + name, e);
}
}
@@ -301,14 +280,14 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
db.delete(wOptions, rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key %s from store " + this.name, e);
throw new ProcessorStateException("Error while removing key %s from store " + name, e);
}
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e);
throw new ProcessorStateException("Error while putting key %s value %s into store " + name, e);
}
}
}
@@ -330,7 +309,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
write(batch);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
throw new ProcessorStateException("Error while batch writing to store " + name, e);
}
}
@@ -384,9 +363,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
validateStoreOpen();
final long value;
try {
value = this.db.getLongProperty("rocksdb.estimate-num-keys");
value = db.getLongProperty("rocksdb.estimate-num-keys");
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + this.name, e);
throw new ProcessorStateException("Error fetching property from store " + name, e);
}
if (isOverflowing(value)) {
return Long.MAX_VALUE;
@@ -415,7 +394,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
try {
db.flush(fOptions);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
throw new ProcessorStateException("Error while executing flush from store " + name, e);
}
}
@@ -480,7 +459,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (!iter.isValid()) {
return allDone();
} else {
next = this.getKeyValue();
next = getKeyValue();
iter.next();
return next;
}
@@ -524,8 +503,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
final Bytes to) {
super(storeName, iter);
iter.seek(from.get());
this.rawToKey = to.get();
if (this.rawToKey == null) {
rawToKey = to.get();
if (rawToKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
}
}
@@ -537,7 +516,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (next == null) {
return allDone();
} else {
if (comparator.compare(next.key.get(), this.rawToKey) <= 0)
if (comparator.compare(next.key.get(), rawToKey) <= 0)
return next;
else
return allDone();

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (48 changed lines)

@@ -39,7 +39,6 @@ import org.rocksdb.Options;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -47,6 +46,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -55,8 +55,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RocksDBStoreTest {
private Serializer<String> stringSerializer = new StringSerializer();
private Deserializer<String> stringDeserializer = new StringDeserializer();
private final Serializer<String> stringSerializer = new StringSerializer();
private final Deserializer<String> stringDeserializer = new StringDeserializer();
private RocksDBStore rocksDBStore;
private InternalMockProcessorContext context;
private File dir;
@@ -94,7 +94,7 @@ public class RocksDBStoreTest {
}
@Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception {
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
rocksDBStore.init(context, rocksDBStore);
final String message = "how can a 4 ounce bird carry a 2lb coconut";
@@ -106,8 +106,8 @@ public class RocksDBStoreTest {
final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
final byte[] restoredKey = "restoredKey".getBytes("UTF-8");
final byte[] restoredValue = "restoredValue".getBytes("UTF-8");
final byte[] restoredKey = "restoredKey".getBytes(UTF_8);
final byte[] restoredValue = "restoredValue".getBytes(UTF_8);
restoreBytes.add(KeyValue.pair(restoredKey, restoredValue));
context.restore("test", restoreBytes);
@@ -191,7 +191,7 @@ public class RocksDBStoreTest {
}
@Test
public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception {
public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -208,7 +208,7 @@ public class RocksDBStoreTest {
}
@Test
public void shouldRestoreAll() throws Exception {
public void shouldRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -246,9 +246,9 @@ public class RocksDBStoreTest {
}
@Test
public void shouldHandleDeletesOnRestoreAll() throws Exception {
public void shouldHandleDeletesOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
rocksDBStore.init(context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -264,15 +264,15 @@ public class RocksDBStoreTest {
}
@Test
public void shouldHandleDeletesAndPutbackOnRestoreAll() throws Exception {
public void shouldHandleDeletesAndPutbackOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8)));
entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
// this will be deleted
entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
// this will restore key "1" as WriteBatch applies updates in order
entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8")));
entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
rocksDBStore.init(context, rocksDBStore);
context.restore(rocksDBStore.name(), entries);
@@ -304,7 +304,7 @@ public class RocksDBStoreTest {
}
@Test
public void shouldRestoreThenDeleteOnRestoreAll() throws Exception {
public void shouldRestoreThenDeleteOnRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
rocksDBStore.init(context, rocksDBStore);
@@ -329,9 +329,9 @@ public class RocksDBStoreTest {
entries.clear();
entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
entries.add(new KeyValue<>("1".getBytes("UTF-8"), null));
entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
context.restore(rocksDBStore.name(), entries);
@@ -423,11 +423,11 @@ public class RocksDBStoreTest {
}
}
private List<KeyValue<byte[], byte[]>> getKeyValueEntries() throws UnsupportedEncodingException {
private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8")));
entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8")));
entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8")));
entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8)));
entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8)));
entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
return entries;
}
}
