From c470262c8ea733fd0846a50232599715ec2d2b83 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Mon, 22 Aug 2022 13:54:05 +0200 Subject: [PATCH] Rewrite ConcurrentLruCache implementation Prior to this commit, the `ConcurrentLruCache` implementation would not perform well under certain conditions. As long as the cache capacity was not reached, the cache would avoid maintaining an eviction queue (reordering entries depending with least/most recently read). When the cache capacity was reached, the LRU queue was updated for each read/write operation. This decreased performance significantly under contention when the capacity was reached. This commit completely rewrites the internals of `ConcurrentLruCache`. `ConcurrentLruCache` is now a specialized version of the `ConcurrentLinkedHashMap` [1]. This change focuses on buferring read and write operations, only processing them at certain times to avoid contention. When a cached entry is read, a read operation is queued and buffered operations are drained if the buffer reached a fixed limit. When a new cache entry is added or removed, a write operation is queued and triggers a drain attempt. When the capacity is outgrown, the cache polls items from the eviction queue, which maintains elements with the least recently used ones first. Entries are removed until the capacity is under control. The behavior described here and the buffer sizes are optimized with the number of available processors in mind. Work is localized as much as possible on a per-thread basis to avoid contention on the eviction queue. The new implementation has been tested with the JMH benchmark provided here, comparing the former `COncurrentLruCache`, the new implementation as well as the `ConcurrentLinkedHashMap` [1]. When testing with a cache reaching capacity, under contention, with a 10% cache miss, we're seeing a 40x improvement compared to the previous implementation and performance on par with the reference. See [2] for how to replicate the benchmark. [1] https://github.com/ben-manes/concurrentlinkedhashmap [2] https://github.com/spring-projects/spring-framework/wiki/Micro-Benchmarks Closes gh-26320 --- .../util/ConcurrentLruCacheBenchmark.java | 76 +++ .../util/ConcurrentLruCache.java | 607 +++++++++++++++--- .../util/ConcurrentLruCacheTests.java | 5 +- .../NamedParameterJdbcTemplate.java | 2 +- 4 files changed, 597 insertions(+), 93 deletions(-) create mode 100644 spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java diff --git a/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java b/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java new file mode 100644 index 0000000000..7aa4e8322d --- /dev/null +++ b/spring-core/src/jmh/java/org/springframework/util/ConcurrentLruCacheBenchmark.java @@ -0,0 +1,76 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.function.Function; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Benchmarks for {@link ConcurrentLruCache}. + * @author Brian Clozel + */ +@BenchmarkMode(Mode.Throughput) +public class ConcurrentLruCacheBenchmark { + + @Benchmark + public void lruCache(BenchmarkData data, Blackhole bh) { + for (String element : data.elements) { + String value = data.lruCache.get(element); + bh.consume(value); + } + } + + @State(Scope.Benchmark) + public static class BenchmarkData { + + ConcurrentLruCache lruCache; + + @Param({"100"}) + public int capacity; + + @Param({"0.1"}) + public float cacheMissRate; + + public List elements; + + public Function generator; + + @Setup(Level.Iteration) + public void setup() { + this.generator = key -> key + "value"; + this.lruCache = new ConcurrentLruCache<>(this.capacity, this.generator); + Assert.isTrue(this.cacheMissRate < 1, "cache miss rate should be < 1"); + Random random = new Random(); + int elementsCount = Math.round(this.capacity * (1 + this.cacheMissRate)); + this.elements = new ArrayList<>(elementsCount); + random.ints(elementsCount).forEach(value -> this.elements.add(String.valueOf(value))); + this.elements.sort(String::compareTo); + } + } +} diff --git a/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java b/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java index 1d99b4cd09..c4d02819bf 100644 --- a/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java +++ b/spring-core/src/main/java/org/springframework/util/ConcurrentLruCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,116 +16,199 @@ package org.springframework.util; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import org.springframework.lang.Nullable; + + /** - * Simple LRU (Least Recently Used) cache, bounded by a specified cache limit. - * - *

This implementation is backed by a {@code ConcurrentHashMap} for storing - * the cached values and a {@code ConcurrentLinkedDeque} for ordering the keys - * and choosing the least recently used key when the cache is at full capacity. + * Simple LRU (Least Recently Used) cache, bounded by a specified cache capacity. + *

