Browse Source

KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)

Following KIP-453, this PR adds a default close() method to the RocksDBConfigSetter interface and calls it when closing a store.

Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>
pull/6717/head
A. Sophie Blee-Goldman 6 years ago committed by Bill Bejeck
parent
commit
b2826c6c2b
  1. 24
      streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
  2. 10
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

24
streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java

@ -20,6 +20,9 @@ import org.rocksdb.Options; @@ -20,6 +20,9 @@ import org.rocksdb.Options;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An interface to that allows developers to customize the RocksDB settings for a given Store.
* Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
@ -29,12 +32,31 @@ import java.util.Map; @@ -29,12 +32,31 @@ import java.util.Map;
*/
public interface RocksDBConfigSetter {
Logger LOG = LoggerFactory.getLogger(RocksDBConfigSetter.class);
/**
* Set the rocks db options for the provided storeName.
*
* @param storeName the name of the store being configured
* @param options the Rocks DB options
* @param options the RocksDB options
* @param configs the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
*/
void setConfig(final String storeName, final Options options, final Map<String, Object> configs);
/**
* Close any user-constructed objects that inherit from {@code org.rocksdb.RocksObject}.
* <p>
* Any object created with {@code new} in {@link RocksDBConfigSetter#setConfig setConfig()} and that inherits
* from {@code org.rocksdb.RocksObject} should have {@code org.rocksdb.RocksObject#close()}
* called on it here to avoid leaking off-heap memory. Objects to be closed can be saved by the user or retrieved
* back from {@code options} using its getter methods.
* <p>
* Example objects needing to be closed include {@code org.rocksdb.Filter} and {@code org.rocksdb.Cache}.
*
* @param storeName the name of the store being configured
* @param options the RocksDB options
*/
default void close(final String storeName, final Options options) {
LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter");
}
}

10
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

@ -92,6 +92,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt @@ -92,6 +92,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
FlushOptions fOptions;
private BloomFilter filter;
private RocksDBConfigSetter configSetter;
private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
// visible for testing
@ -154,7 +156,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt @@ -154,7 +156,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
(Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
if (configSetterClass != null) {
final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
configSetter = Utils.newInstance(configSetterClass);
configSetter.setConfig(name, userSpecifiedOptions, configs);
}
@ -390,6 +392,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt @@ -390,6 +392,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
open = false;
closeOpenIterators();
if (configSetter != null) {
configSetter.close(name, userSpecifiedOptions);
configSetter = null;
}
dbAccessor.close();
userSpecifiedOptions.close();
wOptions.close();

Loading…
Cancel
Save