diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index d76e5c8ff52..f7355d80202 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -123,12 +123,20 @@ public class ThreadCache { public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) { final NamedCache cache = getOrCreateCache(namespace); - return cache.putIfAbsent(Bytes.wrap(key), value); + + final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value); + maybeEvict(namespace); + + if (result == null) { + numPuts++; + } + return result; } public void putAll(final String namespace, final List> entries) { - final NamedCache cache = getOrCreateCache(namespace); - cache.putAll(entries); + for (KeyValue entry : entries) { + put(namespace, entry.key, entry.value); + } } public LRUCacheEntry delete(final String namespace, final byte[] key) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 2ff3b891b76..b07da6e971a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -388,6 +388,24 @@ public class ThreadCacheTest { shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } + @Test + public void shouldEvictAfterPutAll() throws Exception { + final List received = new ArrayList<>(); + final String namespace = "namespace"; + final ThreadCache cache = new ThreadCache(1); + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List dirty) { + received.addAll(dirty); + } + }); + + cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})), + KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6})))); + + assertEquals(cache.evicts(), 2); + } + @Test public void shouldPutAll() throws Exception { final ThreadCache cache = new ThreadCache(100000); @@ -422,6 +440,25 @@ public class ThreadCacheTest { assertArrayEquals(value, cache.get("n", key).value); } + @Test + public void shouldEvictAfterPutIfAbsent() throws Exception { + final List received = new ArrayList<>(); + final String namespace = "namespace"; + final ThreadCache cache = new ThreadCache(1); + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List dirty) { + received.addAll(dirty); + } + }); + + cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5})); + cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6})); + cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6})); + + assertEquals(cache.evicts(), 3); + } + private LRUCacheEntry dirtyEntry(final byte[] key) { return new LRUCacheEntry(key, true, -1, -1, -1, ""); }