Browse Source

KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)

Use a concurrent data structure (ConcurrentSkipListMap) for the underlying cache in NamedCache, and iterate over its entries directly (using subMap() for range queries) instead of copying the keys and calling get() once per key.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
pull/6479/head
A. Sophie Blee-Goldman 6 years ago committed by Bill Bejeck
parent
commit
538bd7eddf
  1. 18
      streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
  2. 24
      streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
  3. 27
      streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
  4. 3
      streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java

18
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.streams.state.internals; package org.apache.kafka.streams.state.internals;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Avg;
@ -33,13 +35,11 @@ import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
class NamedCache { class NamedCache {
private static final Logger log = LoggerFactory.getLogger(NamedCache.class); private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
private final String name; private final String name;
private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>(); private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>(); private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
private ThreadCache.DirtyEntryFlushListener listener; private ThreadCache.DirtyEntryFlushListener listener;
private LRUNode tail; private LRUNode tail;
@ -266,16 +266,12 @@ class NamedCache {
return cache.size(); return cache.size();
} }
synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) { synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); return cache.subMap(from, true, to, true).entrySet().iterator();
} }
private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) { synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
return new TreeSet<>(keySet).iterator(); return cache.entrySet().iterator();
}
synchronized Iterator<Bytes> allKeys() {
return keySetIterator(cache.navigableKeySet());
} }
synchronized LRUCacheEntry first() { synchronized LRUCacheEntry first() {

24
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java

@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.Collections; import java.util.Collections;
@ -180,17 +181,17 @@ public class ThreadCache {
public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) { public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
final NamedCache cache = getCache(namespace); final NamedCache cache = getCache(namespace);
if (cache == null) { if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics)); return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
} }
return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache); return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
} }
public MemoryLRUCacheBytesIterator all(final String namespace) { public MemoryLRUCacheBytesIterator all(final String namespace) {
final NamedCache cache = getCache(namespace); final NamedCache cache = getCache(namespace);
if (cache == null) { if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics)); return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
} }
return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); return new MemoryLRUCacheBytesIterator(cache.allIterator());
} }
public long size() { public long size() {
@ -260,13 +261,11 @@ public class ThreadCache {
} }
static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
private final Iterator<Bytes> keys; private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
private final NamedCache cache;
private KeyValue<Bytes, LRUCacheEntry> nextEntry; private KeyValue<Bytes, LRUCacheEntry> nextEntry;
MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) { MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
this.keys = keys; this.underlying = underlying;
this.cache = cache;
} }
public Bytes peekNextKey() { public Bytes peekNextKey() {
@ -290,7 +289,7 @@ public class ThreadCache {
return true; return true;
} }
while (keys.hasNext() && nextEntry == null) { while (underlying.hasNext() && nextEntry == null) {
internalNext(); internalNext();
} }
@ -308,8 +307,9 @@ public class ThreadCache {
} }
private void internalNext() { private void internalNext() {
final Bytes cacheKey = keys.next(); final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
final LRUCacheEntry entry = cache.get(cacheKey); final Bytes cacheKey = mapEntry.getKey();
final LRUCacheEntry entry = mapEntry.getValue().entry();
if (entry == null) { if (entry == null) {
return; return;
} }

27
streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java

@ -32,7 +32,6 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -40,7 +39,6 @@ import java.util.Map;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags; import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
@ -208,31 +206,6 @@ public class NamedCacheTest {
assertEquals(cache.flushes(), 1); assertEquals(cache.flushes(), 1);
} }
@Test
public void shouldGetRangeIteratorOverKeys() {
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
assertFalse(iterator.hasNext());
}
@Test
public void shouldGetIteratorOverAllKeys() {
cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
final Iterator<Bytes> iterator = cache.allKeys();
assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
assertFalse(iterator.hasNext());
}
@Test @Test
public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() { public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() {
cache.evict(); cache.evict();

3
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java

@ -310,10 +310,11 @@ public class ThreadCacheTest {
} }
assertEquals(5, cache.size()); assertEquals(5, cache.size());
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
// should evict byte[] {0} // should evict byte[] {0}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey()); assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
} }

Loading…
Cancel
Save