From 0efed12f50f683131b31d33d787e2141122a5c11 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 23 Jan 2019 19:00:26 -0500 Subject: [PATCH] KAFKA-4850: Enable bloomfilters (#6012) This PR enables BloomFilters for RocksDB to speed up point lookups. The request for this has been around for some time - https://issues.apache.org/jira/browse/KAFKA-4850 For testing, I've done the following Ran the standard streams suite of unit and integration tests Kicked off the simple benchmark test with bloom filters enabled Kicked off the simple benchmark test with bloom filters not enabled Kicked off streams system tests Matthias J. Sax , Guozhang Wang , John Roesler --- .../streams/state/internals/RocksDBStore.java | 8 +- .../streams/state/internals/Segment.java | 1 - .../state/internals/RocksDBStoreTest.java | 75 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index c86d1e4bda0..232d8f23840 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.FlushOptions; @@ -109,14 +110,19 @@ public class RocksDBStore implements KeyValueStore { this.parentDir = parentDir; } + @SuppressWarnings("unchecked") public void openDB(final ProcessorContext context) { // initialize the default rocksdb options + + options = new Options(); + final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); tableConfig.setBlockSize(BLOCK_SIZE); + tableConfig.setFilter(new BloomFilter()); - options = new Options(); + options.optimizeFiltersForHits(); options.setTableFormatConfig(tableConfig); options.setWriteBufferSize(WRITE_BUFFER_SIZE); options.setCompressionType(COMPRESSION_TYPE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java index f80ffa3a3b3..f95e3955c10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java @@ -39,7 +39,6 @@ class Segment extends RocksDBStore implements Comparable { return Long.compare(id, segment.id); } - @Override public void openDB(final ProcessorContext context) { super.openDB(context); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 67c2d5d7c11..b98f72f58e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -35,6 +35,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; import org.rocksdb.Options; import java.io.File; @@ -48,6 +50,7 @@ import java.util.Set; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -60,6 +63,7 @@ public class RocksDBStoreTest { private RocksDBStore rocksDBStore; private InternalMockProcessorContext context; private File dir; + private static boolean enableBloomFilters = false; @Before public void setUp() { @@ -415,6 +419,53 @@ public class RocksDBStoreTest { rocksDBStore.flush(); } + @Test + public void shouldHandleToggleOfEnablingBloomFilters() { + + final Properties props = StreamsTestUtils.getStreamsConfig(); + props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class); + rocksDBStore = new RocksDBStore("test"); + dir = TestUtils.tempDirectory(); + context = new InternalMockProcessorContext(dir, + Serdes.String(), + Serdes.String(), + new StreamsConfig(props)); + + rocksDBStore.init(context, rocksDBStore); + + final List expectedValues = new ArrayList<>(); + expectedValues.add("a"); + expectedValues.add("b"); + expectedValues.add("c"); + + final List> keyValues = getKeyValueEntries(); + for (final KeyValue keyValue : keyValues) { + rocksDBStore.put(new Bytes(keyValue.key), keyValue.value); + } + + int expectedIndex = 0; + for (final KeyValue keyValue : keyValues) { + final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key)); + assertThat(new String(valBytes, UTF_8), is(expectedValues.get(expectedIndex++))); + } + assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet); + + rocksDBStore.close(); + enableBloomFilters = true; + expectedIndex = 0; + + // reopen with Bloom Filters enabled + // should open fine without errors + rocksDBStore.init(context, rocksDBStore); + + for (final KeyValue keyValue : keyValues) { + final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key)); + assertThat(new String(valBytes, UTF_8), is(expectedValues.get(expectedIndex++))); + } + + assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet); + } + public static class MockRocksDbConfigSetter implements RocksDBConfigSetter { static boolean called; @@ -426,6 +477,29 @@ public class RocksDBStoreTest { } } + public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter { + + static boolean bloomFiltersSet = false; + + @Override + public void setConfig(final String storeName, final Options options, final Map configs) { + + final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); + tableConfig.setBlockCacheSize(50 * 1024 * 1024L); + tableConfig.setBlockSize(4096L); + if (enableBloomFilters) { + tableConfig.setFilter(new BloomFilter()); + options.optimizeFiltersForHits(); + bloomFiltersSet = true; + } else { + options.setOptimizeFiltersForHits(false); + } + + options.setTableFormatConfig(tableConfig); + + } + } + private List> getKeyValueEntries() { final List> entries = new ArrayList<>(); entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8))); @@ -433,4 +507,5 @@ public class RocksDBStoreTest { entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8))); return entries; } + }