Browse Source

KAFKA-4850: Enable bloom filters (#6012)

This PR enables BloomFilters for RocksDB to speed up point lookups.
The request for this has been around for some time - https://issues.apache.org/jira/browse/KAFKA-4850

For testing, I've done the following

Ran the standard streams suite of unit and integration tests
Kicked off the simple benchmark test with bloom filters enabled
Kicked off the simple benchmark test with bloom filters not enabled
Kicked off streams system tests

Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
pull/3453/merge
Bill Bejeck 6 years ago committed by Guozhang Wang
parent
commit
0efed12f50
  1. 8
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
  2. 1
      streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
  3. 75
      streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java

8
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.CompactionStyle; import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType; import org.rocksdb.CompressionType;
import org.rocksdb.FlushOptions; import org.rocksdb.FlushOptions;
@ -109,14 +110,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
this.parentDir = parentDir; this.parentDir = parentDir;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void openDB(final ProcessorContext context) { public void openDB(final ProcessorContext context) {
// initialize the default rocksdb options // initialize the default rocksdb options
options = new Options();
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE); tableConfig.setBlockSize(BLOCK_SIZE);
tableConfig.setFilter(new BloomFilter());
options = new Options(); options.optimizeFiltersForHits();
options.setTableFormatConfig(tableConfig); options.setTableFormatConfig(tableConfig);
options.setWriteBufferSize(WRITE_BUFFER_SIZE); options.setWriteBufferSize(WRITE_BUFFER_SIZE);
options.setCompressionType(COMPRESSION_TYPE); options.setCompressionType(COMPRESSION_TYPE);

1
streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java

@ -39,7 +39,6 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
return Long.compare(id, segment.id); return Long.compare(id, segment.id);
} }
@Override @Override
public void openDB(final ProcessorContext context) { public void openDB(final ProcessorContext context) {
super.openDB(context); super.openDB(context);

75
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java

@ -35,6 +35,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options; import org.rocksdb.Options;
import java.io.File; import java.io.File;
@ -48,6 +50,7 @@ import java.util.Set;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@ -60,6 +63,7 @@ public class RocksDBStoreTest {
private RocksDBStore rocksDBStore; private RocksDBStore rocksDBStore;
private InternalMockProcessorContext context; private InternalMockProcessorContext context;
private File dir; private File dir;
private static boolean enableBloomFilters = false;
@Before @Before
public void setUp() { public void setUp() {
@ -415,6 +419,53 @@ public class RocksDBStoreTest {
rocksDBStore.flush(); rocksDBStore.flush();
} }
@Test
public void shouldHandleToggleOfEnablingBloomFilters() {
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestingBloomFilterRocksDBConfigSetter.class);
rocksDBStore = new RocksDBStore("test");
dir = TestUtils.tempDirectory();
context = new InternalMockProcessorContext(dir,
Serdes.String(),
Serdes.String(),
new StreamsConfig(props));
rocksDBStore.init(context, rocksDBStore);
final List<String> expectedValues = new ArrayList<>();
expectedValues.add("a");
expectedValues.add("b");
expectedValues.add("c");
final List<KeyValue<byte[], byte[]>> keyValues = getKeyValueEntries();
for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
rocksDBStore.put(new Bytes(keyValue.key), keyValue.value);
}
int expectedIndex = 0;
for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
assertThat(new String(valBytes, UTF_8), is(expectedValues.get(expectedIndex++)));
}
assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
rocksDBStore.close();
enableBloomFilters = true;
expectedIndex = 0;
// reopen with Bloom Filters enabled
// should open fine without errors
rocksDBStore.init(context, rocksDBStore);
for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
assertThat(new String(valBytes, UTF_8), is(expectedValues.get(expectedIndex++)));
}
assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
}
public static class MockRocksDbConfigSetter implements RocksDBConfigSetter { public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
static boolean called; static boolean called;
@ -426,6 +477,29 @@ public class RocksDBStoreTest {
} }
} }
public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {
static boolean bloomFiltersSet = false;
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(50 * 1024 * 1024L);
tableConfig.setBlockSize(4096L);
if (enableBloomFilters) {
tableConfig.setFilter(new BloomFilter());
options.optimizeFiltersForHits();
bloomFiltersSet = true;
} else {
options.setOptimizeFiltersForHits(false);
}
options.setTableFormatConfig(tableConfig);
}
}
private List<KeyValue<byte[], byte[]>> getKeyValueEntries() { private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>(); final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8))); entries.add(new KeyValue<>("1".getBytes(UTF_8), "a".getBytes(UTF_8)));
@ -433,4 +507,5 @@ public class RocksDBStoreTest {
entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8))); entries.add(new KeyValue<>("3".getBytes(UTF_8), "c".getBytes(UTF_8)));
return entries; return entries;
} }
} }

Loading…
Cancel
Save