From 538bd7eddf13897245524f015e3207affb03fcdc Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 19 Mar 2019 08:51:10 -0700 Subject: [PATCH] KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433) Use concurrent data structure for the underlying cache in NamedCache, and iterate over it with subMap instead of many calls to get() Reviewers: Guozhang Wang , Bill Bejeck --- .../streams/state/internals/NamedCache.java | 18 +++++-------- .../streams/state/internals/ThreadCache.java | 24 ++++++++--------- .../state/internals/NamedCacheTest.java | 27 ------------------- .../state/internals/ThreadCacheTest.java | 3 ++- 4 files changed, 21 insertions(+), 51 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 3ce7cbe427e..0201f20f9fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -33,13 +35,11 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; class NamedCache { private static final Logger log = LoggerFactory.getLogger(NamedCache.class); private final String name; - private final TreeMap cache = new TreeMap<>(); + private final NavigableMap cache = new ConcurrentSkipListMap<>(); private final Set dirtyKeys = new LinkedHashSet<>(); private ThreadCache.DirtyEntryFlushListener listener; private LRUNode tail; @@ -266,16 +266,12 @@ class NamedCache { return cache.size(); } - synchronized Iterator keyRange(final Bytes from, final Bytes to) { - return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true)); + synchronized Iterator> subMapIterator(final Bytes from, final Bytes to) { + return cache.subMap(from, true, to, true).entrySet().iterator(); } - private Iterator keySetIterator(final Set keySet) { - return new TreeSet<>(keySet).iterator(); - } - - synchronized Iterator allKeys() { - return keySetIterator(cache.navigableKeySet()); + synchronized Iterator> allIterator() { + return cache.entrySet().iterator(); } synchronized LRUCacheEntry first() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 941b5221524..0db6c78a577 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.NamedCache.LRUNode; import org.slf4j.Logger; import java.util.Collections; @@ -180,17 +181,17 @@ public class ThreadCache { public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); } - return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache); + return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to)); } public MemoryLRUCacheBytesIterator all(final String namespace) { final NamedCache cache = getCache(namespace); if (cache == null) { - return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics)); + return new MemoryLRUCacheBytesIterator(Collections.emptyIterator()); } - return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache); + return new MemoryLRUCacheBytesIterator(cache.allIterator()); } public long size() { @@ -260,13 +261,11 @@ public class ThreadCache { } static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator { - private final Iterator keys; - private final NamedCache cache; + private final Iterator> underlying; private KeyValue nextEntry; - MemoryLRUCacheBytesIterator(final Iterator keys, final NamedCache cache) { - this.keys = keys; - this.cache = cache; + MemoryLRUCacheBytesIterator(final Iterator> underlying) { + this.underlying = underlying; } public Bytes peekNextKey() { @@ -290,7 +289,7 @@ public class ThreadCache { return true; } - while (keys.hasNext() && nextEntry == null) { + while (underlying.hasNext() && nextEntry == null) { internalNext(); } @@ -308,8 +307,9 @@ public class ThreadCache { } private void internalNext() { - final Bytes cacheKey = keys.next(); - final LRUCacheEntry entry = cache.get(cacheKey); + final Map.Entry mapEntry = underlying.next(); + final Bytes cacheKey = mapEntry.getKey(); + final LRUCacheEntry entry = mapEntry.getValue().entry(); if (entry == null) { return; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 394feed256a..6c82209c33a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -32,7 +32,6 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -40,7 +39,6 @@ import java.util.Map; import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -208,31 +206,6 @@ public class NamedCacheTest { assertEquals(cache.flushes(), 1); } - @Test - public void shouldGetRangeIteratorOverKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); - - final Iterator iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2})); - assertEquals(Bytes.wrap(new byte[]{1}), iterator.next()); - assertEquals(Bytes.wrap(new byte[]{2}), iterator.next()); - assertFalse(iterator.hasNext()); - } - - @Test - public void shouldGetIteratorOverAllKeys() { - cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, "")); - cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20})); - cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, "")); - - final Iterator iterator = cache.allKeys(); - assertEquals(Bytes.wrap(new byte[]{0}), iterator.next()); - assertEquals(Bytes.wrap(new byte[]{1}), iterator.next()); - assertEquals(Bytes.wrap(new byte[]{2}), iterator.next()); - assertFalse(iterator.hasNext()); - } - @Test public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() { cache.evict(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index a7a64c423b2..5882ee4b94a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -310,10 +310,11 @@ public class ThreadCacheTest { } assertEquals(5, cache.size()); - final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); // should evict byte[] {0} cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6})); + final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5})); + assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey()); }