Browse Source

HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2038 from enothereska/hotfix-put-cache
pull/2038/merge
Eno Thereska 8 years ago committed by Guozhang Wang
parent
commit
179d4dc0f2
  1. 14
      streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
  2. 37
      streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java

14
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java

@ -123,12 +123,20 @@ public class ThreadCache { @@ -123,12 +123,20 @@ public class ThreadCache {
public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
final NamedCache cache = getOrCreateCache(namespace);
return cache.putIfAbsent(Bytes.wrap(key), value);
final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
maybeEvict(namespace);
if (result == null) {
numPuts++;
}
return result;
}
public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
final NamedCache cache = getOrCreateCache(namespace);
cache.putAll(entries);
for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
put(namespace, entry.key, entry.value);
}
}
public LRUCacheEntry delete(final String namespace, final byte[] key) {

37
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java

@ -388,6 +388,24 @@ public class ThreadCacheTest { @@ -388,6 +388,24 @@ public class ThreadCacheTest {
shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(cache);
}
@Test
public void shouldEvictAfterPutAll() throws Exception {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final String namespace = "namespace";
final ThreadCache cache = new ThreadCache(1);
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
assertEquals(cache.evicts(), 2);
}
@Test
public void shouldPutAll() throws Exception {
final ThreadCache cache = new ThreadCache(100000);
@ -422,6 +440,25 @@ public class ThreadCacheTest { @@ -422,6 +440,25 @@ public class ThreadCacheTest {
assertArrayEquals(value, cache.get("n", key).value);
}
@Test
public void shouldEvictAfterPutIfAbsent() throws Exception {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final String namespace = "namespace";
final ThreadCache cache = new ThreadCache(1);
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
assertEquals(cache.evicts(), 3);
}
private LRUCacheEntry dirtyEntry(final byte[] key) {
return new LRUCacheEntry(key, true, -1, -1, -1, "");
}

Loading…
Cancel
Save