diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index 36852290837..b104ad488db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -30,42 +30,41 @@ import java.util.List; public interface KeyValueStore extends StateStore, ReadOnlyKeyValueStore { /** - * Update the value associated with this key + * Update the value associated with this key. * * @param key The key to associate the value to - * @param value The value to update, it can be null; - * if the serialized bytes are also null it is interpreted as deletes - * @throws NullPointerException If null is used for key. + * @param value The value to update, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as deletes + * @throws NullPointerException If {@code null} is used for key. */ void put(K key, V value); /** - * Update the value associated with this key, unless a value - * is already associated with the key + * Update the value associated with this key, unless a value is already associated with the key. * * @param key The key to associate the value to - * @param value The value to update, it can be null; - * if the serialized bytes are also null it is interpreted as deletes - * @return The old value or null if there is no such key. - * @throws NullPointerException If null is used for key. + * @param value The value to update, it can be {@code null}; + * if the serialized bytes are also {@code null} it is interpreted as deletes + * @return The old value or {@code null} if there is no such key. + * @throws NullPointerException If {@code null} is used for key. */ V putIfAbsent(K key, V value); /** - * Update all the given key/value pairs + * Update all the given key/value pairs. * * @param entries A list of entries to put into the store; - * if the serialized bytes are also null it is interpreted as deletes - * @throws NullPointerException If null is used for key. + * if the serialized bytes are also {@code null} it is interpreted as deletes + * @throws NullPointerException If {@code null} is used for key. */ void putAll(List> entries); /** - * Delete the value from the store (if there is one) + * Delete the value from the store (if there is one). * * @param key The key - * @return The old value or null if there is no such key. - * @throws NullPointerException If null is used for key. + * @return The old value or {@code null} if there is no such key. + * @throws NullPointerException If {@code null} is used for key. */ V delete(K key); -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java index 11b849b6b01..6ba66725621 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java @@ -16,33 +16,34 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.StateStoreProvider; /** - * Used to enable querying of custom {@link StateStore} types via the - * {@link org.apache.kafka.streams.KafkaStreams} - * API. 
- * @see QueryableStoreTypes + * Used to enable querying of custom {@link StateStore} types via the {@link KafkaStreams} API. * - * @param The store type + * @param The store type + * @see QueryableStoreTypes */ public interface QueryableStoreType { /** * Called when searching for {@link StateStore}s to see if they - * match the type expected by implementors of this interface + * match the type expected by implementors of this interface. + * * @param stateStore The stateStore * @return true if it is a match */ boolean accepts(final StateStore stateStore); /** - * Create an instance of T (usually a facade) that developers can use - * to query the underlying {@link StateStore}s + * Create an instance of {@code T} (usually a facade) that developers can use + * to query the underlying {@link StateStore}s. + * * @param storeProvider provides access to all the underlying StateStore instances * @param storeName The name of the Store - * @return T usually a read-only interface over a StateStore @see {@link QueryableStoreTypes.KeyValueStoreType} + * @return a read-only interface over a {@code StateStore} (cf. {@link QueryableStoreTypes.KeyValueStoreType}) */ T create(final StateStoreProvider storeProvider, final String storeName); -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java index 8725ebc35cb..9b2f8f5fc66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java @@ -19,9 +19,8 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.errors.InvalidStateStoreException; /** - * A key value store that only supports read operations. - * Implementations should be thread-safe as concurrent reads and writes - * are expected. + * A key-value store that only supports read operations. + * Implementations should be thread-safe as concurrent reads and writes are expected. * * Please note that this contract defines the thread-safe read functionality only; it does not * guarantee anything about whether the actual instance is writable by another thread, or diff --git a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java index dea759f486c..0c46fc2e87b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java @@ -21,8 +21,8 @@ import org.apache.kafka.streams.kstream.Windowed; /** * A window store that only supports read operations - * Implementations should be thread-safe as concurrent reads and writes - * are expected. + * Implementations should be thread-safe as concurrent reads and writes are expected. + * * @param Type of keys * @param Type of values */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java index 1dba933f125..b65baa5cab0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java @@ -21,8 +21,8 @@ import org.rocksdb.Options; import java.util.Map; /** - * An interface to that allows developers to customize the RocksDB settings - * for a given Store. 
Please read the RocksDB Tuning Guide.
+ * An interface to that allows developers to customize the RocksDB settings for a given Store.
+ * Please read the RocksDB Tuning Guide.
  */
 public interface RocksDBConfigSetter {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index 69540899ada..5c7bc253209 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.utils.Bytes;
 
 /**
- * A store supplier that can be used to create one or more {@link SessionStore SessionStore} instances of type <Byte, byte[]>.
+ * A store supplier that can be used to create one or more {@link SessionStore SessionStore<Byte, byte[]>} instances.
  *
- * For any stores implementing the {@link SessionStore SessionStore} interface, null value bytes are considered as "not exist". This means:
- *
- * 1. Null value bytes in put operations should be treated as delete.
- * 2. Null value bytes should never be returned in range query results.
+ * For any stores implementing the {@link SessionStore SessionStore<Byte, byte[]>} interface, {@code null} value
+ * bytes are considered as "not exist". This means:
+ * <ol>
+ *   <li>{@code null} value bytes in put operations should be treated as delete.</li>
+ *   <li>{@code null} value bytes should never be returned in range query results.</li>
+ * </ol>
*/ public interface SessionBytesStoreSupplier extends StoreSupplier> { @@ -42,4 +44,4 @@ public interface SessionBytesStoreSupplier extends StoreSupplier { */ T build(); - /** * Returns a Map containing any log configs that will be used when creating the changelog for the {@link StateStore}. *

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index fbf7df3de8e..bf748fca2b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -44,7 +44,6 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.nio.file.Files; import java.util.Collection; @@ -69,14 +68,11 @@ public class RocksDBStore implements KeyValueStore { private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst"); - private static final int TTL_NOT_USED = -1; - private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION; private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL; private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L; private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L; private static final long BLOCK_SIZE = 4096L; - private static final int TTL_SECONDS = TTL_NOT_USED; private static final int MAX_WRITE_BUFFERS = 3; private static final String DB_FILE_DIR = "rocksdb"; @@ -152,10 +148,15 @@ public class RocksDBStore implements KeyValueStore { options.prepareForBulkLoad(); } - this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); + dbDir = new File(new File(context.stateDir(), parentDir), name); try { - this.db = openDB(this.dbDir, this.options, TTL_SECONDS); + try { + Files.createDirectories(dbDir.getParentFile().toPath()); + db = RocksDB.open(options, dbDir.getAbsolutePath()); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error opening store " + name + " at location " + dbDir.toString(), e); + } } catch (final IOException e) { throw new ProcessorStateException(e); } @@ -166,30 +167,13 @@ public class RocksDBStore implements KeyValueStore { public void init(final ProcessorContext context, final StateStore root) { // open the DB dir - this.internalProcessorContext = context; + internalProcessorContext = context; openDB(context); - this.batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this); + batchingStateRestoreCallback = new RocksDBBatchingRestoreCallback(this); // value getter should always read directly from rocksDB // since it is only for values that are already flushed - context.register(root, this.batchingStateRestoreCallback); - } - - private RocksDB openDB(final File dir, - final Options options, - final int ttl) throws IOException { - try { - if (ttl == TTL_NOT_USED) { - Files.createDirectories(dir.getParentFile().toPath()); - return RocksDB.open(options, dir.getAbsolutePath()); - } else { - throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based."); - // TODO: support TTL with change log? 
- // return TtlDB.open(options, dir.toString(), ttl, false); - } - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e); - } + context.register(root, batchingStateRestoreCallback); } // visible for testing @@ -199,7 +183,7 @@ public class RocksDBStore implements KeyValueStore { @Override public String name() { - return this.name; + return name; } @Override @@ -220,16 +204,16 @@ public class RocksDBStore implements KeyValueStore { private void validateStoreOpen() { if (!open) { - throw new InvalidStateStoreException("Store " + this.name + " is currently closed"); + throw new InvalidStateStoreException("Store " + name + " is currently closed"); } } private byte[] getInternal(final byte[] rawKey) { try { - return this.db.get(rawKey); + return db.get(rawKey); } catch (final RocksDBException e) { // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while getting value for key %s from store " + this.name, e); + throw new ProcessorStateException("Error while getting value for key %s from store " + name, e); } } @@ -238,18 +222,13 @@ public class RocksDBStore implements KeyValueStore { if (prepareForBulkload) { // if the store is not empty, we need to compact to get around the num.levels check // for bulk loading - final String[] sstFileNames = dbDir.list(new FilenameFilter() { - @Override - public boolean accept(final File dir, final String name) { - return SST_FILE_EXTENSION.matcher(name).matches(); - } - }); + final String[] sstFileNames = dbDir.list((dir, name) -> SST_FILE_EXTENSION.matcher(name).matches()); if (sstFileNames != null && sstFileNames.length > 0) { try { - this.db.compactRange(true, 1, 0); + db.compactRange(true, 1, 0); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e); + throw new ProcessorStateException("Error while range compacting during restoring store " + name, e); } } } @@ -279,7 +258,7 @@ public class RocksDBStore implements KeyValueStore { return originalValue; } - void restoreAllInternal(final Collection> records) { + private void restoreAllInternal(final Collection> records) { try (final WriteBatch batch = new WriteBatch()) { for (final KeyValue record : records) { if (record.value == null) { @@ -290,7 +269,7 @@ public class RocksDBStore implements KeyValueStore { } write(batch); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + this.name, e); + throw new ProcessorStateException("Error restoring batch to store " + name, e); } } @@ -301,14 +280,14 @@ public class RocksDBStore implements KeyValueStore { db.delete(wOptions, rawKey); } catch (final RocksDBException e) { // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while removing key %s from store " + this.name, e); + throw new ProcessorStateException("Error while removing key %s from store " + name, e); } } else { try { db.put(wOptions, rawKey, rawValue); } catch (final RocksDBException e) { // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. 
- throw new ProcessorStateException("Error while putting key %s value %s into store " + this.name, e); + throw new ProcessorStateException("Error while putting key %s value %s into store " + name, e); } } } @@ -330,7 +309,7 @@ public class RocksDBStore implements KeyValueStore { } write(batch); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while batch writing to store " + this.name, e); + throw new ProcessorStateException("Error while batch writing to store " + name, e); } } @@ -384,9 +363,9 @@ public class RocksDBStore implements KeyValueStore { validateStoreOpen(); final long value; try { - value = this.db.getLongProperty("rocksdb.estimate-num-keys"); + value = db.getLongProperty("rocksdb.estimate-num-keys"); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error fetching property from store " + this.name, e); + throw new ProcessorStateException("Error fetching property from store " + name, e); } if (isOverflowing(value)) { return Long.MAX_VALUE; @@ -415,7 +394,7 @@ public class RocksDBStore implements KeyValueStore { try { db.flush(fOptions); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while executing flush from store " + this.name, e); + throw new ProcessorStateException("Error while executing flush from store " + name, e); } } @@ -480,7 +459,7 @@ public class RocksDBStore implements KeyValueStore { if (!iter.isValid()) { return allDone(); } else { - next = this.getKeyValue(); + next = getKeyValue(); iter.next(); return next; } @@ -524,8 +503,8 @@ public class RocksDBStore implements KeyValueStore { final Bytes to) { super(storeName, iter); iter.seek(from.get()); - this.rawToKey = to.get(); - if (this.rawToKey == null) { + rawToKey = to.get(); + if (rawToKey == null) { throw new NullPointerException("RocksDBRangeIterator: RawToKey is null for key " + to); } } @@ -537,7 +516,7 @@ public class RocksDBStore implements KeyValueStore { if (next == null) { return allDone(); } else { - if (comparator.compare(next.key.get(), this.rawToKey) <= 0) + if (comparator.compare(next.key.get(), rawToKey) <= 0) return next; else return allDone(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 5ae32eb4eff..b77b02dda26 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -39,7 +39,6 @@ import org.rocksdb.Options; import java.io.File; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -47,6 +46,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -55,8 +55,8 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class RocksDBStoreTest { - private Serializer stringSerializer = new StringSerializer(); - private Deserializer stringDeserializer = new StringDeserializer(); + private final Serializer stringSerializer = new StringSerializer(); + private final Deserializer stringDeserializer = new StringDeserializer(); private RocksDBStore rocksDBStore; private InternalMockProcessorContext 
context; private File dir; @@ -94,7 +94,7 @@ public class RocksDBStoreTest { } @Test - public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception { + public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { rocksDBStore.init(context, rocksDBStore); final String message = "how can a 4 ounce bird carry a 2lb coconut"; @@ -106,8 +106,8 @@ public class RocksDBStoreTest { final List> restoreBytes = new ArrayList<>(); - final byte[] restoredKey = "restoredKey".getBytes("UTF-8"); - final byte[] restoredValue = "restoredValue".getBytes("UTF-8"); + final byte[] restoredKey = "restoredKey".getBytes(UTF_8); + final byte[] restoredValue = "restoredValue".getBytes(UTF_8); restoreBytes.add(KeyValue.pair(restoredKey, restoredValue)); context.restore("test", restoreBytes); @@ -191,7 +191,7 @@ public class RocksDBStoreTest { } @Test - public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception { + public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() { final List> entries = getKeyValueEntries(); rocksDBStore.init(context, rocksDBStore); @@ -208,7 +208,7 @@ public class RocksDBStoreTest { } @Test - public void shouldRestoreAll() throws Exception { + public void shouldRestoreAll() { final List> entries = getKeyValueEntries(); rocksDBStore.init(context, rocksDBStore); @@ -246,9 +246,9 @@ public class RocksDBStoreTest { } @Test - public void shouldHandleDeletesOnRestoreAll() throws Exception { + public void shouldHandleDeletesOnRestoreAll() { final List> entries = getKeyValueEntries(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), null)); + entries.add(new KeyValue<>("1".getBytes(UTF_8), null)); rocksDBStore.init(context, rocksDBStore); context.restore(rocksDBStore.name(), entries); @@ -264,15 +264,15 @@ public class RocksDBStoreTest { } @Test - public void shouldHandleDeletesAndPutbackOnRestoreAll() throws Exception { + public void shouldHandleDeletesAndPutbackOnRestoreAll() { final List> entries = new ArrayList<>(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); + entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8))); + entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8))); // this will be deleted - entries.add(new KeyValue<>("1".getBytes("UTF-8"), null)); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + entries.add(new KeyValue<>("1".getBytes(UTF_8), null)); + entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8))); // this will restore key "1" as WriteBatch applies updates in order - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "restored".getBytes("UTF-8"))); + entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8))); rocksDBStore.init(context, rocksDBStore); context.restore(rocksDBStore.name(), entries); @@ -304,7 +304,7 @@ public class RocksDBStoreTest { } @Test - public void shouldRestoreThenDeleteOnRestoreAll() throws Exception { + public void shouldRestoreThenDeleteOnRestoreAll() { final List> entries = getKeyValueEntries(); rocksDBStore.init(context, rocksDBStore); @@ -329,9 +329,9 @@ public class RocksDBStoreTest { entries.clear(); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), null)); + entries.add(new 
KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8))); + entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8))); + entries.add(new KeyValue<>("1".getBytes(UTF_8), null)); context.restore(rocksDBStore.name(), entries); @@ -423,11 +423,11 @@ public class RocksDBStoreTest { } } - private List> getKeyValueEntries() throws UnsupportedEncodingException { + private List> getKeyValueEntries() { final List> entries = new ArrayList<>(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8))); + entries.add(new KeyValue<>("2".getBytes(UTF_8), "b".getBytes(UTF_8))); + entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8))); return entries; } }
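As a footnote to the test changes above: switching from getBytes("UTF-8") to getBytes(StandardCharsets.UTF_8) is what lets the throws UnsupportedEncodingException / throws Exception clauses disappear, because the Charset overload of String#getBytes declares no checked exception. A minimal sketch, with a made-up class name, just to show the difference between the two overloads:

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: both overloads yield the same bytes, but only the
// charset-name overload forces callers to handle a checked exception.
public class CharsetOverloadExample {

    public static void main(final String[] args) throws UnsupportedEncodingException {
        // Charset-name overload: declares UnsupportedEncodingException,
        // which is why the old tests needed `throws Exception`.
        final byte[] byName = "restoredKey".getBytes("UTF-8");

        // Charset-object overload: no checked exception for a constant charset.
        final byte[] byCharset = "restoredKey".getBytes(StandardCharsets.UTF_8);

        System.out.println(Arrays.equals(byName, byCharset)); // true
    }
}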