KAFKA-3522: Add RocksDBTimestampedStore (#6149)

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Authored by Matthias J. Sax 6 years ago; committed by GitHub
Commit 73565b7f67
12 changed files (lines changed in parentheses):

  1. streams/src/main/java/org/apache/kafka/streams/state/Stores.java (2)
  2. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java (1362)
  3. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java (62)
  4. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (420)
  5. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java (394)
  6. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java (89)
  7. streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java (7)
  8. streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java (36)
  9. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java (263)
  10. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java (2)
  11. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (83)
  12. streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java (293)

streams/src/main/java/org/apache/kafka/streams/state/Stores.java (2 lines changed)

@@ -84,7 +84,7 @@ public class Stores {
  */
 public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
     Objects.requireNonNull(name, "name cannot be null");
-    return new RocksDbKeyValueBytesStoreSupplier(name);
+    return new RocksDbKeyValueBytesStoreSupplier(name, true);
 }

 /**
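The change above is confined to the factory: callers of Stores.persistentKeyValueStore() see the same public API, but the supplier they receive is now flagged to build a timestamp-capable store. A minimal caller-side sketch (the store name "counts" is an example):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PersistentStoreExample {
    public static void main(final String[] args) {
        // Unchanged call site; the boolean flag is passed internally by the factory.
        final KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts");
        // With the flag set, get() now builds a RocksDBTimestampedStore under the hood.
        final KeyValueStore<Bytes, byte[]> store = supplier.get();
    }
}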

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java (1362 lines changed)

File diff suppressed because it is too large.
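Since the adapter's diff is suppressed, here is a minimal sketch of the pattern it implements; this is an illustration under assumptions, not the actual 1362-line file. The class extends Options and forwards each call to the wrapped DBOptions or ColumnFamilyOptions, so user RocksDBConfigSetter implementations that receive an Options object keep working while the store internally opens column families. Two representative forwards:

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;

class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {
    private final DBOptions dbOptions;
    private final ColumnFamilyOptions columnFamilyOptions;

    RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(final DBOptions dbOptions,
                                                               final ColumnFamilyOptions columnFamilyOptions) {
        this.dbOptions = dbOptions;
        this.columnFamilyOptions = columnFamilyOptions;
    }

    @Override
    public Options setCreateIfMissing(final boolean flag) {
        dbOptions.setCreateIfMissing(flag);  // DB-level option: forward to DBOptions
        return this;
    }

    @Override
    public Options setWriteBufferSize(final long writeBufferSize) {
        columnFamilyOptions.setWriteBufferSize(writeBufferSize);  // column-family-level option
        return this;
    }
}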

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBRangeIterator.java (62 lines changed)

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;
import java.util.Comparator;
import java.util.Set;
class RocksDBRangeIterator extends RocksDbIterator {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawToKey;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
final Bytes from,
final Bytes to) {
super(storeName, iter, openIterators);
iter.seek(from.get());
rawToKey = to.get();
if (rawToKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
}
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
final KeyValue<Bytes, byte[]> next = super.makeNext();
if (next == null) {
return allDone();
} else {
if (comparator.compare(next.key.get(), rawToKey) <= 0) {
return next;
} else {
return allDone();
}
}
}
}
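The bound check in makeNext() makes the upper end of the range inclusive, relying on RocksDB's default lexicographic (unsigned byte-wise) key order. A tiny sketch of that comparison in isolation (byte values are examples):

import org.apache.kafka.common.utils.Bytes;

public class RangeBoundExample {
    public static void main(final String[] args) {
        final byte[] key = {0x01, 0x02};
        final byte[] to = {0x01, 0x02};
        // compare(key, to) <= 0 means the key is still inside the range, so a key
        // equal to the upper bound is returned before the iterator reports allDone().
        System.out.println(Bytes.BYTES_LEXICO_COMPARATOR.compare(key, to) <= 0);  // true
    }
}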

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java (420 lines changed)

@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -33,8 +32,12 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
@@ -49,26 +52,20 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
/**
* A persistent key-value store based on RocksDB.
*
* Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
* If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
* i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
*/
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
@@ -81,17 +78,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
private static final int MAX_WRITE_BUFFERS = 3;
private static final String DB_FILE_DIR = "rocksdb";
private final String name;
final String name;
private final String parentDir;
private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
File dbDir;
private RocksDB db;
RocksDB db;
RocksDBAccessor dbAccessor;
// the following option objects will be created in the constructor and closed in the close() method
private Options options;
private WriteOptions wOptions;
private FlushOptions fOptions;
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
WriteOptions wOptions;
FlushOptions fOptions;
private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
@@ -112,25 +110,27 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@SuppressWarnings("unchecked")
public void openDB(final ProcessorContext context) {
void openDB(final ProcessorContext context) {
// initialize the default rocksdb options
options = new Options();
final DBOptions dbOptions = new DBOptions();
final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE);
tableConfig.setFilter(new BloomFilter());
options.optimizeFiltersForHits();
options.setTableFormatConfig(tableConfig);
options.setWriteBufferSize(WRITE_BUFFER_SIZE);
options.setCompressionType(COMPRESSION_TYPE);
options.setCompactionStyle(COMPACTION_STYLE);
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
options.setCreateIfMissing(true);
options.setErrorIfExists(false);
options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
userSpecifiedOptions.optimizeFiltersForHits();
userSpecifiedOptions.setTableFormatConfig(tableConfig);
userSpecifiedOptions.setWriteBufferSize(WRITE_BUFFER_SIZE);
userSpecifiedOptions.setCompressionType(COMPRESSION_TYPE);
userSpecifiedOptions.setCompactionStyle(COMPACTION_STYLE);
userSpecifiedOptions.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
userSpecifiedOptions.setCreateIfMissing(true);
userSpecifiedOptions.setErrorIfExists(false);
userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
// this is the recommended way to increase parallelism in RocksDb
// note that the current implementation of setIncreaseParallelism affects the number
// of compaction threads but not flush threads (the latter remains one). Also
@@ -138,7 +138,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
// https://github.com/facebook/rocksdb/blob/62ad0a9b19f0be4cefa70b6b32876e764b7f3c11/util/options.cc#L580
// subtracts one from the value passed to determine the number of compaction threads
// (this could be a bug in the RocksDB code and their devs have been contacted).
options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
userSpecifiedOptions.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));
wOptions = new WriteOptions();
wOptions.setDisableWAL(true);
@@ -148,34 +148,44 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
final Map<String, Object> configs = context.appConfigs();
final Class<RocksDBConfigSetter> configSetterClass =
(Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
(Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
if (configSetterClass != null) {
final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
configSetter.setConfig(name, options, configs);
configSetter.setConfig(name, userSpecifiedOptions, configs);
}
if (prepareForBulkload) {
options.prepareForBulkLoad();
userSpecifiedOptions.prepareForBulkLoad();
}
dbDir = new File(new File(context.stateDir(), parentDir), name);
try {
try {
Files.createDirectories(dbDir.getParentFile().toPath());
Files.createDirectories(dbDir.getAbsoluteFile().toPath());
db = RocksDB.open(options, dbDir.getAbsolutePath());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
} catch (final IOException e) {
throw new ProcessorStateException(e);
Files.createDirectories(dbDir.getParentFile().toPath());
Files.createDirectories(dbDir.getAbsoluteFile().toPath());
} catch (final IOException fatal) {
throw new ProcessorStateException(fatal);
}
openRocksDB(dbOptions, columnFamilyOptions);
open = true;
}
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors
= Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
try {
db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
}
public void init(final ProcessorContext context,
final StateStore root) {
// open the DB dir
@@ -208,54 +218,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
return open;
}
@Override
public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
return getInternal(key.get());
}
private void validateStoreOpen() {
if (!open) {
throw new InvalidStateStoreException("Store " + name + " is currently closed");
}
}
private byte[] getInternal(final byte[] rawKey) {
try {
return db.get(rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key %s from store " + name, e);
}
}
void toggleDbForBulkLoading(final boolean prepareForBulkload) {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
if (sstFileNames != null && sstFileNames.length > 0) {
try {
db.compactRange(true, 1, 0);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
}
}
close();
this.prepareForBulkload = prepareForBulkload;
openDB(internalProcessorContext);
}
@SuppressWarnings("unchecked")
@Override
public synchronized void put(final Bytes key,
final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
putInternal(key.get(), value);
dbAccessor.put(key.get(), value);
}
@Override
@@ -269,68 +244,39 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
return originalValue;
}
private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
try (final WriteBatch batch = new WriteBatch()) {
for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
batch.delete(record.key);
} else {
batch.put(record.key, record.value);
}
}
dbAccessor.prepareBatch(entries, batch);
write(batch);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error restoring batch to store " + name, e);
}
}
private void putInternal(final byte[] rawKey,
final byte[] rawValue) {
if (rawValue == null) {
try {
db.delete(wOptions, rawKey);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key %s from store " + name, e);
}
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key %s value %s into store " + name, e);
}
throw new ProcessorStateException("Error while batch writing to store " + name, e);
}
}
void write(final WriteBatch batch) throws RocksDBException {
db.write(wOptions, batch);
}
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
try (final WriteBatch batch = new WriteBatch()) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
Objects.requireNonNull(entry.key, "key cannot be null");
if (entry.value == null) {
batch.delete(entry.key.get());
} else {
batch.put(entry.key.get(), entry.value);
}
}
write(batch);
public synchronized byte[] get(final Bytes key) {
validateStoreOpen();
try {
return dbAccessor.get(key.get());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + name, e);
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
}
}
@Override
public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key, "key cannot be null");
final byte[] value = get(key);
final byte[] oldValue;
try {
oldValue = dbAccessor.getOnly(key.get());
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
}
put(key, null);
return value;
return oldValue;
}
@Override
@@ -340,8 +286,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
Objects.requireNonNull(to, "to cannot be null");
validateStoreOpen();
// query rocksdb
final RocksDBRangeIterator rocksDBRangeIterator = new RocksDBRangeIterator(name, db.newIterator(), from, to);
final KeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to);
openIterators.add(rocksDBRangeIterator);
return rocksDBRangeIterator;
@@ -350,10 +295,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@Override
public synchronized KeyValueIterator<Bytes, byte[]> all() {
validateStoreOpen();
// query rocksdb
final RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
final RocksDbIterator rocksDbIterator = new RocksDbIterator(name, innerIter);
final KeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all();
openIterators.add(rocksDbIterator);
return rocksDbIterator;
}
@@ -372,16 +314,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@Override
public long approximateNumEntries() {
validateStoreOpen();
final long value;
final long numEntries;
try {
value = db.getLongProperty("rocksdb.estimate-num-keys");
numEntries = dbAccessor.approximateNumEntries();
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + name, e);
}
if (isOverflowing(value)) {
if (isOverflowing(numEntries)) {
return Long.MAX_VALUE;
}
return value;
return numEntries;
}
private boolean isOverflowing(final long value) {
@@ -395,20 +337,32 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
if (db == null) {
return;
}
// flush RocksDB
flushInternal();
}
/**
* @throws ProcessorStateException if flushing failed because of any internal store exceptions
*/
private void flushInternal() {
try {
db.flush(fOptions);
dbAccessor.flush();
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing flush from store " + name, e);
}
}
void toggleDbForBulkLoading(final boolean prepareForBulkload) {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check for bulk loading
final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches());
if (sstFileNames != null && sstFileNames.length > 0) {
dbAccessor.toggleDbForBulkLoading();
}
}
close();
this.prepareForBulkload = prepareForBulkload;
openDB(internalProcessorContext);
}
void write(final WriteBatch batch) throws RocksDBException {
db.write(wOptions, batch);
}
@Override
public synchronized void close() {
if (!open) {
@@ -417,129 +371,170 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
open = false;
closeOpenIterators();
options.close();
dbAccessor.close();
userSpecifiedOptions.close();
wOptions.close();
fOptions.close();
db.close();
options = null;
userSpecifiedOptions = null;
wOptions = null;
fOptions = null;
db = null;
}
private void closeOpenIterators() {
final HashSet<KeyValueIterator> iterators;
final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
synchronized (openIterators) {
iterators = new HashSet<>(openIterators);
}
if (iterators.size() != 0) {
log.warn("Closing {} open iterators for store {}", iterators.size(), name);
for (final KeyValueIterator iterator : iterators) {
for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
iterator.close();
}
}
}
private class RocksDbIterator
extends AbstractIterator<KeyValue<Bytes, byte[]>>
implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;
private volatile boolean open = true;
interface RocksDBAccessor {
void put(final byte[] key,
final byte[] value);
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
final WriteBatch batch) throws RocksDBException;
byte[] get(final byte[] key) throws RocksDBException;
/**
* In contrast to get(), we don't migrate the key to the new CF.
* <p>
* Used for get() within delete() -- no need to migrate, as the key is deleted anyway
*/
byte[] getOnly(final byte[] key) throws RocksDBException;
KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to);
KeyValueIterator<Bytes, byte[]> all();
long approximateNumEntries() throws RocksDBException;
void flush() throws RocksDBException;
void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
final WriteBatch batch) throws RocksDBException;
void close();
void toggleDbForBulkLoading();
}
private KeyValue<Bytes, byte[]> next;
class SingleColumnFamilyAccessor implements RocksDBAccessor {
private final ColumnFamilyHandle columnFamily;
RocksDbIterator(final String storeName,
final RocksIterator iter) {
this.iter = iter;
this.storeName = storeName;
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
this.columnFamily = columnFamily;
}
@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
public void put(final byte[] key,
final byte[] value) {
if (value == null) {
try {
db.delete(columnFamily, wOptions, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
} else {
try {
db.put(columnFamily, wOptions, key, value);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
}
}
return super.hasNext();
}
@Override
public synchronized KeyValue<Bytes, byte[]> next() {
return super.next();
public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
final WriteBatch batch) throws RocksDBException {
for (final KeyValue<Bytes, byte[]> entry : entries) {
Objects.requireNonNull(entry.key, "key cannot be null");
if (entry.value == null) {
batch.delete(columnFamily, entry.key.get());
} else {
batch.put(columnFamily, entry.key.get(), entry.value);
}
}
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
if (!iter.isValid()) {
return allDone();
} else {
next = getKeyValue();
iter.next();
return next;
}
public byte[] get(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
}
private KeyValue<Bytes, byte[]> getKeyValue() {
return new KeyValue<>(new Bytes(iter.key()), iter.value());
@Override
public byte[] getOnly(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
}
@Override
public void remove() {
throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
openIterators,
from,
to);
}
@Override
public synchronized void close() {
openIterators.remove(this);
iter.close();
open = false;
public KeyValueIterator<Bytes, byte[]> all() {
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
innerIterWithTimestamp.seekToFirst();
return new RocksDbIterator(name, innerIterWithTimestamp, openIterators);
}
@Override
public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return next.key;
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
}
}
private class RocksDBRangeIterator extends RocksDbIterator {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] rawToKey;
RocksDBRangeIterator(final String storeName,
final RocksIterator iter,
final Bytes from,
final Bytes to) {
super(storeName, iter);
iter.seek(from.get());
rawToKey = to.get();
if (rawToKey == null) {
throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to);
}
@Override
public void flush() throws RocksDBException {
db.flush(fOptions, columnFamily);
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
final KeyValue<Bytes, byte[]> next = super.makeNext();
if (next == null) {
return allDone();
} else {
if (comparator.compare(next.key.get(), rawToKey) <= 0) {
return next;
public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
final WriteBatch batch) throws RocksDBException {
for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
batch.delete(columnFamily, record.key);
} else {
return allDone();
batch.put(columnFamily, record.key, record.value);
}
}
}
@Override
public void close() {
columnFamily.close();
}
@Override
public void toggleDbForBulkLoading() {
try {
db.compactRange(columnFamily, true, 1, 0);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
}
}
// not private for testing
@@ -553,7 +548,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
rocksDBStore.restoreAllInternal(records);
try (final WriteBatch batch = new WriteBatch()) {
rocksDBStore.dbAccessor.prepareBatchForRestore(records, batch);
rocksDBStore.write(batch);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error restoring batch to store " + rocksDBStore.name, e);
}
}
@Override
@@ -574,6 +574,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
// for testing
public Options getOptions() {
return options;
return userSpecifiedOptions;
}
}
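The net effect of this refactoring is that every store operation (get, put, range, all, flush, restore) now funnels through the package-private RocksDBAccessor, so a subclass only needs to install a different accessor to change the on-disk layout. A hypothetical sketch of that extension point (class name is illustrative; it must live in the org.apache.kafka.streams.state.internals package):

package org.apache.kafka.streams.state.internals;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

class MyLayoutStore extends RocksDBStore {
    MyLayoutStore(final String name) {
        super(name);
    }

    @Override
    void openRocksDB(final DBOptions dbOptions,
                     final ColumnFamilyOptions columnFamilyOptions) {
        // A real subclass would open its own column families here and assign a
        // custom RocksDBAccessor to dbAccessor, as RocksDBTimestampedStore does.
        super.openRocksDB(dbOptions, columnFamilyOptions);
    }
}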

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java (394 lines changed)

@@ -0,0 +1,394 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp;
/**
* A persistent key-(value-timestamp) store based on RocksDB.
*/
public class RocksDBTimestampedStore extends RocksDBStore {
private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
RocksDBTimestampedStore(final String name) {
super(name);
}
@Override
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
try {
db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors, columnFamilies);
final ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, columnFamilies.get(1));
} else {
log.info("Opening store {} in regular mode", name);
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(1));
}
noTimestampsIter.close();
} catch (final RocksDBException e) {
if ("Column family not found: : keyValueWithTimestamp".equals(e.getMessage())) {
try {
db = RocksDB.open(dbOptions, dbDir.getAbsolutePath(), columnFamilyDescriptors.subList(0, 1), columnFamilies);
columnFamilies.add(db.createColumnFamily(columnFamilyDescriptors.get(1)));
} catch (final RocksDBException fatal) {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), fatal);
}
log.info("Opening store {} in upgrade mode", name);
dbAccessor = new DualColumnFamilyAccessor(columnFamilies.get(0), columnFamilies.get(1));
} else {
throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e);
}
}
}
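// Summary: openRocksDB() resolves to one of three paths:
// 1. both column families exist and the plain CF still holds data -> upgrade mode (DualColumnFamilyAccessor)
// 2. both column families exist and the plain CF is empty -> regular mode (SingleColumnFamilyAccessor on the timestamped CF)
// 3. the "keyValueWithTimestamp" CF does not exist yet -> create it and start in upgrade mode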
private class DualColumnFamilyAccessor implements RocksDBAccessor {
private final ColumnFamilyHandle oldColumnFamily;
private final ColumnFamilyHandle newColumnFamily;
private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
final ColumnFamilyHandle newColumnFamily) {
this.oldColumnFamily = oldColumnFamily;
this.newColumnFamily = newColumnFamily;
}
@Override
public void put(final byte[] key,
final byte[] valueWithTimestamp) {
if (valueWithTimestamp == null) {
try {
db.delete(oldColumnFamily, wOptions, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
try {
db.delete(newColumnFamily, wOptions, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
} else {
try {
db.delete(oldColumnFamily, wOptions, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
try {
db.put(newColumnFamily, wOptions, key, valueWithTimestamp);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
}
}
}
@Override
public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
final WriteBatch batch) throws RocksDBException {
for (final KeyValue<Bytes, byte[]> entry : entries) {
Objects.requireNonNull(entry.key, "key cannot be null");
if (entry.value == null) {
batch.delete(oldColumnFamily, entry.key.get());
batch.delete(newColumnFamily, entry.key.get());
} else {
batch.delete(oldColumnFamily, entry.key.get());
batch.put(newColumnFamily, entry.key.get(), entry.value);
}
}
}
@Override
public byte[] get(final byte[] key) throws RocksDBException {
final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
if (valueWithTimestamp != null) {
return valueWithTimestamp;
}
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue);
// this only works because the changelog topic already contains correct data;
// for other format changes, we cannot take this shortcut and can only migrate
// data from the old store to the new store on put()
put(key, valueWithUnknownTimestamp);
return valueWithUnknownTimestamp;
}
return null;
}
@Override
public byte[] getOnly(final byte[] key) throws RocksDBException {
final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
if (valueWithTimestamp != null) {
return valueWithTimestamp;
}
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
return getValueWithUnknownTimestamp(plainValue);
}
return null;
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
from,
to);
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
innerIterWithTimestamp.seekToFirst();
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
innerIterNoTimestamp.seekToFirst();
return new RocksDBDualCFIterator(name, innerIterWithTimestamp, innerIterNoTimestamp);
}
@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
+ db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys");
}
@Override
public void flush() throws RocksDBException {
db.flush(fOptions, oldColumnFamily);
db.flush(fOptions, newColumnFamily);
}
@Override
public void prepareBatchForRestore(final Collection<KeyValue<byte[], byte[]>> records,
final WriteBatch batch) throws RocksDBException {
for (final KeyValue<byte[], byte[]> record : records) {
if (record.value == null) {
batch.delete(oldColumnFamily, record.key);
batch.delete(newColumnFamily, record.key);
} else {
batch.delete(oldColumnFamily, record.key);
batch.put(newColumnFamily, record.key, record.value);
}
}
}
@Override
public void close() {
oldColumnFamily.close();
newColumnFamily.close();
}
@Override
public void toggleDbForBulkLoading() {
try {
db.compactRange(oldColumnFamily, true, 1, 0);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
try {
db.compactRange(newColumnFamily, true, 1, 0);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
}
}
}
private class RocksDBDualCFIterator extends AbstractIterator<KeyValue<Bytes, byte[]>>
implements KeyValueIterator<Bytes, byte[]> {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final String storeName;
private final RocksIterator iterWithTimestamp;
private final RocksIterator iterNoTimestamp;
private volatile boolean open = true;
private byte[] nextWithTimestamp;
private byte[] nextNoTimestamp;
private KeyValue<Bytes, byte[]> next;
RocksDBDualCFIterator(final String storeName,
final RocksIterator iterWithTimestamp,
final RocksIterator iterNoTimestamp) {
this.iterWithTimestamp = iterWithTimestamp;
this.iterNoTimestamp = iterNoTimestamp;
this.storeName = storeName;
}
@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
}
return super.hasNext();
}
@Override
public synchronized KeyValue<Bytes, byte[]> next() {
return super.next();
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
if (nextNoTimestamp == null && iterNoTimestamp.isValid()) {
nextNoTimestamp = iterNoTimestamp.key();
}
if (nextWithTimestamp == null && iterWithTimestamp.isValid()) {
nextWithTimestamp = iterWithTimestamp.key();
}
if (nextNoTimestamp == null && !iterNoTimestamp.isValid()) {
if (nextWithTimestamp == null && !iterWithTimestamp.isValid()) {
return allDone();
} else {
next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
nextWithTimestamp = null;
iterWithTimestamp.next();
}
} else {
if (nextWithTimestamp == null) {
next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
next = KeyValue.pair(new Bytes(nextWithTimestamp), iterWithTimestamp.value());
nextWithTimestamp = null;
iterWithTimestamp.next();
}
}
}
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
}
@Override
public synchronized void close() {
openIterators.remove(this);
iterNoTimestamp.close();
iterWithTimestamp.close();
open = false;
}
@Override
public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return next.key;
}
}
private class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
private final byte[] upperBoundKey;
RocksDBDualCFRangeIterator(final String storeName,
final RocksIterator iterWithTimestamp,
final RocksIterator iterNoTimestamp,
final Bytes from,
final Bytes to) {
super(storeName, iterWithTimestamp, iterNoTimestamp);
iterWithTimestamp.seek(from.get());
iterNoTimestamp.seek(from.get());
upperBoundKey = to.get();
if (upperBoundKey == null) {
throw new NullPointerException("RocksDBDualCFRangeIterator: upperBoundKey is null for key " + to);
}
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
final KeyValue<Bytes, byte[]> next = super.makeNext();
if (next == null) {
return allDone();
} else {
if (comparator.compare(next.key.get(), upperBoundKey) <= 0) {
return next;
} else {
return allDone();
}
}
}
}
}
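RocksDBDualCFIterator above is a two-way merge over the two column families' sorted key spaces. A standalone sketch of its ordering rule, with arrays standing in for the RocksDB iterators (example data):

import org.apache.kafka.common.utils.Bytes;

import java.util.ArrayList;
import java.util.List;

public class DualCfMergeExample {
    public static void main(final String[] args) {
        final byte[][] oldKeys = {{1}, {3}};       // no-timestamp column family, sorted
        final byte[][] newKeys = {{2}, {3}, {4}};  // timestamped column family, sorted
        final List<byte[]> merged = new ArrayList<>();
        int o = 0;
        int n = 0;
        while (o < oldKeys.length || n < newKeys.length) {
            if (o == oldKeys.length) {
                merged.add(newKeys[n++]);
            } else if (n == newKeys.length) {
                merged.add(oldKeys[o++]);
            } else if (Bytes.BYTES_LEXICO_COMPARATOR.compare(oldKeys[o], newKeys[n]) <= 0) {
                // ties emit the old-CF entry first; its value gets the unknown-timestamp prefix
                merged.add(oldKeys[o++]);
            } else {
                merged.add(newKeys[n++]);
            }
        }
        System.out.println(merged.size());  // 5 -- key order: 1, 2, 3 (old), 3 (new), 4
    }
}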

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIterator.java (89 lines changed)

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksIterator;
import java.util.NoSuchElementException;
import java.util.Set;
class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;
private final Set<KeyValueIterator<Bytes, byte[]>> openIterators;
private volatile boolean open = true;
private KeyValue<Bytes, byte[]> next;
RocksDbIterator(final String storeName,
final RocksIterator iter,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
this.storeName = storeName;
this.iter = iter;
this.openIterators = openIterators;
}
@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
}
return super.hasNext();
}
@Override
public KeyValue<Bytes, byte[]> makeNext() {
if (!iter.isValid()) {
return allDone();
} else {
next = getKeyValue();
iter.next();
return next;
}
}
private KeyValue<Bytes, byte[]> getKeyValue() {
return new KeyValue<>(new Bytes(iter.key()), iter.value());
}
@Override
public void remove() {
throw new UnsupportedOperationException("RocksDB iterator does not support remove()");
}
@Override
public synchronized void close() {
openIterators.remove(this);
iter.close();
open = false;
}
@Override
public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return next.key;
}
}
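RocksDbIterator and the dual-CF iterators above build on Kafka's AbstractIterator, whose contract is that a subclass implements makeNext() and signals exhaustion by returning allDone(); hasNext() and next() are inherited. A minimal sketch of that contract in isolation:

import org.apache.kafka.common.utils.AbstractIterator;

class CountdownIterator extends AbstractIterator<Integer> {
    private int remaining = 3;

    @Override
    protected Integer makeNext() {
        // Produce the next element, or call allDone() once exhausted;
        // allDone() flips the internal state so hasNext() returns false.
        return remaining > 0 ? remaining-- : allDone();
    }
}
// new CountdownIterator() yields 3, 2, 1 through the inherited hasNext()/next().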

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbKeyValueBytesStoreSupplier.java (7 lines changed)

@@ -23,9 +23,12 @@ import org.apache.kafka.streams.state.KeyValueStore;
 public class RocksDbKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
     private final String name;
+    private final boolean returnTimestampedStore;

-    public RocksDbKeyValueBytesStoreSupplier(final String name) {
+    public RocksDbKeyValueBytesStoreSupplier(final String name,
+                                             final boolean returnTimestampedStore) {
         this.name = name;
+        this.returnTimestampedStore = returnTimestampedStore;
     }
@Override
@@ -35,7 +38,7 @@ public class RocksDbKeyValueBytesStoreSupp
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return new RocksDBStore(name);
+        return returnTimestampedStore ? new RocksDBTimestampedStore(name) : new RocksDBStore(name);
     }
@Override
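One supplier class now serves both layouts; the boolean fixed at construction decides which concrete store get() builds. A quick sketch of the two paths (store names are examples):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;

public class SupplierFlagExample {
    public static void main(final String[] args) {
        // false -> plain RocksDBStore
        final KeyValueStore<Bytes, byte[]> plain =
            new RocksDbKeyValueBytesStoreSupplier("plain-store", false).get();
        // true -> RocksDBTimestampedStore
        final KeyValueStore<Bytes, byte[]> timestamped =
            new RocksDbKeyValueBytesStoreSupplier("ts-store", true).get();
    }
}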

streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java (36 lines changed)

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import java.nio.ByteBuffer;
import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
class StoreProxyUtils {
static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) {
if (rawValue == null) {
return null;
}
return ByteBuffer
.allocate(8 + rawValue.length)
.putLong(NO_TIMESTAMP)
.put(rawValue)
.array();
}
}
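The helper above reserves an 8-byte timestamp slot in front of the plain value and fills it with NO_TIMESTAMP (-1), which turns an old-format value into a valid value-with-timestamp entry. A small round-trip sketch of the layout (payload is an example):

import java.nio.ByteBuffer;

public class UnknownTimestampLayoutExample {
    public static void main(final String[] args) {
        final byte[] rawValue = {'v'};

        // Same layout as getValueWithUnknownTimestamp:
        // [ 8-byte big-endian timestamp = -1 ][ original raw value bytes ]
        final byte[] withTimestamp = ByteBuffer
            .allocate(8 + rawValue.length)
            .putLong(-1L)  // ConsumerRecord.NO_TIMESTAMP
            .put(rawValue)
            .array();

        final ByteBuffer buffer = ByteBuffer.wrap(withTimestamp);
        System.out.println(buffer.getLong());     // -1, i.e. timestamp unknown
        System.out.println((char) buffer.get());  // 'v', the original payload
    }
}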

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java (263 lines changed)

@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.Logger;
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RateLimiter;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileManager;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.VectorMemTableConfig;
import org.rocksdb.WALRecoveryMode;
import org.rocksdb.util.BytewiseComparator;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.matchesPattern;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
* The purpose of this test is to catch interface changes if we upgrade {@link RocksDB}.
* Using reflection, we make sure the {@link RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter} maps all
* methods from {@link DBOptions} and {@link ColumnFamilyOptions} to/from {@link Options} correctly.
*/
@RunWith(EasyMockRunner.class)
public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
private final List<String> ignoreMethods = new LinkedList<String>() {
{
add("isOwningHandle");
add("dispose");
add("wait");
add("equals");
add("getClass");
add("hashCode");
add("notify");
add("notifyAll");
add("toString");
}
};
@Mock
private DBOptions dbOptions;
@Mock
private ColumnFamilyOptions columnFamilyOptions;
@Test
public void shouldOverwriteAllOptionsMethods() throws Exception {
for (final Method method : Options.class.getMethods()) {
if (!ignoreMethods.contains(method.getName())) {
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class
.getDeclaredMethod(method.getName(), method.getParameterTypes());
}
}
}
@Test
public void shouldForwardAllDbOptionsCalls() throws Exception {
for (final Method method : Options.class.getMethods()) {
if (!ignoreMethods.contains(method.getName())) {
try {
DBOptions.class.getMethod(method.getName(), method.getParameterTypes());
verifyDBOptionsMethodCall(method);
} catch (final NoSuchMethodException expectedAndSwallow) { }
}
}
}
private void verifyDBOptionsMethodCall(final Method method) throws Exception {
final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions
= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, new ColumnFamilyOptions());
final Object[] parameters = getDBOptionsParameters(method.getParameterTypes());
try {
reset(dbOptions);
replay(dbOptions);
method.invoke(optionsFacadeDbOptions, parameters);
verify();
fail("Should have called DBOptions." + method.getName() + "()");
} catch (final InvocationTargetException undeclaredMockMethodCall) {
assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
}
}
private Object[] getDBOptionsParameters(final Class<?>[] parameterTypes) throws Exception {
final Object[] parameters = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; ++i) {
switch (parameterTypes[i].getName()) {
case "boolean":
parameters[i] = true;
break;
case "int":
parameters[i] = 0;
break;
case "long":
parameters[i] = 0L;
break;
case "java.util.Collection":
parameters[i] = new ArrayList<>();
break;
case "org.rocksdb.AccessHint":
parameters[i] = AccessHint.NONE;
break;
case "org.rocksdb.Cache":
parameters[i] = new LRUCache(1L);
break;
case "org.rocksdb.Env":
parameters[i] = Env.getDefault();
break;
case "org.rocksdb.InfoLogLevel":
parameters[i] = InfoLogLevel.FATAL_LEVEL;
break;
case "org.rocksdb.Logger":
parameters[i] = new Logger(new Options()) {
@Override
protected void log(final InfoLogLevel infoLogLevel, final String logMsg) {}
};
break;
case "org.rocksdb.RateLimiter":
parameters[i] = new RateLimiter(1L);
break;
case "org.rocksdb.SstFileManager":
parameters[i] = new SstFileManager(Env.getDefault());
break;
case "org.rocksdb.WALRecoveryMode":
parameters[i] = WALRecoveryMode.AbsoluteConsistency;
break;
default:
parameters[i] = parameterTypes[i].newInstance();
}
}
return parameters;
}
@Test
public void shouldForwardAllColumnFamilyCalls() throws Exception {
for (final Method method : Options.class.getMethods()) {
if (!ignoreMethods.contains(method.getName())) {
try {
ColumnFamilyOptions.class.getMethod(method.getName(), method.getParameterTypes());
verifyColumnFamilyOptionsMethodCall(method);
} catch (final NoSuchMethodException expectedAndSwallow) { }
}
}
}
private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exception {
final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeColumnFamilyOptions
= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), columnFamilyOptions);
final Object[] parameters = getColumnFamilyOptionsParameters(method.getParameterTypes());
try {
reset(columnFamilyOptions);
replay(columnFamilyOptions);
method.invoke(optionsFacadeColumnFamilyOptions, parameters);
verify();
fail("Should have called ColumnFamilyOptions." + method.getName() + "()");
} catch (final InvocationTargetException undeclaredMockMethodCall) {
assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
matchesPattern("Unexpected method call ColumnFamilyOptions\\." + method.getName() + "(.*)"));
}
}
private Object[] getColumnFamilyOptionsParameters(final Class<?>[] parameterTypes) throws Exception {
final Object[] parameters = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; ++i) {
switch (parameterTypes[i].getName()) {
case "boolean":
parameters[i] = true;
break;
case "double":
parameters[i] = 0.0d;
break;
case "int":
parameters[i] = 0;
break;
case "long":
parameters[i] = 0L;
break;
case "[I":
parameters[i] = new int[0];
break;
case "java.util.List":
parameters[i] = new ArrayList<>();
break;
case "org.rocksdb.AbstractComparator":
parameters[i] = new BytewiseComparator(new ComparatorOptions());
break;
case "org.rocksdb.BuiltinComparator":
parameters[i] = BuiltinComparator.BYTEWISE_COMPARATOR;
break;
case "org.rocksdb.CompactionPriority":
parameters[i] = CompactionPriority.ByCompensatedSize;
break;
case "org.rocksdb.CompactionStyle":
parameters[i] = CompactionStyle.UNIVERSAL;
break;
case "org.rocksdb.CompressionType":
parameters[i] = CompressionType.NO_COMPRESSION;
break;
case "org.rocksdb.MemTableConfig":
parameters[i] = new VectorMemTableConfig();
break;
case "org.rocksdb.MergeOperator":
parameters[i] = new StringAppendOperator();
break;
case "org.rocksdb.TableFormatConfig":
parameters[i] = new PlainTableConfig();
break;
default:
parameters[i] = parameterTypes[i].newInstance();
}
}
return parameters;
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java (2 lines changed)

@@ -280,7 +280,7 @@ public class RocksDBSessionStoreTest {
         return results;
     }

-    static <K, V> List<V> valuesToList(final KeyValueIterator<Windowed<K>, V> iterator) {
+    private static <K, V> List<V> valuesToList(final KeyValueIterator<Windowed<K>, V> iterator) {
         final List<V> results = new ArrayList<>();
         while (iterator.hasNext()) {
             results.add(iterator.next().value);

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java (83 lines changed)

@@ -58,18 +58,21 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class RocksDBStoreTest {
private static boolean enableBloomFilters = false;
final static String DB_NAME = "db-name";
private File dir;
private final Serializer<String> stringSerializer = new StringSerializer();
private final Deserializer<String> stringDeserializer = new StringDeserializer();
private RocksDBStore rocksDBStore;
private InternalMockProcessorContext context;
private File dir;
private static boolean enableBloomFilters = false;
InternalMockProcessorContext context;
RocksDBStore rocksDBStore;
@Before
public void setUp() {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
rocksDBStore = new RocksDBStore("test");
rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
@@ -77,6 +80,10 @@ public class RocksDBStoreTest {
new StreamsConfig(props));
}
RocksDBStore getRocksDBStore() {
return new RocksDBStore(DB_NAME);
}
@After
public void tearDown() {
rocksDBStore.close();
@@ -98,7 +105,8 @@ public class RocksDBStoreTest {
assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20));
-        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36)); }
+        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36));
+    }
@Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
@@ -117,7 +125,7 @@ public class RocksDBStoreTest {
final byte[] restoredValue = "restoredValue".getBytes(UTF_8);
restoreBytes.add(KeyValue.pair(restoredKey, restoredValue));
context.restore("test", restoreBytes);
context.restore(DB_NAME, restoreBytes);
assertThat(
stringDeserializer.deserialize(
@@ -168,20 +176,20 @@ public class RocksDBStoreTest {
rocksDBStore.flush();
     assertEquals(
+        "a",
         stringDeserializer.deserialize(
             null,
-            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
-        "a");
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
     assertEquals(
+        "b",
         stringDeserializer.deserialize(
             null,
-            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
-        "b");
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))));
     assertEquals(
+        "c",
         stringDeserializer.deserialize(
             null,
-            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
-        "c");
+            rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
}
@Test
@@ -222,20 +230,20 @@ public class RocksDBStoreTest {
context.restore(rocksDBStore.name(), entries);
assertEquals(
"a",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
"a");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
assertEquals(
"b",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
"b");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))));
assertEquals(
"c",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
"c");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
}
@Test
@@ -249,7 +257,7 @@ public class RocksDBStoreTest {
rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate);
final String retrievedValue = stringDeserializer.deserialize(null, rocksDBStore.get(keyBytes));
assertEquals(retrievedValue, "A");
assertEquals("A", retrievedValue);
}
@Test
@@ -294,20 +302,20 @@ public class RocksDBStoreTest {
assertThat(keys, equalTo(Utils.mkSet("1", "2", "3")));
assertEquals(
"restored",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
"restored");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
assertEquals(
"b",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
"b");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))));
assertEquals(
"c",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
"c");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
}
@Test
@@ -319,20 +327,20 @@ public class RocksDBStoreTest {
context.restore(rocksDBStore.name(), entries);
assertEquals(
"a",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))),
"a");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
assertEquals(
"b",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))),
"b");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "2")))));
assertEquals(
"c",
stringDeserializer.deserialize(
null,
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))),
"c");
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
entries.clear();
@@ -352,8 +360,6 @@ public class RocksDBStoreTest {
assertThat(keys, equalTo(Utils.mkSet("2", "3")));
}
@Test
public void shouldThrowNullPointerExceptionOnNullPut() {
rocksDBStore.init(context, rocksDBStore);
@@ -424,13 +430,14 @@ public class RocksDBStoreTest {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class);
rocksDBStore = new RocksDBStore("test");
rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
Serdes.String(),
new StreamsConfig(props));
enableBloomFilters = false;
rocksDBStore.init(context, rocksDBStore);
final List<String> expectedValues = new ArrayList<>();
@@ -451,11 +458,11 @@ public class RocksDBStoreTest {
assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
rocksDBStore.close();
enableBloomFilters = true;
expectedIndex = 0;
// reopen with Bloom Filters enabled
// should open fine without errors
enableBloomFilters = true;
rocksDBStore.init(context, rocksDBStore);
for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
@@ -479,7 +486,7 @@ public class RocksDBStoreTest {
public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {
static boolean bloomFiltersSet = false;
static boolean bloomFiltersSet;
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
@@ -493,10 +500,10 @@ public class RocksDBStoreTest {
bloomFiltersSet = true;
} else {
options.setOptimizeFiltersForHits(false);
bloomFiltersSet = false;
}
options.setTableFormatConfig(tableConfig);
}
}
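As setUp() shows, the tests install their RocksDBConfigSetter via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG. A minimal sketch of a setter that unconditionally enables Bloom filters, in the spirit of TestingBloomFilterRocksDBConfigSetter above (the 50 MB cache size is an arbitrary example value):

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;

import java.util.Map;

public class BloomFilterConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(50 * 1024 * 1024L); // example value: 50 MB block cache
        tableConfig.setFilter(new BloomFilter());         // enable Bloom filters for point lookups
        options.setTableFormatConfig(tableConfig);
        options.setOptimizeFiltersForHits(true);
    }
}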

293
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java

@@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.hamcrest.core.IsNull;
import org.junit.Test;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
RocksDBStore getRocksDBStore() {
return new RocksDBTimestampedStore(DB_NAME);
}
@Test
public void shouldMigrateDataFromDefaultToTimestampColumnFamily() throws Exception {
prepareOldStore();
LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
rocksDBStore.init(context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
LogCaptureAppender.unregister(appender);
// approx: 7 entries on old CF, 0 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(7L));
// get()
// should be no-op on both CF
assertThat(rocksDBStore.get(new Bytes("unknown".getBytes())), new IsNull<>());
// approx: 7 entries on old CF, 0 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(7L));
// should migrate key1 from old to new CF
// must return timestamp plus value, i.e., it's not 1 byte but 9 bytes
assertThat(rocksDBStore.get(new Bytes("key1".getBytes())).length, is(8 + 1));
// one delete on old CF, one put on new CF
// approx: 6 entries on old CF, 1 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(7L));
// put()
// should migrate key2 from old to new CF with new value
rocksDBStore.put(new Bytes("key2".getBytes()), "timestamp+22".getBytes());
// one delete on old CF, one put on new CF
// approx: 5 entries on old CF, 2 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(7L));
// should delete key3 from old and new CF
rocksDBStore.put(new Bytes("key3".getBytes()), null);
// count is off by one due to the two delete operations (even though one of them does not delete anything)
// approx: 4 entries on old CF, 1 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// should add new key8 to new CF
rocksDBStore.put(new Bytes("key8".getBytes()), "timestamp+88888888".getBytes());
// one delete on old CF, one put on new CF
// approx: 3 entries on old CF, 2 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// putIfAbsent()
// should migrate key4 from old to new CF with old value
assertThat(rocksDBStore.putIfAbsent(new Bytes("key4".getBytes()), "timestamp+4444".getBytes()).length, is(8 + 4));
// one delete on old CF, one put on new CF
// approx: 2 entries on old CF, 3 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// should add new key11 to new CF
assertThat(rocksDBStore.putIfAbsent(new Bytes("key11".getBytes()), "timestamp+11111111111".getBytes()), new IsNull<>());
// one delete on old CF, one put on new CF
// approx: 1 entry on old CF, 4 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// should not delete key5 but migrate to new CF
assertThat(rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), null).length, is(8 + 5));
// one delete on old CF, one put on new CF
// approx: 0 entries on old CF, 5 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(5L));
// should be no-op on both CF
assertThat(rocksDBStore.putIfAbsent(new Bytes("key12".getBytes()), null), new IsNull<>());
// two delete operations; however, only one is counted because the old CF count was already zero
// approx: 0 entries on old CF, 4 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(4L));
// delete()
// should delete key6 from old and new CF
assertThat(rocksDBStore.delete(new Bytes("key6".getBytes())).length, is(8 + 6));
// two delete operations; however, only one is counted because the old CF count was already zero
// approx: 0 entries on old CF, 3 in new CF
assertThat(rocksDBStore.approximateNumEntries(), is(3L));
iteratorsShouldNotMigrateData();
assertThat(rocksDBStore.approximateNumEntries(), is(3L));
rocksDBStore.close();
verifyOldAndNewColumnFamily();
}
private void iteratorsShouldNotMigrateData() {
// iterating should not migrate any data, but must return all keys over both CF (plus surrogate timestamps for entries from the old CF)
final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all();
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key1".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 1
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '1'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key11".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key2".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key4".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 4444
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key5".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 55555
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key7".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 7777777
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '7', '7', '7', '7', '7', '7', '7'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = itAll.next();
assertArrayEquals("key8".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '8', '8', '8', '8', '8', '8', '8', '8'}, keyValue.value);
}
assertFalse(itAll.hasNext());
itAll.close();
final KeyValueIterator<Bytes, byte[]> it =
rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()));
{
final KeyValue<Bytes, byte[]> keyValue = it.next();
assertArrayEquals("key2".getBytes(), keyValue.key.get());
assertArrayEquals(new byte[]{'t', 'i', 'm', 'e', 's', 't', 'a', 'm', 'p', '+', '2', '2'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = it.next();
assertArrayEquals("key4".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 4444
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '4', '4', '4', '4'}, keyValue.value);
}
{
final KeyValue<Bytes, byte[]> keyValue = it.next();
assertArrayEquals("key5".getBytes(), keyValue.key.get());
// unknown timestamp == -1 plus value == 55555
assertArrayEquals(new byte[]{-1, -1, -1, -1, -1, -1, -1, -1, '5', '5', '5', '5', '5'}, keyValue.value);
}
assertFalse(it.hasNext());
it.close();
}
private void verifyOldAndNewColumnFamily() throws Exception {
final DBOptions dbOptions = new DBOptions();
final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
RocksDB db = RocksDB.open(
dbOptions,
new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
columnFamilyDescriptors,
columnFamilies);
ColumnFamilyHandle noTimestampColumnFamily = columnFamilies.get(0);
final ColumnFamilyHandle withTimestampColumnFamily = columnFamilies.get(1);
assertThat(db.get(noTimestampColumnFamily, "unknown".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key1".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key2".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key3".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key4".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key5".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key6".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key7".getBytes()).length, is(7));
assertThat(db.get(noTimestampColumnFamily, "key8".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key11".getBytes()), new IsNull<>());
assertThat(db.get(noTimestampColumnFamily, "key12".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "unknown".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "key1".getBytes()).length, is(8 + 1));
assertThat(db.get(withTimestampColumnFamily, "key2".getBytes()).length, is(12));
assertThat(db.get(withTimestampColumnFamily, "key3".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "key4".getBytes()).length, is(8 + 4));
assertThat(db.get(withTimestampColumnFamily, "key5".getBytes()).length, is(8 + 5));
assertThat(db.get(withTimestampColumnFamily, "key6".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "key7".getBytes()), new IsNull<>());
assertThat(db.get(withTimestampColumnFamily, "key8".getBytes()).length, is(18));
assertThat(db.get(withTimestampColumnFamily, "key11".getBytes()).length, is(21));
assertThat(db.get(withTimestampColumnFamily, "key12".getBytes()), new IsNull<>());
db.close();
// check that the store is still in upgrade mode
LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
rocksDBStore.init(context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
LogCaptureAppender.unregister(appender);
rocksDBStore.close();
// clear old CF
columnFamilies.clear();
db = RocksDB.open(
dbOptions,
new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
columnFamilyDescriptors,
columnFamilies);
noTimestampColumnFamily = columnFamilies.get(0);
db.delete(noTimestampColumnFamily, "key7".getBytes());
db.close();
// check that the store now opens in regular mode
appender = LogCaptureAppender.createAndRegister();
rocksDBStore.init(context, rocksDBStore);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
LogCaptureAppender.unregister(appender);
}
private void prepareOldStore() {
final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME);
keyValueStore.init(context, keyValueStore);
keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
keyValueStore.put(new Bytes("key3".getBytes()), "333".getBytes());
keyValueStore.put(new Bytes("key4".getBytes()), "4444".getBytes());
keyValueStore.put(new Bytes("key5".getBytes()), "55555".getBytes());
keyValueStore.put(new Bytes("key6".getBytes()), "666666".getBytes());
keyValueStore.put(new Bytes("key7".getBytes()), "7777777".getBytes());
keyValueStore.close();
}
}
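The length and byte-array assertions above encode the value format of the new "keyValueWithTimestamp" column family: an 8-byte big-endian timestamp followed by the plain value, with the surrogate timestamp -1 (eight 0xFF bytes) marking entries surfaced from the old column family. A minimal standalone sketch of that layout (this helper is illustrative, not the store's internal code):

import java.nio.ByteBuffer;

public final class TimestampedValueLayoutSketch {

    // prefix the plain value with an 8-byte big-endian timestamp
    static byte[] toTimestampedValue(final long timestamp, final byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
            .putLong(timestamp)
            .put(plainValue)
            .array();
    }

    public static void main(final String[] args) {
        // surrogate timestamp -1 plus value "1" yields the 9-byte array the test asserts
        final byte[] migrated = toTimestampedValue(-1L, "1".getBytes());
        System.out.println(migrated.length);  // 9, i.e., 8 + 1
        System.out.println(migrated[0]);      // -1 (each of the first 8 bytes is 0xFF)
    }
}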