From f99faac073dabc9e6f1045b00ed4c51bc7d8fa17 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 21 Jul 2023 20:27:23 +0200 Subject: [PATCH] Add caching annotation support for CompletableFuture and reactive return values Includes CompletableFuture-based retrieve operations on Spring's Cache interface. Includes support for retrieve operations on CaffeineCache and ConcurrentMapCache. Includes async cache mode option on CaffeineCacheManager. Closes gh-17559 Closes gh-17920 Closes gh-30122 --- .../cache/caffeine/CaffeineCache.java | 62 ++- .../cache/caffeine/CaffeineCacheManager.java | 129 +++++- .../TransactionAwareCacheDecorator.java | 15 +- .../caffeine/CaffeineCacheManagerTests.java | 96 ++++- .../java/org/springframework/cache/Cache.java | 76 +++- .../cache/concurrent/ConcurrentMapCache.java | 27 +- .../cache/interceptor/CacheAspectSupport.java | 402 +++++++++++++----- .../cache/support/NoOpCache.java | 15 +- .../cache/CacheReproTests.java | 269 +++++++++++- 9 files changed, 949 insertions(+), 142 deletions(-) diff --git a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java index c47497f85e..d73b130cbd 100644 --- a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java +++ b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java @@ -17,8 +17,11 @@ package org.springframework.cache.caffeine; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.function.Supplier; +import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.LoadingCache; import org.springframework.cache.support.AbstractValueAdaptingCache; @@ -29,7 +32,11 @@ import org.springframework.util.Assert; * Spring {@link org.springframework.cache.Cache} adapter implementation * on top of a Caffeine {@link com.github.benmanes.caffeine.cache.Cache} instance. * - *

Requires Caffeine 2.1 or higher. + *

Supports the {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)} + * operations through Caffeine's {@link AsyncCache}, when provided via the + * {@link #CaffeineCache(String, AsyncCache, boolean)} constructor. + * + *

