From 179d4dc0f2a23d7e67caf4875a0563e74027933a Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Wed, 19 Oct 2016 14:01:23 -0700 Subject: [PATCH] HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior Author: Eno Thereska Reviewers: Damian Guy, Guozhang Wang Closes #2038 from enothereska/hotfix-put-cache --- .../streams/state/internals/ThreadCache.java | 14 +++++-- .../state/internals/ThreadCacheTest.java | 37 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index d76e5c8ff52..f7355d80202 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -123,12 +123,20 @@ public class ThreadCache { public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) { final NamedCache cache = getOrCreateCache(namespace); - return cache.putIfAbsent(Bytes.wrap(key), value); + + final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value); + maybeEvict(namespace); + + if (result == null) { + numPuts++; + } + return result; } public void putAll(final String namespace, final List> entries) { - final NamedCache cache = getOrCreateCache(namespace); - cache.putAll(entries); + for (KeyValue entry : entries) { + put(namespace, entry.key, entry.value); + } } public LRUCacheEntry delete(final String namespace, final byte[] key) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 2ff3b891b76..b07da6e971a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -388,6 +388,24 @@ public class ThreadCacheTest { shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache); } + @Test + public void shouldEvictAfterPutAll() throws Exception { + final List received = new ArrayList<>(); + final String namespace = "namespace"; + final ThreadCache cache = new ThreadCache(1); + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List dirty) { + received.addAll(dirty); + } + }); + + cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})), + KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6})))); + + assertEquals(cache.evicts(), 2); + } + @Test public void shouldPutAll() throws Exception { final ThreadCache cache = new ThreadCache(100000); @@ -422,6 +440,25 @@ public class ThreadCacheTest { assertArrayEquals(value, cache.get("n", key).value); } + @Test + public void shouldEvictAfterPutIfAbsent() throws Exception { + final List received = new ArrayList<>(); + final String namespace = "namespace"; + final ThreadCache cache = new ThreadCache(1); + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { + @Override + public void apply(final List dirty) { + received.addAll(dirty); + } + }); + + cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5})); + cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6})); + cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6})); + + assertEquals(cache.evicts(), 3); + } + private LRUCacheEntry dirtyEntry(final byte[] key) { return new LRUCacheEntry(key, true, -1, -1, -1, ""); }