Browse Source

Rewrite ConcurrentLruCache implementation

Prior to this commit, the `ConcurrentLruCache` implementation would not
perform well under certain conditions. As long as the cache capacity was
not reached, the cache would avoid maintaining an eviction queue
(reordering entries depending with least/most recently read). When the
cache capacity was reached, the LRU queue was updated for each
read/write operation. This decreased performance significantly under
contention when the capacity was reached.

This commit completely rewrites the internals of `ConcurrentLruCache`.
`ConcurrentLruCache` is now a specialized version of the
`ConcurrentLinkedHashMap` [1]. This change focuses on buferring read and
write operations, only processing them at certain times to avoid
contention.

When a cached entry is read, a read operation is queued and buffered
operations are drained if the buffer reached a fixed limit. When a new
cache entry is added or removed, a write operation is queued and
triggers a drain attempt. When the capacity is outgrown, the cache polls
items from the eviction queue, which maintains elements with the
least recently used ones first. Entries are removed until the capacity
is under control.

The behavior described here and the buffer sizes are optimized with the
number of available processors in mind. Work is localized as much as
possible on a per-thread basis to avoid contention on the eviction queue.

The new implementation has been tested with the JMH benchmark provided
here, comparing the former `COncurrentLruCache`, the new implementation
as well as the `ConcurrentLinkedHashMap` [1].

When testing with a cache reaching capacity, under contention, with a
10% cache miss, we're seeing a 40x improvement compared to the previous
implementation and performance on par with the reference.
See [2] for how to replicate the benchmark.

[1] https://github.com/ben-manes/concurrentlinkedhashmap
[2] https://github.com/spring-projects/spring-framework/wiki/Micro-Benchmarks

Closes gh-26320
pull/29057/head
Brian Clozel 2 years ago
parent
commit
c470262c8e
  1. 76
      spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java
  2. 607
      spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java
  3. 5
      spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java
  4. 2
      spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java

76
spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java

@ -0,0 +1,76 @@ @@ -0,0 +1,76 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
/**
* Benchmarks for {@link ConcurrentLruCache}.
* @author Brian Clozel
*/
@BenchmarkMode(Mode.Throughput)
public class ConcurrentLruCacheBenchmark {
@Benchmark
public void lruCache(BenchmarkData data, Blackhole bh) {
for (String element : data.elements) {
String value = data.lruCache.get(element);
bh.consume(value);
}
}
@State(Scope.Benchmark)
public static class BenchmarkData {
ConcurrentLruCache<String, String> lruCache;
@Param({"100"})
public int capacity;
@Param({"0.1"})
public float cacheMissRate;
public List<String> elements;
public Function<String, String> generator;
@Setup(Level.Iteration)
public void setup() {
this.generator = key -> key + "value";
this.lruCache = new ConcurrentLruCache<>(this.capacity, this.generator);
Assert.isTrue(this.cacheMissRate < 1, "cache miss rate should be < 1");
Random random = new Random();
int elementsCount = Math.round(this.capacity * (1 + this.cacheMissRate));
this.elements = new ArrayList<>(elementsCount);
random.ints(elementsCount).forEach(value -> this.elements.add(String.valueOf(value)));
this.elements.sort(String::compareTo);
}
}
}