Requires Caffeine 3.0 or higher, as of Spring Framework 6.1. * * @author Ben Manes * @author Juergen Hoeller @@ -43,6 +50,9 @@ public class CaffeineCache extends AbstractValueAdaptingCache { private final com.github.benmanes.caffeine.cache.Cache cache; + @Nullable + private AsyncCache asyncCache; + /** * Create a {@link CaffeineCache} instance with the specified name and the @@ -72,17 +82,51 @@ public class CaffeineCache extends AbstractValueAdaptingCache { this.cache = cache; } + /** + * Create a {@link CaffeineCache} instance with the specified name and the + * given internal {@link AsyncCache} to use. + * @param name the name of the cache + * @param cache the backing Caffeine Cache instance + * @param allowNullValues whether to accept and convert {@code null} + * values for this cache + * @since 6.1 + */ + public CaffeineCache(String name, AsyncCache cache, boolean allowNullValues) { + super(allowNullValues); + Assert.notNull(name, "Name must not be null"); + Assert.notNull(cache, "Cache must not be null"); + this.name = name; + this.cache = cache.synchronous(); + this.asyncCache = cache; + } + @Override public final String getName() { return this.name; } + /** + * Return the internal Caffeine Cache + * (possibly an adapter on top of an {@link #getAsyncCache()}). + */ @Override public final com.github.benmanes.caffeine.cache.Cache getNativeCache() { return this.cache; } + /** + * Return the internal Caffeine AsyncCache. + * @throws IllegalStateException if no AsyncCache is available + * @see #CaffeineCache(String, AsyncCache, boolean) + * @see CaffeineCacheManager#setAsyncCacheMode + */ + public final AsyncCache getAsyncCache() { + Assert.state(this.asyncCache != null, + "No Caffeine AsyncCache available: set CaffeineCacheManager.setAsyncCacheMode(true)"); + return this.asyncCache; + } + @SuppressWarnings("unchecked") @Override @Nullable @@ -90,6 +134,22 @@ public class CaffeineCache extends AbstractValueAdaptingCache { return (T) fromStoreValue(this.cache.get(key, new LoadFunction(valueLoader))); } + @Override + @Nullable + public CompletableFuture retrieve(Object key) { + CompletableFuture result = getAsyncCache().getIfPresent(key); + if (result != null && isAllowNullValues()) { + result = result.handle((value, ex) -> fromStoreValue(value)); + } + return result; + } + + @SuppressWarnings("unchecked") + @Override + public CompletableFuture retrieve(Object key, Supplier> valueLoader) { + return (CompletableFuture) getAsyncCache().get(key, (k, e) -> valueLoader.get()); + } + @Override @Nullable protected Object lookup(Object key) { diff --git a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java index 239a7350cd..bd2c2415c4 100644 --- a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java +++ b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java @@ -22,7 +22,10 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Supplier; +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.CaffeineSpec; @@ -45,7 +48,11 @@ import org.springframework.util.ObjectUtils; * A {@link CaffeineSpec}-compliant expression value can also be applied * via the {@link #setCacheSpecification "cacheSpecification"} bean property. * - *

Requires Caffeine 2.1 or higher. + *

Supports the {@link Cache#retrieve(Object)} and + * {@link Cache#retrieve(Object, Supplier)} operations through Caffeine's + * {@link AsyncCache}, when configured via {@link #setAsyncCacheMode}. + * + *

Requires Caffeine 3.0 or higher, as of Spring Framework 6.1. * * @author Ben Manes * @author Juergen Hoeller @@ -54,13 +61,18 @@ import org.springframework.util.ObjectUtils; * @author Brian Clozel * @since 4.3 * @see CaffeineCache + * @see #setCaffeineSpec + * @see #setCacheSpecification + * @see #setAsyncCacheMode */ public class CaffeineCacheManager implements CacheManager { private Caffeine cacheBuilder = Caffeine.newBuilder(); @Nullable - private CacheLoader cacheLoader; + private AsyncCacheLoader cacheLoader; + + private boolean asyncCacheMode = false; private boolean allowNullValues = true; @@ -110,7 +122,7 @@ public class CaffeineCacheManager implements CacheManager { * Set the Caffeine to use for building each individual * {@link CaffeineCache} instance. * @see #createNativeCaffeineCache - * @see com.github.benmanes.caffeine.cache.Caffeine#build() + * @see Caffeine#build() */ public void setCaffeine(Caffeine caffeine) { Assert.notNull(caffeine, "Caffeine must not be null"); @@ -121,7 +133,7 @@ public class CaffeineCacheManager implements CacheManager { * Set the {@link CaffeineSpec} to use for building each individual * {@link CaffeineCache} instance. * @see #createNativeCaffeineCache - * @see com.github.benmanes.caffeine.cache.Caffeine#from(CaffeineSpec) + * @see Caffeine#from(CaffeineSpec) */ public void setCaffeineSpec(CaffeineSpec caffeineSpec) { doSetCaffeine(Caffeine.from(caffeineSpec)); @@ -132,7 +144,7 @@ public class CaffeineCacheManager implements CacheManager { * individual {@link CaffeineCache} instance. The given value needs to * comply with Caffeine's {@link CaffeineSpec} (see its javadoc). * @see #createNativeCaffeineCache - * @see com.github.benmanes.caffeine.cache.Caffeine#from(String) + * @see Caffeine#from(String) */ public void setCacheSpecification(String cacheSpecification) { doSetCaffeine(Caffeine.from(cacheSpecification)); @@ -149,7 +161,7 @@ public class CaffeineCacheManager implements CacheManager { * Set the Caffeine CacheLoader to use for building each individual * {@link CaffeineCache} instance, turning it into a LoadingCache. * @see #createNativeCaffeineCache - * @see com.github.benmanes.caffeine.cache.Caffeine#build(CacheLoader) + * @see Caffeine#build(CacheLoader) * @see com.github.benmanes.caffeine.cache.LoadingCache */ public void setCacheLoader(CacheLoader cacheLoader) { @@ -159,6 +171,45 @@ public class CaffeineCacheManager implements CacheManager { } } + /** + * Set the Caffeine AsyncCacheLoader to use for building each individual + * {@link CaffeineCache} instance, turning it into a LoadingCache. + *

This implicitly switches the {@link #setAsyncCacheMode "asyncCacheMode"} + * flag to {@code true}. + * @since 6.1 + * @see #createAsyncCaffeineCache + * @see Caffeine#buildAsync(AsyncCacheLoader) + * @see com.github.benmanes.caffeine.cache.LoadingCache + */ + public void setAsyncCacheLoader(AsyncCacheLoader cacheLoader) { + if (!ObjectUtils.nullSafeEquals(this.cacheLoader, cacheLoader)) { + this.cacheLoader = cacheLoader; + this.asyncCacheMode = true; + refreshCommonCaches(); + } + } + + /** + * Set the common cache type that this cache manager builds to async. + * This applies to {@link #setCacheNames} as well as on-demand caches. + *

Individual cache registrations (such as {@link #registerCustomCache(String, AsyncCache)} + * and {@link #registerCustomCache(String, com.github.benmanes.caffeine.cache.Cache)} + * are not dependent on this setting. + *

By default, this cache manager builds regular native Caffeine caches. + * To switch to async caches which can also be used through the synchronous API + * but come with support for {@code Cache#retrieve}, set this flag to {@code true}. + * @since 6.1 + * @see Caffeine#buildAsync() + * @see Cache#retrieve(Object) + * @see Cache#retrieve(Object, Supplier) + */ + public void setAsyncCacheMode(boolean asyncCacheMode) { + if (this.asyncCacheMode != asyncCacheMode) { + this.asyncCacheMode = asyncCacheMode; + refreshCommonCaches(); + } + } + /** * Specify whether to accept and convert {@code null} values for all caches * in this cache manager. @@ -211,13 +262,34 @@ public class CaffeineCacheManager implements CacheManager { * @param name the name of the cache * @param cache the custom Caffeine Cache instance to register * @since 5.2.8 - * @see #adaptCaffeineCache + * @see #adaptCaffeineCache(String, com.github.benmanes.caffeine.cache.Cache) */ public void registerCustomCache(String name, com.github.benmanes.caffeine.cache.Cache cache) { this.customCacheNames.add(name); this.cacheMap.put(name, adaptCaffeineCache(name, cache)); } + /** + * Register the given Caffeine AsyncCache instance with this cache manager, + * adapting it to Spring's cache API for exposure through {@link #getCache}. + * Any number of such custom caches may be registered side by side. + *

This allows for custom settings per cache (as opposed to all caches + * sharing the common settings in the cache manager's configuration) and + * is typically used with the Caffeine builder API: + * {@code registerCustomCache("myCache", Caffeine.newBuilder().maximumSize(10).build())} + *

Note that any other caches, whether statically specified through + * {@link #setCacheNames} or dynamically built on demand, still operate + * with the common settings in the cache manager's configuration. + * @param name the name of the cache + * @param cache the custom Caffeine Cache instance to register + * @since 6.1 + * @see #adaptCaffeineCache(String, AsyncCache) + */ + public void registerCustomCache(String name, AsyncCache cache) { + this.customCacheNames.add(name); + this.cacheMap.put(name, adaptCaffeineCache(name, cache)); + } + /** * Adapt the given new native Caffeine Cache instance to Spring's {@link Cache} * abstraction for the specified cache name. @@ -225,13 +297,27 @@ public class CaffeineCacheManager implements CacheManager { * @param cache the native Caffeine Cache instance * @return the Spring CaffeineCache adapter (or a decorator thereof) * @since 5.2.8 - * @see CaffeineCache + * @see CaffeineCache#CaffeineCache(String, com.github.benmanes.caffeine.cache.Cache, boolean) * @see #isAllowNullValues() */ protected Cache adaptCaffeineCache(String name, com.github.benmanes.caffeine.cache.Cache cache) { return new CaffeineCache(name, cache, isAllowNullValues()); } + /** + * Adapt the given new Caffeine AsyncCache instance to Spring's {@link Cache} + * abstraction for the specified cache name. + * @param name the name of the cache + * @param cache the Caffeine AsyncCache instance + * @return the Spring CaffeineCache adapter (or a decorator thereof) + * @since 6.1 + * @see CaffeineCache#CaffeineCache(String, AsyncCache, boolean) + * @see #isAllowNullValues() + */ + protected Cache adaptCaffeineCache(String name, AsyncCache cache) { + return new CaffeineCache(name, cache, isAllowNullValues()); + } + /** * Build a common {@link CaffeineCache} instance for the specified cache name, * using the common Caffeine configuration specified on this cache manager. @@ -244,7 +330,8 @@ public class CaffeineCacheManager implements CacheManager { * @see #createNativeCaffeineCache */ protected Cache createCaffeineCache(String name) { - return adaptCaffeineCache(name, createNativeCaffeineCache(name)); + return (this.asyncCacheMode ? adaptCaffeineCache(name, createAsyncCaffeineCache(name)) : + adaptCaffeineCache(name, createNativeCaffeineCache(name))); } /** @@ -255,7 +342,29 @@ public class CaffeineCacheManager implements CacheManager { * @see #createCaffeineCache */ protected com.github.benmanes.caffeine.cache.Cache createNativeCaffeineCache(String name) { - return (this.cacheLoader != null ? this.cacheBuilder.build(this.cacheLoader) : this.cacheBuilder.build()); + if (this.cacheLoader != null) { + if (this.cacheLoader instanceof CacheLoader regularCacheLoader) { + return this.cacheBuilder.build(regularCacheLoader); + } + else { + throw new IllegalStateException( + "Cannot create regular Caffeine Cache with async-only cache loader: " + this.cacheLoader); + } + } + return this.cacheBuilder.build(); + } + + /** + * Build a common Caffeine AsyncCache instance for the specified cache name, + * using the common Caffeine configuration specified on this cache manager. + * @param name the name of the cache + * @return the Caffeine AsyncCache instance + * @since 6.1 + * @see #createCaffeineCache + */ + protected AsyncCache createAsyncCaffeineCache(String name) { + return (this.cacheLoader != null ? this.cacheBuilder.buildAsync(this.cacheLoader) : + this.cacheBuilder.buildAsync()); } /** diff --git a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java index 33571ffc90..1c14c85a4b 100644 --- a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java +++ b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ package org.springframework.cache.transaction; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.springframework.cache.Cache; import org.springframework.lang.Nullable; @@ -91,6 +93,17 @@ public class TransactionAwareCacheDecorator implements Cache { return this.targetCache.get(key, valueLoader); } + @Override + @Nullable + public CompletableFuture retrieve(Object key) { + return this.targetCache.retrieve(key); + } + + @Override + public CompletableFuture retrieve(Object key, Supplier> valueLoader) { + return this.targetCache.retrieve(key, valueLoader); + } + @Override public void put(final Object key, @Nullable final Object value) { if (TransactionSynchronizationManager.isSynchronizationActive()) { diff --git a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java index fd2943c191..a9f9858087 100644 --- a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java +++ b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java @@ -16,6 +16,8 @@ package org.springframework.cache.caffeine; +import java.util.concurrent.CompletableFuture; + import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.CaffeineSpec; @@ -26,6 +28,7 @@ import org.springframework.cache.CacheManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.mockito.Mockito.mock; /** @@ -38,22 +41,26 @@ public class CaffeineCacheManagerTests { @Test public void testDynamicMode() { CacheManager cm = new CaffeineCacheManager(); + Cache cache1 = cm.getCache("c1"); - boolean condition2 = cache1 instanceof CaffeineCache; - assertThat(condition2).isTrue(); + assertThat(cache1).isInstanceOf(CaffeineCache.class); Cache cache1again = cm.getCache("c1"); assertThat(cache1).isSameAs(cache1again); Cache cache2 = cm.getCache("c2"); - boolean condition1 = cache2 instanceof CaffeineCache; - assertThat(condition1).isTrue(); + assertThat(cache2).isInstanceOf(CaffeineCache.class); Cache cache2again = cm.getCache("c2"); assertThat(cache2).isSameAs(cache2again); Cache cache3 = cm.getCache("c3"); - boolean condition = cache3 instanceof CaffeineCache; - assertThat(condition).isTrue(); + assertThat(cache3).isInstanceOf(CaffeineCache.class); Cache cache3again = cm.getCache("c3"); assertThat(cache3).isSameAs(cache3again); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key1")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key2")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3", + () -> CompletableFuture.completedFuture("value3"))); + cache1.put("key1", "value1"); assertThat(cache1.get("key1").get()).isEqualTo("value1"); cache1.put("key2", 2); @@ -62,19 +69,23 @@ public class CaffeineCacheManagerTests { assertThat(cache1.get("key3").get()).isNull(); cache1.evict("key3"); assertThat(cache1.get("key3")).isNull(); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + cache1.evict("key3"); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); } @Test public void testStaticMode() { CaffeineCacheManager cm = new CaffeineCacheManager("c1", "c2"); + Cache cache1 = cm.getCache("c1"); - boolean condition3 = cache1 instanceof CaffeineCache; - assertThat(condition3).isTrue(); + assertThat(cache1).isInstanceOf(CaffeineCache.class); Cache cache1again = cm.getCache("c1"); assertThat(cache1).isSameAs(cache1again); Cache cache2 = cm.getCache("c2"); - boolean condition2 = cache2 instanceof CaffeineCache; - assertThat(condition2).isTrue(); + assertThat(cache2).isInstanceOf(CaffeineCache.class); Cache cache2again = cm.getCache("c2"); assertThat(cache2).isSameAs(cache2again); Cache cache3 = cm.getCache("c3"); @@ -88,15 +99,24 @@ public class CaffeineCacheManagerTests { assertThat(cache1.get("key3").get()).isNull(); cache1.evict("key3"); assertThat(cache1.get("key3")).isNull(); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + cache1.evict("key3"); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); + + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key1")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key2")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3")); + assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3", + () -> CompletableFuture.completedFuture("value3"))); cm.setAllowNullValues(false); Cache cache1x = cm.getCache("c1"); - boolean condition1 = cache1x instanceof CaffeineCache; - assertThat(condition1).isTrue(); + assertThat(cache1x).isInstanceOf(CaffeineCache.class); assertThat(cache1x).isNotSameAs(cache1); Cache cache2x = cm.getCache("c2"); - boolean condition = cache2x instanceof CaffeineCache; - assertThat(condition).isTrue(); + assertThat(cache2x).isInstanceOf(CaffeineCache.class); assertThat(cache2x).isNotSameAs(cache2); Cache cache3x = cm.getCache("c3"); assertThat(cache3x).isNull(); @@ -115,6 +135,52 @@ public class CaffeineCacheManagerTests { assertThat(cache1y.get("key3")).isNull(); } + @Test + public void testAsyncMode() { + CaffeineCacheManager cm = new CaffeineCacheManager(); + cm.setAsyncCacheMode(true); + + Cache cache1 = cm.getCache("c1"); + assertThat(cache1).isInstanceOf(CaffeineCache.class); + Cache cache1again = cm.getCache("c1"); + assertThat(cache1).isSameAs(cache1again); + Cache cache2 = cm.getCache("c2"); + assertThat(cache2).isInstanceOf(CaffeineCache.class); + Cache cache2again = cm.getCache("c2"); + assertThat(cache2).isSameAs(cache2again); + Cache cache3 = cm.getCache("c3"); + assertThat(cache3).isInstanceOf(CaffeineCache.class); + Cache cache3again = cm.getCache("c3"); + assertThat(cache3).isSameAs(cache3again); + + cache1.put("key1", "value1"); + assertThat(cache1.get("key1").get()).isEqualTo("value1"); + cache1.put("key2", 2); + assertThat(cache1.get("key2").get()).isEqualTo(2); + cache1.put("key3", null); + assertThat(cache1.get("key3").get()).isNull(); + cache1.evict("key3"); + assertThat(cache1.get("key3")).isNull(); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3"); + cache1.evict("key3"); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); + assertThat(cache1.get("key3", () -> (String) null)).isNull(); + + assertThat(cache1.retrieve("key1").join()).isEqualTo("value1"); + assertThat(cache1.retrieve("key2").join()).isEqualTo(2); + assertThat(cache1.retrieve("key3").join()).isNull(); + cache1.evict("key3"); + assertThat(cache1.retrieve("key3")).isNull(); + assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture("value3")).join()) + .isEqualTo("value3"); + assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture("value3")).join()) + .isEqualTo("value3"); + cache1.evict("key3"); + assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture(null)).join()).isNull(); + assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture(null)).join()).isNull(); + } + @Test public void changeCaffeineRecreateCache() { CaffeineCacheManager cm = new CaffeineCacheManager("c1"); @@ -190,7 +256,7 @@ public class CaffeineCacheManagerTests { assertThat(value.get()).isEqualTo("pong"); assertThatIllegalArgumentException().isThrownBy(() -> assertThat(cache1.get("foo")).isNull()) - .withMessageContaining("I only know ping"); + .withMessageContaining("I only know ping"); } @Test diff --git a/spring-context/src/main/java/org/springframework/cache/Cache.java b/spring-context/src/main/java/org/springframework/cache/Cache.java index 8a3b904f49..63f93f32ea 100644 --- a/spring-context/src/main/java/org/springframework/cache/Cache.java +++ b/spring-context/src/main/java/org/springframework/cache/Cache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,20 +17,28 @@ package org.springframework.cache; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.springframework.lang.Nullable; /** * Interface that defines common cache operations. * - * Note: Due to the generic use of caching, it is recommended that - * implementations allow storage of {@code null} values (for example to - * cache methods that return {@code null}). + *

Serves as an SPI for Spring's annotation-based caching model + * ({@link org.springframework.cache.annotation.Cacheable} and co) + * as well as an API for direct usage in applications. + * + *

Note: Due to the generic use of caching, it is recommended + * that implementations allow storage of {@code null} values + * (for example to cache methods that return {@code null}). * * @author Costin Leau * @author Juergen Hoeller * @author Stephane Nicoll * @since 3.1 + * @see CacheManager + * @see org.springframework.cache.annotation.Cacheable */ public interface Cache { @@ -100,6 +108,51 @@ public interface Cache { @Nullable T get(Object key, Callable valueLoader); + /** + * Return the value to which this cache maps the specified key, + * wrapped in a {@link CompletableFuture}. This operation must not block + * but is allowed to return a completed {@link CompletableFuture} if the + * corresponding value is immediately available. + *

Returns {@code null} if the cache contains no mapping for this key; + * otherwise, the cached value (which may be {@code null} itself) will + * be returned in the {@link CompletableFuture}. + * @param key the key whose associated value is to be returned + * @return the value to which this cache maps the specified key, + * contained within a {@link CompletableFuture} which may also hold + * a cached {@code null} value. A straight {@code null} being + * returned means that the cache contains no mapping for this key. + * @since 6.1 + * @see #get(Object) + */ + @Nullable + default CompletableFuture retrieve(Object key) { + throw new UnsupportedOperationException( + getClass().getName() + " does not support CompletableFuture-based retrieval"); + } + + /** + * Return the value to which this cache maps the specified key, obtaining + * that value from {@code valueLoader} if necessary. This method provides + * a simple substitute for the conventional "if cached, return; otherwise + * create, cache and return" pattern, based on {@link CompletableFuture}. + * This operation must not block. + *

If possible, implementations should ensure that the loading operation + * is synchronized so that the specified {@code valueLoader} is only called + * once in case of concurrent access on the same key. + *

If the {@code valueLoader} throws an exception, it will be propagated + * to the {@code CompletableFuture} handle returned from here. + * @param key the key whose associated value is to be returned + * @return the value to which this cache maps the specified key, + * contained within a {@link CompletableFuture} + * @since 6.1 + * @see #retrieve(Object) + * @see #get(Object, Callable) + */ + default CompletableFuture retrieve(Object key, Supplier> valueLoader) { + throw new UnsupportedOperationException( + getClass().getName() + " does not support CompletableFuture-based retrieval"); + } + /** * Associate the specified value with the specified key in this cache. *

If the cache previously contained a mapping for this key, the old @@ -108,6 +161,11 @@ public interface Cache { * fashion, with subsequent lookups possibly not seeing the entry yet. * This may for example be the case with transactional cache decorators. * Use {@link #putIfAbsent} for guaranteed immediate registration. + *

If the cache is supposed to be compatible with {@link CompletableFuture} + * and reactive interactions, the put operation needs to be effectively + * non-blocking, with any backend write-through happening asynchronously. + * This goes along with a cache implemented and configured to support + * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}. * @param key the key with which the specified value is to be associated * @param value the value to be associated with the specified key * @see #putIfAbsent(Object, Object) @@ -156,6 +214,11 @@ public interface Cache { * fashion, with subsequent lookups possibly still seeing the entry. * This may for example be the case with transactional cache decorators. * Use {@link #evictIfPresent} for guaranteed immediate removal. + *

If the cache is supposed to be compatible with {@link CompletableFuture} + * and reactive interactions, the evict operation needs to be effectively + * non-blocking, with any backend write-through happening asynchronously. + * This goes along with a cache implemented and configured to support + * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}. * @param key the key whose mapping is to be removed from the cache * @see #evictIfPresent(Object) */ @@ -188,6 +251,11 @@ public interface Cache { * fashion, with subsequent lookups possibly still seeing the entries. * This may for example be the case with transactional cache decorators. * Use {@link #invalidate()} for guaranteed immediate removal of entries. + *

If the cache is supposed to be compatible with {@link CompletableFuture} + * and reactive interactions, the clear operation needs to be effectively + * non-blocking, with any backend write-through happening asynchronously. + * This goes along with a cache implemented and configured to support + * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}. * @see #invalidate() */ void clear(); diff --git a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java index 1a17605578..10648155c4 100644 --- a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java +++ b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,11 @@ package org.springframework.cache.concurrent; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; import org.springframework.cache.support.AbstractValueAdaptingCache; import org.springframework.core.serializer.support.SerializationDelegate; @@ -26,13 +29,17 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Simple {@link org.springframework.cache.Cache} implementation based on the - * core JDK {@code java.util.concurrent} package. + * Simple {@link org.springframework.cache.Cache} implementation based on the core + * JDK {@code java.util.concurrent} package. * *

Useful for testing or simple caching scenarios, typically in combination * with {@link org.springframework.cache.support.SimpleCacheManager} or * dynamically through {@link ConcurrentMapCacheManager}. * + *

Supports the {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)} + * operations in a best-effort fashion, relying on default {@link CompletableFuture} + * execution (typically within the JVM's {@link ForkJoinPool#commonPool()}). + * *

Note: As {@link ConcurrentHashMap} (the default implementation used) * does not allow for {@code null} values to be stored, this class will replace * them with a predefined internal object. This behavior can be changed through the @@ -149,6 +156,20 @@ public class ConcurrentMapCache extends AbstractValueAdaptingCache { })); } + @Override + @Nullable + public CompletableFuture retrieve(Object key) { + Object value = lookup(key); + return (value != null ? CompletableFuture.completedFuture(fromStoreValue(value)) : null); + } + + @SuppressWarnings("unchecked") + @Override + public CompletableFuture retrieve(Object key, Supplier> valueLoader) { + return CompletableFuture.supplyAsync(() -> + (T) fromStoreValue(this.store.computeIfAbsent(key, k -> toStoreValue(valueLoader.get().join())))); + } + @Override public void put(Object key, @Nullable Object value) { this.store.put(key, toStoreValue(value)); diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index fe60df9d6b..695c53471c 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -24,11 +24,16 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.support.AopUtils; @@ -43,6 +48,8 @@ import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.context.expression.AnnotatedElementKey; import org.springframework.core.BridgeMethodResolver; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.expression.EvaluationContext; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -83,12 +90,18 @@ import org.springframework.util.function.SupplierUtils; public abstract class CacheAspectSupport extends AbstractCacheInvoker implements BeanFactoryAware, InitializingBean, SmartInitializingSingleton { + private static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", CacheAspectSupport.class.getClassLoader()); + protected final Log logger = LogFactory.getLog(getClass()); private final Map metadataCache = new ConcurrentHashMap<>(1024); private final CacheOperationExpressionEvaluator evaluator = new CacheOperationExpressionEvaluator(); + @Nullable + private final ReactiveCachingHandler reactiveCachingHandler; + @Nullable private CacheOperationSource cacheOperationSource; @@ -103,6 +116,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker private boolean initialized = false; + protected CacheAspectSupport() { + this.reactiveCachingHandler = (reactiveStreamsPresent ? new ReactiveCachingHandler() : null); + } + + /** * Configure this aspect with the given error handler, key generator and cache resolver/manager * suppliers, applying the corresponding default if a supplier is not resolvable. @@ -371,41 +389,25 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } @Nullable - private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { - // Special handling of synchronized invocation + private Object execute(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { if (contexts.isSynchronized()) { - CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); - if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { - Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); - Cache cache = context.getCaches().iterator().next(); - try { - return wrapCacheValue(method, handleSynchronizedGet(invoker, key, cache)); - } - catch (Cache.ValueRetrievalException ex) { - // Directly propagate ThrowableWrapper from the invoker, - // or potentially also an IllegalArgumentException etc. - ReflectionUtils.rethrowRuntimeException(ex.getCause()); - } - } - else { - // No caching required, just call the underlying method - return invokeOperation(invoker); - } + // Special handling of synchronized invocation + return executeSynchronized(invoker, method, contexts); } // Process any early evictions processCacheEvicts(contexts.get(CacheEvictOperation.class), true, CacheOperationExpressionEvaluator.NO_RESULT); - // Check if we have a cached item matching the conditions - Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class)); + // Check if we have a cached value matching the conditions + Object cacheHit = findCachedValue(contexts.get(CacheableOperation.class)); Object cacheValue; Object returnValue; if (cacheHit != null && !hasCachePut(contexts)) { // If there are no put requests, just use the cache hit - cacheValue = cacheHit.get(); + cacheValue = (cacheHit instanceof Cache.ValueWrapper wrapper ? wrapper.get() : cacheHit); returnValue = wrapCacheValue(method, cacheValue); } else { @@ -414,8 +416,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker cacheValue = unwrapReturnValue(returnValue); } - // Collect puts from any @Cacheable miss, if no cached item is found - List cachePutRequests = new ArrayList<>(); + // Collect puts from any @Cacheable miss, if no cached value is found + List cachePutRequests = new ArrayList<>(1); if (cacheHit == null) { collectPutRequests(contexts.get(CacheableOperation.class), cacheValue, cachePutRequests); } @@ -425,29 +427,52 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker // Process any collected put requests, either from @CachePut or a @Cacheable miss for (CachePutRequest cachePutRequest : cachePutRequests) { - cachePutRequest.apply(cacheValue); + Object returnOverride = cachePutRequest.apply(cacheValue); + if (returnOverride != null) { + returnValue = returnOverride; + } } // Process any late evictions - processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue); + Object returnOverride = processCacheEvicts( + contexts.get(CacheEvictOperation.class), false, returnValue); + if (returnOverride != null) { + returnValue = returnOverride; + } return returnValue; } @Nullable - private Object handleSynchronizedGet(CacheOperationInvoker invoker, Object key, Cache cache) { - InvocationAwareResult invocationResult = new InvocationAwareResult(); - Object result = cache.get(key, () -> { - invocationResult.invoked = true; - if (logger.isTraceEnabled()) { - logger.trace("No cache entry for key '" + key + "' in cache " + cache.getName()); + private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); + if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { + Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); + Cache cache = context.getCaches().iterator().next(); + if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { + return cache.retrieve(key, () -> (CompletableFuture) invokeOperation(invoker)); + } + if (this.reactiveCachingHandler != null) { + Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; + } } - return unwrapReturnValue(invokeOperation(invoker)); - }); - if (!invocationResult.invoked && logger.isTraceEnabled()) { - logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'"); + try { + return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker)))); + } + catch (Cache.ValueRetrievalException ex) { + // Directly propagate ThrowableWrapper from the invoker, + // or potentially also an IllegalArgumentException etc. + ReflectionUtils.rethrowRuntimeException(ex.getCause()); + // Never reached + return null; + } + } + else { + // No caching required, just call the underlying method + return invokeOperation(invoker); } - return result; } @Nullable @@ -467,7 +492,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker private boolean hasCachePut(CacheOperationContexts contexts) { // Evaluate the conditions *without* the result object because we don't have it yet... Collection cachePutContexts = contexts.get(CachePutOperation.class); - Collection excluded = new ArrayList<>(); + Collection excluded = new ArrayList<>(1); for (CacheOperationContext context : cachePutContexts) { try { if (!context.isConditionPassing(CacheOperationExpressionEvaluator.RESULT_UNAVAILABLE)) { @@ -482,32 +507,55 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return (cachePutContexts.size() != excluded.size()); } - private void processCacheEvicts( - Collection contexts, boolean beforeInvocation, @Nullable Object result) { + @Nullable + private Object processCacheEvicts(Collection contexts, boolean beforeInvocation, + @Nullable Object result) { - for (CacheOperationContext context : contexts) { - CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation; - if (beforeInvocation == operation.isBeforeInvocation() && isConditionPassing(context, result)) { - performCacheEvict(context, operation, result); + if (contexts.isEmpty()) { + return null; + } + List applicable = contexts.stream() + .filter(context -> (context.metadata.operation instanceof CacheEvictOperation evict && + beforeInvocation == evict.isBeforeInvocation())).toList(); + if (applicable.isEmpty()) { + return null; + } + + if (result instanceof CompletableFuture future) { + return future.whenComplete((value, ex) -> { + if (ex == null) { + performCacheEvicts(applicable, result); + } + }); + } + if (this.reactiveCachingHandler != null) { + Object returnValue = this.reactiveCachingHandler.processCacheEvicts(applicable, result); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; } } + performCacheEvicts(applicable, result); + return null; } - private void performCacheEvict( - CacheOperationContext context, CacheEvictOperation operation, @Nullable Object result) { - - Object key = null; - for (Cache cache : context.getCaches()) { - if (operation.isCacheWide()) { - logInvalidating(context, operation, null); - doClear(cache, operation.isBeforeInvocation()); - } - else { - if (key == null) { - key = generateKey(context, result); + private void performCacheEvicts(List contexts, @Nullable Object result) { + for (CacheOperationContext context : contexts) { + CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation; + if (isConditionPassing(context, result)) { + Object key = null; + for (Cache cache : context.getCaches()) { + if (operation.isCacheWide()) { + logInvalidating(context, operation, null); + doClear(cache, operation.isBeforeInvocation()); + } + else { + if (key == null) { + key = generateKey(context, result); + } + logInvalidating(context, operation, key); + doEvict(cache, key, operation.isBeforeInvocation()); + } } - logInvalidating(context, operation, key); - doEvict(cache, key, operation.isBeforeInvocation()); } } } @@ -520,19 +568,21 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } /** - * Find a cached item only for {@link CacheableOperation} that passes the condition. + * Find a cached value only for {@link CacheableOperation} that passes the condition. * @param contexts the cacheable operations - * @return a {@link Cache.ValueWrapper} holding the cached item, + * @return a {@link Cache.ValueWrapper} holding the cached value, * or {@code null} if none is found */ @Nullable - private Cache.ValueWrapper findCachedItem(Collection contexts) { - Object result = CacheOperationExpressionEvaluator.NO_RESULT; + private Object findCachedValue(Collection contexts) { for (CacheOperationContext context : contexts) { - if (isConditionPassing(context, result)) { - Object key = generateKey(context, result); - Cache.ValueWrapper cached = findInCaches(context, key); + if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { + Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); + Object cached = findInCaches(context, key); if (cached != null) { + if (logger.isTraceEnabled()) { + logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames()); + } return cached; } else { @@ -547,9 +597,9 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker /** * Collect the {@link CachePutRequest} for all {@link CacheOperation} using - * the specified result item. + * the specified result value. * @param contexts the contexts to handle - * @param result the result item (never {@code null}) + * @param result the result value (never {@code null}) * @param putRequests the collection to update */ private void collectPutRequests(Collection contexts, @@ -564,15 +614,18 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } @Nullable - private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) { + private Object findInCaches(CacheOperationContext context, Object key) { for (Cache cache : context.getCaches()) { - Cache.ValueWrapper wrapper = doGet(cache, key); - if (wrapper != null) { - if (logger.isTraceEnabled()) { - logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'"); + if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { + return cache.retrieve(key); + } + if (this.reactiveCachingHandler != null) { + Object returnValue = this.reactiveCachingHandler.findInCaches(context, cache, key); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; } - return wrapper; } + return doGet(cache, key); } return null; } @@ -625,13 +678,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } private boolean determineSyncFlag(Method method) { - List cacheOperationContexts = this.contexts.get(CacheableOperation.class); - if (cacheOperationContexts == null) { // no @Cacheable operation at all + List cacheableContexts = this.contexts.get(CacheableOperation.class); + if (cacheableContexts == null) { // no @Cacheable operation at all return false; } boolean syncEnabled = false; - for (CacheOperationContext cacheOperationContext : cacheOperationContexts) { - if (((CacheableOperation) cacheOperationContext.getOperation()).isSync()) { + for (CacheOperationContext context : cacheableContexts) { + if (context.getOperation() instanceof CacheableOperation cacheable && cacheable.isSync()) { syncEnabled = true; break; } @@ -641,13 +694,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker throw new IllegalStateException( "A sync=true operation cannot be combined with other cache operations on '" + method + "'"); } - if (cacheOperationContexts.size() > 1) { + if (cacheableContexts.size() > 1) { throw new IllegalStateException( "Only one sync=true operation is allowed on '" + method + "'"); } - CacheOperationContext cacheOperationContext = cacheOperationContexts.iterator().next(); - CacheOperation operation = cacheOperationContext.getOperation(); - if (cacheOperationContext.getCaches().size() > 1) { + CacheOperationContext cacheableContext = cacheableContexts.iterator().next(); + CacheOperation operation = cacheableContext.getOperation(); + if (cacheableContext.getCaches().size() > 1) { throw new IllegalStateException( "A sync=true operation is restricted to a single cache on '" + operation + "'"); } @@ -720,7 +773,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker this.args = extractArgs(metadata.method, args); this.target = target; this.caches = CacheAspectSupport.this.getCaches(this, metadata.cacheResolver); - this.cacheNames = createCacheNames(this.caches); + this.cacheNames = prepareCacheNames(this.caches); } @Override @@ -808,8 +861,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return this.cacheNames; } - private Collection createCacheNames(Collection caches) { - Collection names = new ArrayList<>(); + private Collection prepareCacheNames(Collection caches) { + Collection names = new ArrayList<>(caches.size()); for (Cache cache : caches) { names.add(cache.getName()); } @@ -818,25 +871,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } - private class CachePutRequest { - - private final CacheOperationContext context; - - private final Object key; - - public CachePutRequest(CacheOperationContext context, Object key) { - this.context = context; - this.key = key; - } - - public void apply(@Nullable Object result) { - for (Cache cache : this.context.getCaches()) { - doPut(cache, this.key, result); - } - } - } - - private static final class CacheOperationCacheKey implements Comparable { private final CacheOperation cacheOperation; @@ -876,12 +910,168 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } + private class CachePutRequest { + + private final CacheOperationContext context; + + private final Object key; + + public CachePutRequest(CacheOperationContext context, Object key) { + this.context = context; + this.key = key; + } + + @Nullable + public Object apply(@Nullable Object result) { + if (result instanceof CompletableFuture future) { + return future.whenComplete((value, ex) -> { + if (ex != null) { + performEvict(ex); + } + else { + performPut(value); + } + }); + } + if (reactiveCachingHandler != null) { + Object returnValue = reactiveCachingHandler.processPutRequest(this, result); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; + } + } + performPut(result); + return null; + } + + void performPut(@Nullable Object value) { + if (logger.isTraceEnabled()) { + logger.trace("Creating cache entry for key '" + this.key + "' in cache(s) " + + this.context.getCacheNames()); + } + for (Cache cache : this.context.getCaches()) { + doPut(cache, this.key, value); + } + } + + void performEvict(Throwable cause) { + if (logger.isTraceEnabled()) { + logger.trace("Removing cache entry for key '" + this.key + "' from cache(s) " + + this.context.getCacheNames() + " due to exception: " + cause); + } + for (Cache cache : this.context.getCaches()) { + doEvict(cache, this.key, false); + } + } + } + + /** - * Internal holder class for recording that a cache method was invoked. + * Reactive Streams Subscriber collection for collecting a List to cache. */ - private static class InvocationAwareResult { + private class CachePutListSubscriber implements Subscriber { + + private final CachePutRequest request; - boolean invoked; + private final List cacheValue = new ArrayList<>(); + + public CachePutListSubscriber(CachePutRequest request) { + this.request = request; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Integer.MAX_VALUE); + } + @Override + public void onNext(Object o) { + this.cacheValue.add(o); + } + @Override + public void onError(Throwable t) { + this.request.performEvict(t); + } + @Override + public void onComplete() { + this.request.performPut(this.cacheValue); + } + } + + + /** + * Inner class to avoid a hard dependency on the Reactive Streams API at runtime. + */ + private class ReactiveCachingHandler { + + public static final Object NOT_HANDLED = new Object(); + + private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + + @Nullable + public Object executeSynchronized(CacheOperationInvoker invoker, Method method, Cache cache, Object key) { + ReactiveAdapter adapter = this.registry.getAdapter(method.getReturnType()); + if (adapter != null) { + if (adapter.isMultiValue()) { + // Flux or similar + return adapter.fromPublisher(Flux.from(Mono.fromFuture( + cache.retrieve(key, + () -> Flux.from(adapter.toPublisher(invokeOperation(invoker))).collectList().toFuture()))) + .flatMap(Flux::fromIterable)); + } + else { + // Mono or similar + return adapter.fromPublisher(Mono.fromFuture( + cache.retrieve(key, + () -> Mono.from(adapter.toPublisher(invokeOperation(invoker))).toFuture()))); + } + } + return NOT_HANDLED; + } + + @Nullable + public Object processCacheEvicts(List contexts, @Nullable Object result) { + ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null); + if (adapter != null) { + return adapter.fromPublisher(Mono.from(adapter.toPublisher(result)) + .doOnSuccess(value -> performCacheEvicts(contexts, result))); + } + return NOT_HANDLED; + } + + @Nullable + public Object findInCaches(CacheOperationContext context, Cache cache, Object key) { + ReactiveAdapter adapter = this.registry.getAdapter(context.getMethod().getReturnType()); + if (adapter != null) { + CompletableFuture cachedFuture = cache.retrieve(key); + if (cachedFuture == null) { + return null; + } + if (adapter.isMultiValue()) { + return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) + .flatMap(v -> (v instanceof Iterable iv ? Flux.fromIterable(iv) : Flux.just(v)))); + } + else { + return adapter.fromPublisher(Mono.fromFuture(cachedFuture)); + } + } + return NOT_HANDLED; + } + + @Nullable + public Object processPutRequest(CachePutRequest request, @Nullable Object result) { + ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null); + if (adapter != null) { + if (adapter.isMultiValue()) { + Flux source = Flux.from(adapter.toPublisher(result)); + source.subscribe(new CachePutListSubscriber(request)); + return adapter.fromPublisher(source); + } + else { + return adapter.fromPublisher(Mono.from(adapter.toPublisher(result)) + .doOnSuccess(request::performPut).doOnError(request::performEvict)); + } + } + return NOT_HANDLED; + } } } diff --git a/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java b/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java index 6c814ff18e..b8746e97f9 100644 --- a/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java +++ b/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,8 @@ package org.springframework.cache.support; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.springframework.cache.Cache; import org.springframework.lang.Nullable; @@ -80,6 +82,17 @@ public class NoOpCache implements Cache { } } + @Override + @Nullable + public CompletableFuture retrieve(Object key) { + return null; + } + + @Override + public CompletableFuture retrieve(Object key, Supplier> valueLoader) { + return valueLoader.get(); + } + @Override public void put(Object key, @Nullable Object value) { } diff --git a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java index 48878ef4e1..e538deedd5 100644 --- a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java +++ b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java @@ -20,11 +20,15 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.beans.testfixture.beans.TestBean; +import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CachePut; import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Caching; @@ -118,6 +122,7 @@ class CacheReproTests { assertThat(cacheResolver.getCache("foo").get("foo")).isNull(); Object result = bean.getSimple("foo"); // cache name = id assertThat(cacheResolver.getCache("foo").get("foo").get()).isEqualTo(result); + context.close(); } @@ -127,7 +132,7 @@ class CacheReproTests { Spr13081Service bean = context.getBean(Spr13081Service.class); assertThatIllegalStateException().isThrownBy(() -> bean.getSimple(null)) - .withMessageContaining(MyCacheResolver.class.getName()); + .withMessageContaining(MyCacheResolver.class.getName()); context.close(); } @@ -146,6 +151,7 @@ class CacheReproTests { TestBean tb2 = bean.findById("tb1").get(); assertThat(tb2).isNotSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb2); + context.close(); } @@ -164,6 +170,151 @@ class CacheReproTests { TestBean tb2 = bean.findById("tb1").get(); assertThat(tb2).isNotSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb2); + + context.close(); + } + + @Test + void spr14235AdaptsToCompletableFuture() { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FutureService.class); + Spr14235FutureService bean = context.getBean(Spr14235FutureService.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + TestBean tb = bean.findById("tb1").join(); + assertThat(bean.findById("tb1").join()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + bean.clear().join(); + TestBean tb2 = bean.findById("tb1").join(); + assertThat(tb2).isNotSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb2); + + bean.clear().join(); + bean.insertItem(tb).join(); + assertThat(bean.findById("tb1").join()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + context.close(); + } + + @Test + void spr14235AdaptsToCompletableFutureWithSync() throws Exception { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FutureServiceSync.class); + Spr14235FutureServiceSync bean = context.getBean(Spr14235FutureServiceSync.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + TestBean tb = bean.findById("tb1").get(); + assertThat(bean.findById("tb1").get()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + cache.clear(); + TestBean tb2 = bean.findById("tb1").get(); + assertThat(tb2).isNotSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb2); + + cache.clear(); + bean.insertItem(tb); + assertThat(bean.findById("tb1").get()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + context.close(); + } + + @Test + void spr14235AdaptsToReactorMono() { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235MonoService.class); + Spr14235MonoService bean = context.getBean(Spr14235MonoService.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + TestBean tb = bean.findById("tb1").block(); + assertThat(bean.findById("tb1").block()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + bean.clear().block(); + TestBean tb2 = bean.findById("tb1").block(); + assertThat(tb2).isNotSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb2); + + bean.clear().block(); + bean.insertItem(tb).block(); + assertThat(bean.findById("tb1").block()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + context.close(); + } + + @Test + void spr14235AdaptsToReactorMonoWithSync() { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235MonoServiceSync.class); + Spr14235MonoServiceSync bean = context.getBean(Spr14235MonoServiceSync.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + TestBean tb = bean.findById("tb1").block(); + assertThat(bean.findById("tb1").block()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + cache.clear(); + TestBean tb2 = bean.findById("tb1").block(); + assertThat(tb2).isNotSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb2); + + cache.clear(); + bean.insertItem(tb); + assertThat(bean.findById("tb1").block()).isSameAs(tb); + assertThat(cache.get("tb1").get()).isSameAs(tb); + + context.close(); + } + + @Test + void spr14235AdaptsToReactorFlux() { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FluxService.class); + Spr14235FluxService bean = context.getBean(Spr14235FluxService.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + List tb = bean.findById("tb1").collectList().block(); + assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb); + + bean.clear().blockLast(); + List tb2 = bean.findById("tb1").collectList().block(); + assertThat(tb2).isNotEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb2); + + bean.clear().blockLast(); + bean.insertItem("tb1", tb).blockLast(); + assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb); + + context.close(); + } + + @Test + void spr14235AdaptsToReactorFluxWithSync() { + AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FluxServiceSync.class); + Spr14235FluxServiceSync bean = context.getBean(Spr14235FluxServiceSync.class); + Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); + + List tb = bean.findById("tb1").collectList().block(); + assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb); + + cache.clear(); + List tb2 = bean.findById("tb1").collectList().block(); + assertThat(tb2).isNotEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb2); + + cache.clear(); + bean.insertItem("tb1", tb); + assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); + assertThat(cache.get("tb1").get()).isEqualTo(tb); + context.close(); } @@ -177,6 +328,7 @@ class CacheReproTests { bean.insertItem(tb); assertThat(bean.findById("tb1").get()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); + context.close(); } @@ -190,6 +342,7 @@ class CacheReproTests { bean.insertItem(tb); assertThat(bean.findById("tb1").get()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); + context.close(); } @@ -387,6 +540,120 @@ class CacheReproTests { } + public static class Spr14235FutureService { + + @Cacheable(value = "itemCache") + public CompletableFuture findById(String id) { + return CompletableFuture.completedFuture(new TestBean(id)); + } + + @CachePut(cacheNames = "itemCache", key = "#item.name") + public CompletableFuture insertItem(TestBean item) { + return CompletableFuture.completedFuture(item); + } + + @CacheEvict(cacheNames = "itemCache", allEntries = true) + public CompletableFuture clear() { + return CompletableFuture.completedFuture(null); + } + } + + + public static class Spr14235FutureServiceSync { + + @Cacheable(value = "itemCache", sync = true) + public CompletableFuture findById(String id) { + return CompletableFuture.completedFuture(new TestBean(id)); + } + + @CachePut(cacheNames = "itemCache", key = "#item.name") + public TestBean insertItem(TestBean item) { + return item; + } + } + + + public static class Spr14235MonoService { + + @Cacheable(value = "itemCache") + public Mono findById(String id) { + return Mono.just(new TestBean(id)); + } + + @CachePut(cacheNames = "itemCache", key = "#item.name") + public Mono insertItem(TestBean item) { + return Mono.just(item); + } + + @CacheEvict(cacheNames = "itemCache", allEntries = true) + public Mono clear() { + return Mono.empty(); + } + } + + + public static class Spr14235MonoServiceSync { + + @Cacheable(value = "itemCache", sync = true) + public Mono findById(String id) { + return Mono.just(new TestBean(id)); + } + + @CachePut(cacheNames = "itemCache", key = "#item.name") + public TestBean insertItem(TestBean item) { + return item; + } + } + + + public static class Spr14235FluxService { + + private int counter = 0; + + @Cacheable(value = "itemCache") + public Flux findById(String id) { + return Flux.just(new TestBean(id), new TestBean(id + (counter++))); + } + + @CachePut(cacheNames = "itemCache", key = "#id") + public Flux insertItem(String id, List item) { + return Flux.fromIterable(item); + } + + @CacheEvict(cacheNames = "itemCache", allEntries = true) + public Flux clear() { + return Flux.empty(); + } + } + + + public static class Spr14235FluxServiceSync { + + private int counter = 0; + + @Cacheable(value = "itemCache", sync = true) + public Flux findById(String id) { + return Flux.just(new TestBean(id), new TestBean(id + (counter++))); + } + + @CachePut(cacheNames = "itemCache", key = "#id") + public List insertItem(String id, List item) { + return item; + } + } + + + @Configuration + @EnableCaching + public static class Spr14235Config { + + @Bean + public CacheManager cacheManager() { + return new ConcurrentMapCacheManager(); + } + } + + public static class Spr14853Service { @Cacheable(value = "itemCache", sync = true)