Browse Source

KAFKA-3753: Add approximateNumEntries() method to KeyValueStore interface

See https://issues.apache.org/jira/browse/KAFKA-3753

This contribution is my original work and I license the work to the project under the project's open source license.

cc guozhangwang kichristensen ijuma

Author: Jeff Klukas <jeff@klukas.net>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1486 from jklukas/kvstore-size
pull/1486/merge
Jeff Klukas 9 years ago committed by Guozhang Wang
parent
commit
1ef7b494bb
  1. 10
      streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
  2. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
  3. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
  4. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
  5. 5
      streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
  6. 37
      streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
  7. 19
      streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java

10
streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java

@ -97,4 +97,14 @@ public interface KeyValueStore<K, V> extends StateStore { @@ -97,4 +97,14 @@ public interface KeyValueStore<K, V> extends StateStore {
*/
KeyValueIterator<K, V> all();
/**
* Return an approximate count of key-value mappings in this store.
*
* The count is not guaranteed to be exact in order to accommodate stores
* where an exact count is expensive to calculate.
*
* @return an approximate count of key-value mappings in the store.
*/
long approximateNumEntries();
}

5
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java

@ -149,6 +149,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> { @@ -149,6 +149,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
return this.inner.all();
}
@Override
public long approximateNumEntries() {
return this.inner.approximateNumEntries();
}
@Override
public void close() {
inner.close();

5
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java

@ -146,6 +146,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { @@ -146,6 +146,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
return new MemoryStoreIterator<>(this.map.entrySet().iterator());
}
@Override
public long approximateNumEntries() {
return this.map.size();
}
@Override
public void flush() {
// do-nothing since it is in-memory

5
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java

@ -153,6 +153,11 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> { @@ -153,6 +153,11 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
throw new UnsupportedOperationException("MemoryLRUCache does not support all() function.");
}
@Override
public long approximateNumEntries() {
return this.map.size();
}
@Override
public void flush() {
// do-nothing since it is in-memory

5
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java

@ -153,6 +153,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { @@ -153,6 +153,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
return new MeteredKeyValueIterator<>(this.inner.all(), this.allTime);
}
@Override
public long approximateNumEntries() {
return this.inner.approximateNumEntries();
}
@Override
public void close() {
inner.close();

37
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

@ -354,6 +354,43 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { @@ -354,6 +354,43 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return new RocksDbIterator<>(innerIter, serdes);
}
/**
* Return an approximate count of key-value mappings in this store.
*
* <code>RocksDB</code> cannot return an exact entry count without doing a
* full scan, so this method relies on the <code>rocksdb.estimate-num-keys</code>
* property to get an approximate count. The returned size also includes
* a count of dirty keys in the store's in-memory cache, which may lead to some
* double-counting of entries and inflate the estimate.
*
* @return an approximate count of key-value mappings in the store.
*/
@Override
public long approximateNumEntries() {
long value;
try {
value = this.db.getLongProperty("rocksdb.estimate-num-keys");
} catch (RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + this.name, e);
}
if (isOverflowing(value)) {
return Long.MAX_VALUE;
}
if (this.cacheDirtyKeys != null) {
value += this.cacheDirtyKeys.size();
}
if (isOverflowing(value)) {
return Long.MAX_VALUE;
}
return value;
}
private boolean isOverflowing(long value) {
// RocksDB returns an unsigned 8-byte integer, which could overflow long
// and manifest as a negative value.
return value < 0;
}
private void flushCache() {
// flush of the cache entries if necessary
if (cache != null) {

19
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java

@ -227,4 +227,23 @@ public abstract class AbstractKeyValueStoreTest { @@ -227,4 +227,23 @@ public abstract class AbstractKeyValueStoreTest {
store.close();
}
}
@Test
public void testSize() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
try {
assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
store.put(0, "zero");
store.put(1, "one");
store.put(2, "two");
store.put(4, "four");
store.put(5, "five");
assertEquals(5, store.approximateNumEntries());
} finally {
store.close();
}
}
}

Loading…
Cancel
Save