607
spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,116 +16,199 @@ @@ -16,116 +16,199 @@
package org.springframework.util;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.springframework.lang.Nullable;
/**
* Simple LRU (Least Recently Used) cache, bounded by a specified cache limit.
*
* <p>This implementation is backed by a {@code ConcurrentHashMap} for storing
* the cached values and a {@code ConcurrentLinkedDeque} for ordering the keys
* and choosing the least recently used key when the cache is at full capacity.
* Simple LRU (Least Recently Used) cache, bounded by a specified cache capacity.
* <p>This is a simplified, opinionated implementation of a LRU cache for internal
* use in Spring Framework. It is inspired from
* <a href="https://github.com/ben-manes/concurrentlinkedhashmap">ConcurrentLinkedHashMap</a>.
* <p>Read and write operations are internally recorded in dedicated buffers,
* then drained at chosen times to avoid contention.
*
* @author Brian Clozel
* @author Juergen Hoeller
* @author Ben Manes
* @since 5.3
* @param <K> the type of the key used for cache retrieval
* @param <V> the type of the cached values
* @see #get
* @param <V> the type of the cached values, does not allow null values
* @see #get(Object)
*/
public class ConcurrentLruCache<K, V> {
@SuppressWarnings({"unchecked"})
public final class ConcurrentLruCache<K, V> {
private final int capacity;
private final int sizeLimit;
private final AtomicInteger currentSize = new AtomicInteger();
private final ConcurrentMap<K, Node<K, V>> cache;
private final Function<K, V> generator;
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ReadOperations<K, V> readOperations;
private final ConcurrentLinkedDeque<K> queue = new ConcurrentLinkedDeque<>();
private final WriteOperations writeOperations;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock evictionLock = new ReentrantLock();
private volatile int size;
/*
* Queue that contains all ACTIVE cache entries, ordered with least recently used entries first.
* Read and write operations are buffered and periodically processed to reorder the queue.
*/
private final EvictionQueue<K, V> evictionQueue = new EvictionQueue<>();
private final AtomicReference<DrainStatus> drainStatus = new AtomicReference<>(DrainStatus.IDLE);
/**
* Create a new cache instance with the given limit and generator function.
* @param sizeLimit the maximum number of entries in the cache
* Create a new cache instance with the given capacity and generator function.
* @param capacity the maximum number of entries in the cache
* (0 indicates no caching, always generating a new value)
* @param generator a function to generate a new value for a given key
*/
public ConcurrentLruCache(int sizeLimit, Function<K, V> generator) {
Assert.isTrue(sizeLimit >= 0, "Cache size limit must not be negative");
Assert.notNull(generator, "Generator function must not be null");
this.sizeLimit = sizeLimit;
this.generator = generator;
public ConcurrentLruCache(int capacity, Function<K, V> generator) {
this(capacity, generator, 16);
}
private ConcurrentLruCache(int capacity, Function<K, V> generator, int concurrencyLevel) {
Assert.isTrue(capacity > 0, "Capacity should be > 0");
this.capacity = capacity;
this.cache = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
this.generator = generator;
this.readOperations = new ReadOperations<>(this.evictionQueue);
this.writeOperations = new WriteOperations();
}
/**
* Retrieve an entry from the cache, potentially triggering generation
* of the value.
* Retrieve an entry from the cache, potentially triggering generation of the value.
* @param key the key to retrieve the entry for
* @return the cached or newly generated value
*/
public V get(K key) {
if (this.sizeLimit == 0) {
return this.generator.apply(key);
final Node<K, V> node = this.cache.get(key);
if (node == null) {
V value = this.generator.apply(key);
put(key, value);
return value;
}
processRead(node);
return node.getValue();
}
V cached = this.cache.get(key);
if (cached != null) {
if (this.size < this.sizeLimit) {
return cached;
}
this.lock.readLock().lock();
private void put(K key, V value) {
Assert.notNull(key, "key should not be null");
Assert.notNull(value, "value should not be null");
final CacheEntry<V> cacheEntry = new CacheEntry<>(value, CacheEntryState.ACTIVE);
final Node<K, V> node = new Node<>(key, cacheEntry);
final Node<K, V> prior = this.cache.put(node.key, node);
if (prior == null) {
processWrite(new AddTask(node));
}
else {
processRead(prior);
}
}
private void processRead(Node<K, V> node) {
boolean drainRequested = this.readOperations.recordRead(node);
final DrainStatus status = this.drainStatus.get();
if (status.shouldDrainBuffers(drainRequested)) {
drainOperations();
}
}
private void processWrite(Runnable task) {
this.writeOperations.add(task);
this.drainStatus.lazySet(DrainStatus.REQUIRED);
drainOperations();
}
private void drainOperations() {
if (this.evictionLock.tryLock()) {
try {
if (this.queue.removeLastOccurrence(key)) {
this.queue.offer(key);
}
return cached;
this.drainStatus.lazySet(DrainStatus.PROCESSING);
this.readOperations.drain();
this.writeOperations.drain();
}
finally {
this.lock.readLock().unlock();
this.drainStatus.compareAndSet(DrainStatus.PROCESSING, DrainStatus.IDLE);
this.evictionLock.unlock();
}
}
}
/**
* Return the maximum number of entries in the cache.
* @see #size()
*/
public int capacity() {
return this.capacity;
}
this.lock.writeLock().lock();
/**
* Return the maximum number of entries in the cache.
* @deprecated in favor of {@link #capacity()} as of 6.0.
*/
@Deprecated
public int sizeLimit() {
return this.capacity;
}
/**
* Return the current size of the cache.
* @see #capacity()
*/
public int size() {
return this.cache.size();
}
/**
* Immediately remove all entries from this cache.
*/
public void clear() {
this.evictionLock.lock();
try {
// Retrying in case of concurrent reads on the same key
cached = this.cache.get(key);
if (cached != null) {
if (this.queue.removeLastOccurrence(key)) {
this.queue.offer(key);
}
return cached;
}
// Generate value first, to prevent size inconsistency
V value = this.generator.apply(key);
if (this.size == this.sizeLimit) {
K leastUsed = this.queue.poll();
if (leastUsed != null) {
this.cache.remove(leastUsed);
}
Node<K, V> node;
while ((node = this.evictionQueue.poll()) != null) {
this.cache.remove(node.key, node);
markAsRemoved(node);
}
this.queue.offer(key);
this.cache.put(key, value);
this.size = this.cache.size();
return value;
this.readOperations.clear();
this.writeOperations.drainAll();
}
finally {
this.lock.writeLock().unlock();
this.evictionLock.unlock();
}
}
/*
* Transition the node to the {@code removed} state and decrement the current size of the cache.
*/
private void markAsRemoved(Node<K, V> node) {
for (; ; ) {
CacheEntry<V> current = node.get();
CacheEntry<V> removed = new CacheEntry<>(current.value, CacheEntryState.REMOVED);
if (node.compareAndSet(current, removed)) {
this.currentSize.lazySet(this.currentSize.get() - 1);
return;
}
}
}
/**
* Determine whether the given key is present in this cache.
* @param key the key to check for
* @return {@code true} if the key is present,
* {@code false} if there was no matching key
* @return {@code true} if the key is present, {@code false} if there was no matching key
*/
public boolean contains(K key) {
return this.cache.containsKey(key);
@ -137,49 +220,393 @@ public class ConcurrentLruCache<K, V> { @@ -137,49 +220,393 @@ public class ConcurrentLruCache<K, V> {
* @return {@code true} if the key was present before,
* {@code false} if there was no matching key
*/
@Nullable
public boolean remove(K key) {
this.lock.writeLock().lock();
try {
boolean wasPresent = (this.cache.remove(key) != null);
this.queue.remove(key);
this.size = this.cache.size();
return wasPresent;
final Node<K, V> node = this.cache.remove(key);
if (node == null) {
return false;
}
finally {
this.lock.writeLock().unlock();
markForRemoval(node);
processWrite(new RemovalTask(node));
return true;
}
/*
* Transition the node from the {@code active} state to the {@code pending removal} state,
* if the transition is valid.
*/
private void markForRemoval(Node<K, V> node) {
for (; ; ) {
final CacheEntry<V> current = node.get();
if (!current.isActive()) {
return;
}
final CacheEntry<V> pendingRemoval = new CacheEntry<>(current.value, CacheEntryState.PENDING_REMOVAL);
if (node.compareAndSet(current, pendingRemoval)) {
return;
}
}
}
/**
* Immediately remove all entries from this cache.
* Write operation recorded when a new entry is added to the cache.
*/
public void clear() {
this.lock.writeLock().lock();
try {
this.cache.clear();
this.queue.clear();
this.size = 0;
private final class AddTask implements Runnable {
final Node<K, V> node;
AddTask(Node<K, V> node) {
this.node = node;
}
finally {
this.lock.writeLock().unlock();
@Override
public void run() {
currentSize.lazySet(currentSize.get() + 1);
if (this.node.get().isActive()) {
evictionQueue.add(this.node);
evictEntries();
}
}
private void evictEntries() {
while (currentSize.get() > capacity) {
final Node<K, V> node = evictionQueue.poll();
if (node == null) {
return;
}
cache.remove(node.key, node);
markAsRemoved(node);
}
}
}
/**
* Return the current size of the cache.
* @see #sizeLimit()
* Write operation recorded when an entry is removed to the cache.
*/
public int size() {
return this.size;
private final class RemovalTask implements Runnable {
final Node<K, V> node;
RemovalTask(Node<K, V> node) {
this.node = node;
}
@Override
public void run() {
evictionQueue.remove(this.node);
markAsRemoved(this.node);
}
}
/**
* Return the maximum number of entries in the cache
* (0 indicates no caching, always generating a new value).
* @see #size()
/*
* Draining status for the read/write buffers.
*/
public int sizeLimit() {
return this.sizeLimit;
private enum DrainStatus {
/*
* No drain operation currently running.
*/
IDLE {
@Override
boolean shouldDrainBuffers(boolean delayable) {
return !delayable;
}
},
/*
* A drain operation is required due to a pending write modification.
*/
REQUIRED {
@Override
boolean shouldDrainBuffers(boolean delayable) {
return true;
}
},
/*
* A drain operation is in progress.
*/
PROCESSING {
@Override
boolean shouldDrainBuffers(boolean delayable) {
return false;
}
};
/**
* Determine whether the buffers should be drained.
* @param delayable if a drain should be delayed until required
* @return if a drain should be attempted
*/
abstract boolean shouldDrainBuffers(boolean delayable);
}
private enum CacheEntryState {
ACTIVE, PENDING_REMOVAL, REMOVED
}
private record CacheEntry<V>(V value, CacheEntryState state) {
boolean isActive() {
return this.state == CacheEntryState.ACTIVE;
}
}
private static final class ReadOperations<K, V> {
private static final int BUFFER_COUNT = detectNumberOfBuffers();
private static int detectNumberOfBuffers() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(availableProcessors - 1));
}
private static final int BUFFERS_MASK = BUFFER_COUNT - 1;
private static final int MAX_PENDING_OPERATIONS = 32;
private static final int MAX_DRAIN_COUNT = 2 * MAX_PENDING_OPERATIONS;
private static final int BUFFER_SIZE = 2 * MAX_DRAIN_COUNT;
private static final int BUFFER_INDEX_MASK = BUFFER_SIZE - 1;
/*
* Number of operations recorded, for each buffer
*/
private final AtomicLong[] recordedCount = new AtomicLong[BUFFER_COUNT];
/*
* Number of operations read, for each buffer
*/
private final long[] readCount = new long[BUFFER_COUNT];
/*
* Number of operations processed, for each buffer
*/
private final AtomicLong[] processedCount = new AtomicLong[BUFFER_COUNT];
@SuppressWarnings("rawtypes")
private final AtomicReference<Node<K, V>>[][] buffers = new AtomicReference[BUFFER_COUNT][BUFFER_SIZE];
private final EvictionQueue<K, V> evictionQueue;
@SuppressWarnings("rawtypes")
ReadOperations(EvictionQueue<K, V> evictionQueue) {
this.evictionQueue = evictionQueue;
for (int i = 0; i < BUFFER_COUNT; i++) {
this.recordedCount[i] = new AtomicLong();
this.processedCount[i] = new AtomicLong();
this.buffers[i] = new AtomicReference[BUFFER_SIZE];
for (int j = 0; j < BUFFER_SIZE; j++) {
this.buffers[i][j] = new AtomicReference<>();
}
}
}
private static int getBufferIndex() {
return ((int) Thread.currentThread().getId()) & BUFFERS_MASK;
}
boolean recordRead(Node<K, V> node) {
int bufferIndex = getBufferIndex();
final AtomicLong counter = this.recordedCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & BUFFER_INDEX_MASK);
this.buffers[bufferIndex][index].lazySet(node);
final long pending = (writeCount - this.processedCount[bufferIndex].get());
return (pending < MAX_PENDING_OPERATIONS);
}
void drain() {
final int start = (int) Thread.currentThread().getId();
final int end = start + BUFFER_COUNT;
for (int i = start; i < end; i++) {
drainReadBuffer(i & BUFFERS_MASK);
}
}
void clear() {
for (AtomicReference<Node<K, V>>[] buffer : this.buffers) {
for (AtomicReference<Node<K, V>> slot : buffer) {
slot.lazySet(null);
}
}
}
private void drainReadBuffer(int bufferIndex) {
final long writeCount = this.recordedCount[bufferIndex].get();
for (int i = 0; i < MAX_DRAIN_COUNT; i++) {
final int index = (int) (this.readCount[bufferIndex] & BUFFER_INDEX_MASK);
final AtomicReference<Node<K, V>> slot = this.buffers[bufferIndex][index];
final Node<K, V> node = slot.get();
if (node == null) {
break;
}
slot.lazySet(null);
this.evictionQueue.moveToBack(node);
this.readCount[bufferIndex]++;
}
this.processedCount[bufferIndex].lazySet(writeCount);
}
}
private static final class WriteOperations {
private static final int DRAIN_THRESHOLD = 16;
private final Queue<Runnable> operations = new ConcurrentLinkedQueue<>();
public void add(Runnable task) {
this.operations.add(task);
}
public void drain() {
for (int i = 0; i < DRAIN_THRESHOLD; i++) {
final Runnable task = this.operations.poll();
if (task == null) {
break;
}
task.run();
}
}
public void drainAll() {
Runnable task;
while ((task = this.operations.poll()) != null) {
task.run();
}
}
}
@SuppressWarnings("serial")
private static final class Node<K, V> extends AtomicReference<CacheEntry<V>> {
final K key;
@Nullable
Node<K, V> prev;
@Nullable
Node<K, V> next;
Node(K key, CacheEntry<V> cacheEntry) {
super(cacheEntry);
this.key = key;
}
@Nullable
public Node<K, V> getPrevious() {
return this.prev;
}
public void setPrevious(@Nullable Node<K, V> prev) {
this.prev = prev;
}
@Nullable
public Node<K, V> getNext() {
return this.next;
}
public void setNext(@Nullable Node<K, V> next) {
this.next = next;
}
V getValue() {
return get().value;
}
}
private static final class EvictionQueue<K, V> {
@Nullable
Node<K, V> first;
@Nullable
Node<K, V> last;
@Nullable
Node<K, V> poll() {
if (this.first == null) {
return null;
}
final Node<K, V> f = this.first;
final Node<K, V> next = f.getNext();
f.setNext(null);
this.first = next;
if (next == null) {
this.last = null;
}
else {
next.setPrevious(null);
}
return f;
}
void add(Node<K, V> e) {
if (contains(e)) {
return;
}
linkLast(e);
}
private boolean contains(Node<K, V> e) {
return (e.getPrevious() != null)
|| (e.getNext() != null)
|| (e == this.first);
}
private void linkLast(final Node<K, V> e) {
final Node<K, V> l = this.last;
this.last = e;
if (l == null) {
this.first = e;
}
else {
l.setNext(e);
e.setPrevious(l);
}
}
private void unlink(Node<K, V> e) {
final Node<K, V> prev = e.getPrevious();
final Node<K, V> next = e.getNext();
if (prev == null) {
this.first = next;
}
else {
prev.setNext(next);
e.setPrevious(null);
}
if (next == null) {
this.last = prev;
}
else {
next.setPrevious(prev);
e.setNext(null);
}
}
void moveToBack(Node<K, V> e) {
if (contains(e) && e != this.last) {
unlink(e);
linkLast(e);
}
}
void remove(Node<K, V> e) {
if (contains(e)) {
unlink(e);
}
}
}
}

5
spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link ConcurrentLruCache}.
* @author Juergen Hoeller
*/
class ConcurrentLruCacheTests {
@ -30,7 +31,7 @@ class ConcurrentLruCacheTests { @@ -30,7 +31,7 @@ class ConcurrentLruCacheTests {
@Test
void getAndSize() {
assertThat(this.cache.sizeLimit()).isEqualTo(2);
assertThat(this.cache.capacity()).isEqualTo(2);
assertThat(this.cache.size()).isEqualTo(0);
assertThat(this.cache.get("k1")).isEqualTo("k1value");
assertThat(this.cache.size()).isEqualTo(1);

2
spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java

@ -135,7 +135,7 @@ public class NamedParameterJdbcTemplate implements NamedParameterJdbcOperations @@ -135,7 +135,7 @@ public class NamedParameterJdbcTemplate implements NamedParameterJdbcOperations
* Return the maximum number of entries for this template's SQL cache.
*/
public int getCacheLimit() {
return this.parsedSqlCache.sizeLimit();
return this.parsedSqlCache.capacity();
}

Loading…
Cancel
Save