diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java index ec7ac8f6e9a..613d636d852 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java @@ -20,6 +20,9 @@ import org.rocksdb.Options; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * An interface to that allows developers to customize the RocksDB settings for a given Store. * Please read the RocksDB Tuning Guide. @@ -29,12 +32,31 @@ import java.util.Map; */ public interface RocksDBConfigSetter { + Logger LOG = LoggerFactory.getLogger(RocksDBConfigSetter.class); + /** * Set the rocks db options for the provided storeName. * * @param storeName the name of the store being configured - * @param options the Rocks DB options + * @param options the RocksDB options * @param configs the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig} */ void setConfig(final String storeName, final Options options, final Map configs); + + /** + * Close any user-constructed objects that inherit from {@code org.rocksdb.RocksObject}. + *

+ * Any object created with {@code new} in {@link RocksDBConfigSetter#setConfig setConfig()} and that inherits + * from {@code org.rocksdb.RocksObject} should have {@code org.rocksdb.RocksObject#close()} + * called on it here to avoid leaking off-heap memory. Objects to be closed can be saved by the user or retrieved + * back from {@code options} using its getter methods. + *

+ * Example objects needing to be closed include {@code org.rocksdb.Filter} and {@code org.rocksdb.Cache}. + * + * @param storeName the name of the store being configured + * @param options the RocksDB options + */ + default void close(final String storeName, final Options options) { + LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter"); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index e4c416c231b..ed744686c2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -92,6 +92,8 @@ public class RocksDBStore implements KeyValueStore, BulkLoadingSt FlushOptions fOptions; private BloomFilter filter; + private RocksDBConfigSetter configSetter; + private volatile boolean prepareForBulkload = false; ProcessorContext internalProcessorContext; // visible for testing @@ -154,7 +156,7 @@ public class RocksDBStore implements KeyValueStore, BulkLoadingSt (Class) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG); if (configSetterClass != null) { - final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass); + configSetter = Utils.newInstance(configSetterClass); configSetter.setConfig(name, userSpecifiedOptions, configs); } @@ -390,6 +392,12 @@ public class RocksDBStore implements KeyValueStore, BulkLoadingSt open = false; closeOpenIterators(); + + if (configSetter != null) { + configSetter.close(name, userSpecifiedOptions); + configSetter = null; + } + dbAccessor.close(); userSpecifiedOptions.close(); wOptions.close();