This is a simplified, opinionated implementation of a LRU cache for internal + * use in Spring Framework. It is inspired from + * ConcurrentLinkedHashMap. + *

Read and write operations are internally recorded in dedicated buffers, + * then drained at chosen times to avoid contention. * * @author Brian Clozel - * @author Juergen Hoeller + * @author Ben Manes * @since 5.3 * @param the type of the key used for cache retrieval - * @param the type of the cached values - * @see #get + * @param the type of the cached values, does not allow null values + * @see #get(Object) */ -public class ConcurrentLruCache { +@SuppressWarnings({"unchecked"}) +public final class ConcurrentLruCache { + + private final int capacity; - private final int sizeLimit; + private final AtomicInteger currentSize = new AtomicInteger(); + + private final ConcurrentMap> cache; private final Function generator; - private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + private final ReadOperations readOperations; - private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + private final WriteOperations writeOperations; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock evictionLock = new ReentrantLock(); - private volatile int size; + /* + * Queue that contains all ACTIVE cache entries, ordered with least recently used entries first. + * Read and write operations are buffered and periodically processed to reorder the queue. + */ + private final EvictionQueue evictionQueue = new EvictionQueue<>(); + private final AtomicReference drainStatus = new AtomicReference<>(DrainStatus.IDLE); /** - * Create a new cache instance with the given limit and generator function. - * @param sizeLimit the maximum number of entries in the cache + * Create a new cache instance with the given capacity and generator function. + * @param capacity the maximum number of entries in the cache * (0 indicates no caching, always generating a new value) * @param generator a function to generate a new value for a given key */ - public ConcurrentLruCache(int sizeLimit, Function generator) { - Assert.isTrue(sizeLimit >= 0, "Cache size limit must not be negative"); - Assert.notNull(generator, "Generator function must not be null"); - this.sizeLimit = sizeLimit; - this.generator = generator; + public ConcurrentLruCache(int capacity, Function generator) { + this(capacity, generator, 16); } + private ConcurrentLruCache(int capacity, Function generator, int concurrencyLevel) { + Assert.isTrue(capacity > 0, "Capacity should be > 0"); + this.capacity = capacity; + this.cache = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel); + this.generator = generator; + this.readOperations = new ReadOperations<>(this.evictionQueue); + this.writeOperations = new WriteOperations(); + } /** - * Retrieve an entry from the cache, potentially triggering generation - * of the value. + * Retrieve an entry from the cache, potentially triggering generation of the value. * @param key the key to retrieve the entry for * @return the cached or newly generated value */ public V get(K key) { - if (this.sizeLimit == 0) { - return this.generator.apply(key); + final Node node = this.cache.get(key); + if (node == null) { + V value = this.generator.apply(key); + put(key, value); + return value; } + processRead(node); + return node.getValue(); + } - V cached = this.cache.get(key); - if (cached != null) { - if (this.size < this.sizeLimit) { - return cached; - } - this.lock.readLock().lock(); + private void put(K key, V value) { + Assert.notNull(key, "key should not be null"); + Assert.notNull(value, "value should not be null"); + final CacheEntry cacheEntry = new CacheEntry<>(value, CacheEntryState.ACTIVE); + final Node node = new Node<>(key, cacheEntry); + final Node prior = this.cache.put(node.key, node); + if (prior == null) { + processWrite(new AddTask(node)); + } + else { + processRead(prior); + } + } + + private void processRead(Node node) { + boolean drainRequested = this.readOperations.recordRead(node); + final DrainStatus status = this.drainStatus.get(); + if (status.shouldDrainBuffers(drainRequested)) { + drainOperations(); + } + } + + private void processWrite(Runnable task) { + this.writeOperations.add(task); + this.drainStatus.lazySet(DrainStatus.REQUIRED); + drainOperations(); + } + + private void drainOperations() { + if (this.evictionLock.tryLock()) { try { - if (this.queue.removeLastOccurrence(key)) { - this.queue.offer(key); - } - return cached; + this.drainStatus.lazySet(DrainStatus.PROCESSING); + this.readOperations.drain(); + this.writeOperations.drain(); } finally { - this.lock.readLock().unlock(); + this.drainStatus.compareAndSet(DrainStatus.PROCESSING, DrainStatus.IDLE); + this.evictionLock.unlock(); } } + } + + /** + * Return the maximum number of entries in the cache. + * @see #size() + */ + public int capacity() { + return this.capacity; + } - this.lock.writeLock().lock(); + /** + * Return the maximum number of entries in the cache. + * @deprecated in favor of {@link #capacity()} as of 6.0. + */ + @Deprecated + public int sizeLimit() { + return this.capacity; + } + + /** + * Return the current size of the cache. + * @see #capacity() + */ + public int size() { + return this.cache.size(); + } + + /** + * Immediately remove all entries from this cache. + */ + public void clear() { + this.evictionLock.lock(); try { - // Retrying in case of concurrent reads on the same key - cached = this.cache.get(key); - if (cached != null) { - if (this.queue.removeLastOccurrence(key)) { - this.queue.offer(key); - } - return cached; - } - // Generate value first, to prevent size inconsistency - V value = this.generator.apply(key); - if (this.size == this.sizeLimit) { - K leastUsed = this.queue.poll(); - if (leastUsed != null) { - this.cache.remove(leastUsed); - } + Node node; + while ((node = this.evictionQueue.poll()) != null) { + this.cache.remove(node.key, node); + markAsRemoved(node); } - this.queue.offer(key); - this.cache.put(key, value); - this.size = this.cache.size(); - return value; + this.readOperations.clear(); + this.writeOperations.drainAll(); } finally { - this.lock.writeLock().unlock(); + this.evictionLock.unlock(); + } + } + + /* + * Transition the node to the {@code removed} state and decrement the current size of the cache. + */ + private void markAsRemoved(Node node) { + for (; ; ) { + CacheEntry current = node.get(); + CacheEntry removed = new CacheEntry<>(current.value, CacheEntryState.REMOVED); + if (node.compareAndSet(current, removed)) { + this.currentSize.lazySet(this.currentSize.get() - 1); + return; + } } } /** * Determine whether the given key is present in this cache. * @param key the key to check for - * @return {@code true} if the key is present, - * {@code false} if there was no matching key + * @return {@code true} if the key is present, {@code false} if there was no matching key */ public boolean contains(K key) { return this.cache.containsKey(key); @@ -137,49 +220,393 @@ public class ConcurrentLruCache { * @return {@code true} if the key was present before, * {@code false} if there was no matching key */ + @Nullable public boolean remove(K key) { - this.lock.writeLock().lock(); - try { - boolean wasPresent = (this.cache.remove(key) != null); - this.queue.remove(key); - this.size = this.cache.size(); - return wasPresent; + final Node node = this.cache.remove(key); + if (node == null) { + return false; } - finally { - this.lock.writeLock().unlock(); + markForRemoval(node); + processWrite(new RemovalTask(node)); + return true; + } + + /* + * Transition the node from the {@code active} state to the {@code pending removal} state, + * if the transition is valid. + */ + private void markForRemoval(Node node) { + for (; ; ) { + final CacheEntry current = node.get(); + if (!current.isActive()) { + return; + } + final CacheEntry pendingRemoval = new CacheEntry<>(current.value, CacheEntryState.PENDING_REMOVAL); + if (node.compareAndSet(current, pendingRemoval)) { + return; + } } } /** - * Immediately remove all entries from this cache. + * Write operation recorded when a new entry is added to the cache. */ - public void clear() { - this.lock.writeLock().lock(); - try { - this.cache.clear(); - this.queue.clear(); - this.size = 0; + private final class AddTask implements Runnable { + final Node node; + + AddTask(Node node) { + this.node = node; } - finally { - this.lock.writeLock().unlock(); + + @Override + public void run() { + currentSize.lazySet(currentSize.get() + 1); + if (this.node.get().isActive()) { + evictionQueue.add(this.node); + evictEntries(); + } } + + private void evictEntries() { + while (currentSize.get() > capacity) { + final Node node = evictionQueue.poll(); + if (node == null) { + return; + } + cache.remove(node.key, node); + markAsRemoved(node); + } + } + } + /** - * Return the current size of the cache. - * @see #sizeLimit() + * Write operation recorded when an entry is removed to the cache. */ - public int size() { - return this.size; + private final class RemovalTask implements Runnable { + final Node node; + + RemovalTask(Node node) { + this.node = node; + } + + @Override + public void run() { + evictionQueue.remove(this.node); + markAsRemoved(this.node); + } } - /** - * Return the maximum number of entries in the cache - * (0 indicates no caching, always generating a new value). - * @see #size() + + /* + * Draining status for the read/write buffers. */ - public int sizeLimit() { - return this.sizeLimit; + private enum DrainStatus { + + /* + * No drain operation currently running. + */ + IDLE { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return !delayable; + } + }, + + /* + * A drain operation is required due to a pending write modification. + */ + REQUIRED { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return true; + } + }, + + /* + * A drain operation is in progress. + */ + PROCESSING { + @Override + boolean shouldDrainBuffers(boolean delayable) { + return false; + } + }; + + /** + * Determine whether the buffers should be drained. + * @param delayable if a drain should be delayed until required + * @return if a drain should be attempted + */ + abstract boolean shouldDrainBuffers(boolean delayable); + } + + private enum CacheEntryState { + ACTIVE, PENDING_REMOVAL, REMOVED + } + + private record CacheEntry(V value, CacheEntryState state) { + + boolean isActive() { + return this.state == CacheEntryState.ACTIVE; + } + } + + private static final class ReadOperations { + + private static final int BUFFER_COUNT = detectNumberOfBuffers(); + + private static int detectNumberOfBuffers() { + int availableProcessors = Runtime.getRuntime().availableProcessors(); + return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(availableProcessors - 1)); + } + + private static final int BUFFERS_MASK = BUFFER_COUNT - 1; + + private static final int MAX_PENDING_OPERATIONS = 32; + + private static final int MAX_DRAIN_COUNT = 2 * MAX_PENDING_OPERATIONS; + + private static final int BUFFER_SIZE = 2 * MAX_DRAIN_COUNT; + + private static final int BUFFER_INDEX_MASK = BUFFER_SIZE - 1; + + /* + * Number of operations recorded, for each buffer + */ + private final AtomicLong[] recordedCount = new AtomicLong[BUFFER_COUNT]; + + /* + * Number of operations read, for each buffer + */ + private final long[] readCount = new long[BUFFER_COUNT]; + + /* + * Number of operations processed, for each buffer + */ + private final AtomicLong[] processedCount = new AtomicLong[BUFFER_COUNT]; + + @SuppressWarnings("rawtypes") + private final AtomicReference>[][] buffers = new AtomicReference[BUFFER_COUNT][BUFFER_SIZE]; + + private final EvictionQueue evictionQueue; + + @SuppressWarnings("rawtypes") + ReadOperations(EvictionQueue evictionQueue) { + this.evictionQueue = evictionQueue; + for (int i = 0; i < BUFFER_COUNT; i++) { + this.recordedCount[i] = new AtomicLong(); + this.processedCount[i] = new AtomicLong(); + this.buffers[i] = new AtomicReference[BUFFER_SIZE]; + for (int j = 0; j < BUFFER_SIZE; j++) { + this.buffers[i][j] = new AtomicReference<>(); + } + } + } + + private static int getBufferIndex() { + return ((int) Thread.currentThread().getId()) & BUFFERS_MASK; + } + + boolean recordRead(Node node) { + int bufferIndex = getBufferIndex(); + final AtomicLong counter = this.recordedCount[bufferIndex]; + final long writeCount = counter.get(); + counter.lazySet(writeCount + 1); + final int index = (int) (writeCount & BUFFER_INDEX_MASK); + this.buffers[bufferIndex][index].lazySet(node); + final long pending = (writeCount - this.processedCount[bufferIndex].get()); + return (pending < MAX_PENDING_OPERATIONS); + } + + void drain() { + final int start = (int) Thread.currentThread().getId(); + final int end = start + BUFFER_COUNT; + for (int i = start; i < end; i++) { + drainReadBuffer(i & BUFFERS_MASK); + } + } + + void clear() { + for (AtomicReference>[] buffer : this.buffers) { + for (AtomicReference> slot : buffer) { + slot.lazySet(null); + } + } + } + + private void drainReadBuffer(int bufferIndex) { + final long writeCount = this.recordedCount[bufferIndex].get(); + for (int i = 0; i < MAX_DRAIN_COUNT; i++) { + final int index = (int) (this.readCount[bufferIndex] & BUFFER_INDEX_MASK); + final AtomicReference> slot = this.buffers[bufferIndex][index]; + final Node node = slot.get(); + if (node == null) { + break; + } + slot.lazySet(null); + this.evictionQueue.moveToBack(node); + this.readCount[bufferIndex]++; + } + this.processedCount[bufferIndex].lazySet(writeCount); + } + } + + private static final class WriteOperations { + + private static final int DRAIN_THRESHOLD = 16; + + private final Queue operations = new ConcurrentLinkedQueue<>(); + + public void add(Runnable task) { + this.operations.add(task); + } + + public void drain() { + for (int i = 0; i < DRAIN_THRESHOLD; i++) { + final Runnable task = this.operations.poll(); + if (task == null) { + break; + } + task.run(); + } + } + + public void drainAll() { + Runnable task; + while ((task = this.operations.poll()) != null) { + task.run(); + } + } + + } + + @SuppressWarnings("serial") + private static final class Node extends AtomicReference> { + final K key; + + @Nullable + Node prev; + + @Nullable + Node next; + + Node(K key, CacheEntry cacheEntry) { + super(cacheEntry); + this.key = key; + } + + @Nullable + public Node getPrevious() { + return this.prev; + } + + public void setPrevious(@Nullable Node prev) { + this.prev = prev; + } + + @Nullable + public Node getNext() { + return this.next; + } + + public void setNext(@Nullable Node next) { + this.next = next; + } + + V getValue() { + return get().value; + } + } + + + private static final class EvictionQueue { + + @Nullable + Node first; + + @Nullable + Node last; + + + @Nullable + Node poll() { + if (this.first == null) { + return null; + } + final Node f = this.first; + final Node next = f.getNext(); + f.setNext(null); + + this.first = next; + if (next == null) { + this.last = null; + } + else { + next.setPrevious(null); + } + return f; + } + + void add(Node e) { + if (contains(e)) { + return; + } + linkLast(e); + } + + private boolean contains(Node e) { + return (e.getPrevious() != null) + || (e.getNext() != null) + || (e == this.first); + } + + private void linkLast(final Node e) { + final Node l = this.last; + this.last = e; + + if (l == null) { + this.first = e; + } + else { + l.setNext(e); + e.setPrevious(l); + } + } + + private void unlink(Node e) { + final Node prev = e.getPrevious(); + final Node next = e.getNext(); + if (prev == null) { + this.first = next; + } + else { + prev.setNext(next); + e.setPrevious(null); + } + if (next == null) { + this.last = prev; + } + else { + next.setPrevious(prev); + e.setNext(null); + } + } + + void moveToBack(Node e) { + if (contains(e) && e != this.last) { + unlink(e); + linkLast(e); + } + } + + void remove(Node e) { + if (contains(e)) { + unlink(e); + } + } + } } diff --git a/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java b/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java index 31ec75944f..5c62362c11 100644 --- a/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java +++ b/spring-core/src/test/java/org/springframework/util/ConcurrentLruCacheTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; /** + * Tests for {@link ConcurrentLruCache}. * @author Juergen Hoeller */ class ConcurrentLruCacheTests { @@ -30,7 +31,7 @@ class ConcurrentLruCacheTests { @Test void getAndSize() { - assertThat(this.cache.sizeLimit()).isEqualTo(2); + assertThat(this.cache.capacity()).isEqualTo(2); assertThat(this.cache.size()).isEqualTo(0); assertThat(this.cache.get("k1")).isEqualTo("k1value"); assertThat(this.cache.size()).isEqualTo(1); diff --git a/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java b/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java index 3d3752b5bf..5159ed16cd 100644 --- a/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java +++ b/spring-jdbc/src/main/java/org/springframework/jdbc/core/namedparam/NamedParameterJdbcTemplate.java @@ -135,7 +135,7 @@ public class NamedParameterJdbcTemplate implements NamedParameterJdbcOperations * Return the maximum number of entries for this template's SQL cache. */ public int getCacheLimit() { - return this.parsedSqlCache.sizeLimit(); + return this.parsedSqlCache.capacity(); }