Browse Source

Add Coroutines support to Spring AOP

This commit adds support for Kotlin Coroutines to Spring AOP
by leveraging CoroutinesUtils#invokeSuspendingFunction in
AopUtils#invokeJoinpointUsingReflection to convert it to the
equivalent Publisher return value, like in other parts of Spring
Framework.

That allows method interceptors with Reactive support to process
related return values.

CglibAopProxy#processReturnType and JdkDynamicAopProxy#invoke
take care of the conversion from the Publisher return value
to Kotlin Coroutines.

Reactive transactional and HTTP service interface support
have been refined to leverage those new generic capabilities.

Closes gh-22462
pull/31115/head
Sébastien Deleuze 1 year ago
parent
commit
c8169e5cad
  1. 2
      integration-tests/integration-tests.gradle
  2. 107
      integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt
  3. 4
      spring-aop/spring-aop.gradle
  4. 24
      spring-aop/src/main/java/org/springframework/aop/framework/CglibAopProxy.java
  5. 45
      spring-aop/src/main/java/org/springframework/aop/framework/CoroutinesUtils.java
  6. 9
      spring-aop/src/main/java/org/springframework/aop/framework/JdkDynamicAopProxy.java
  7. 26
      spring-aop/src/main/java/org/springframework/aop/support/AopUtils.java
  8. 13
      spring-aop/src/test/java/org/springframework/aop/support/AopUtilsTests.java
  9. 52
      spring-aop/src/test/kotlin/org/springframework/aop/support/AopUtilsKotlinTests.kt
  10. 19
      spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java
  11. 28
      spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java

2
integration-tests/integration-tests.gradle

@ -1,5 +1,6 @@ @@ -1,5 +1,6 @@
plugins {
id 'org.springframework.build.runtimehints-agent'
id 'kotlin'
}
description = "Spring Integration Tests"
@ -26,6 +27,7 @@ dependencies { @@ -26,6 +27,7 @@ dependencies {
testImplementation("org.aspectj:aspectjweaver")
testImplementation("org.hsqldb:hsqldb")
testImplementation("org.hibernate:hibernate-core-jakarta")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}
normalization {

107
integration-tests/src/test/kotlin/org/springframework/aop/framework/autoproxy/AspectJAutoProxyInterceptorKotlinIntegrationTests.kt

@ -0,0 +1,107 @@ @@ -0,0 +1,107 @@
package org.springframework.aop.framework.autoproxy
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.aopalliance.intercept.MethodInterceptor
import org.aopalliance.intercept.MethodInvocation
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.aop.framework.autoproxy.AspectJAutoProxyInterceptorKotlinIntegrationTests.InterceptorConfig
import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.EnableAspectJAutoProxy
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import reactor.core.publisher.Mono
import java.lang.reflect.Method
/**
* Integration tests for interceptors with Kotlin (with and without Coroutines) configured
* via AspectJ auto-proxy support.
*/
@SpringJUnitConfig(InterceptorConfig::class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class AspectJAutoProxyInterceptorKotlinIntegrationTests(
@Autowired val echo: Echo,
@Autowired val firstAdvisor: TestPointcutAdvisor,
@Autowired val secondAdvisor: TestPointcutAdvisor) {
@Test
fun `Multiple interceptors with regular function`() {
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
val value = "Hello!"
assertThat(echo.echo(value)).isEqualTo(value)
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { String::class.java.isAssignableFrom(it) }
}
@Test
fun `Multiple interceptors with suspending function`() {
assertThat(firstAdvisor.interceptor.invocations).isEmpty()
assertThat(secondAdvisor.interceptor.invocations).isEmpty()
val value = "Hello!"
runBlocking {
assertThat(echo.suspendingEcho(value)).isEqualTo(value)
}
assertThat(firstAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
assertThat(secondAdvisor.interceptor.invocations).singleElement().matches { Mono::class.java.isAssignableFrom(it) }
}
@Configuration
@EnableAspectJAutoProxy
open class InterceptorConfig {
@Bean
open fun firstAdvisor() = TestPointcutAdvisor().apply { order = 0 }
@Bean
open fun secondAdvisor() = TestPointcutAdvisor().apply { order = 1 }
@Bean
open fun echo(): Echo {
return Echo()
}
}
class TestMethodInterceptor: MethodInterceptor {
var invocations: MutableList<Class<*>> = mutableListOf()
@Suppress("RedundantNullableReturnType")
override fun invoke(invocation: MethodInvocation): Any? {
val result = invocation.proceed()
invocations.add(result!!.javaClass)
return result
}
}
class TestPointcutAdvisor : StaticMethodMatcherPointcutAdvisor(TestMethodInterceptor()) {
val interceptor: TestMethodInterceptor
get() = advice as TestMethodInterceptor
override fun matches(method: Method, targetClass: Class<*>): Boolean {
return targetClass == Echo::class.java && method.name.lowercase().endsWith("echo")
}
}
open class Echo {
open fun echo(value: String): String {
return value;
}
open suspend fun suspendingEcho(value: String): String {
delay(1)
return value;
}
}
}

4
spring-aop/spring-aop.gradle

@ -1,14 +1,18 @@ @@ -1,14 +1,18 @@
description = "Spring AOP"
apply plugin: "kotlin"
dependencies {
api(project(":spring-beans"))
api(project(":spring-core"))
optional("org.apache.commons:commons-pool2")
optional("org.aspectj:aspectjweaver")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testFixturesImplementation(testFixtures(project(":spring-beans")))
testFixturesImplementation(testFixtures(project(":spring-core")))
testFixturesImplementation("com.google.code.findbugs:jsr305")
testImplementation(project(":spring-core-test"))
testImplementation(testFixtures(project(":spring-beans")))
testImplementation(testFixtures(project(":spring-core")))
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
}

24
spring-aop/src/main/java/org/springframework/aop/framework/CglibAopProxy.java

@ -47,6 +47,7 @@ import org.springframework.cglib.proxy.MethodInterceptor; @@ -47,6 +47,7 @@ import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;
import org.springframework.cglib.proxy.NoOp;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.SmartClassLoader;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -75,6 +76,7 @@ import org.springframework.util.ReflectionUtils; @@ -75,6 +76,7 @@ import org.springframework.util.ReflectionUtils;
* @author Ramnivas Laddad
* @author Chris Beams
* @author Dave Syer
* @author Sebastien Deleuze
* @see org.springframework.cglib.proxy.Enhancer
* @see AdvisedSupport#setProxyTargetClass
* @see DefaultAopProxyFactory
@ -98,6 +100,8 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -98,6 +100,8 @@ class CglibAopProxy implements AopProxy, Serializable {
/** Keeps track of the Classes that we have validated for final methods. */
private static final Map<Class<?>, Boolean> validatedClasses = new WeakHashMap<>();
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
/** The configuration used to configure this proxy. */
protected final AdvisedSupport advised;
@ -399,10 +403,11 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -399,10 +403,11 @@ class CglibAopProxy implements AopProxy, Serializable {
/**
* Process a return value. Wraps a return of {@code this} if necessary to be the
* {@code proxy} and also verifies that {@code null} is not returned as a primitive.
* Also takes care of the conversion from {@code Mono} to Kotlin Coroutines if needed.
*/
@Nullable
private static Object processReturnType(
Object proxy, @Nullable Object target, Method method, @Nullable Object returnValue) {
Object proxy, @Nullable Object target, Method method, Object[] arguments, @Nullable Object returnValue) {
// Massage return value if necessary
if (returnValue != null && returnValue == target &&
@ -416,6 +421,11 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -416,6 +421,11 @@ class CglibAopProxy implements AopProxy, Serializable {
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
if (KotlinDetector.isSuspendingFunction(method)) {
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
CoroutinesUtils.asFlow(returnValue) :
CoroutinesUtils.awaitSingleOrNull(returnValue, arguments[arguments.length - 1]);
}
return returnValue;
}
@ -446,7 +456,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -446,7 +456,7 @@ class CglibAopProxy implements AopProxy, Serializable {
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
return processReturnType(proxy, this.target, method, retVal);
return processReturnType(proxy, this.target, method, args, retVal);
}
}
@ -471,7 +481,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -471,7 +481,7 @@ class CglibAopProxy implements AopProxy, Serializable {
try {
oldProxy = AopContext.setCurrentProxy(proxy);
Object retVal = AopUtils.invokeJoinpointUsingReflection(this.target, method, args);
return processReturnType(proxy, this.target, method, retVal);
return processReturnType(proxy, this.target, method, args, retVal);
}
finally {
AopContext.setCurrentProxy(oldProxy);
@ -499,7 +509,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -499,7 +509,7 @@ class CglibAopProxy implements AopProxy, Serializable {
Object target = this.targetSource.getTarget();
try {
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
if (target != null) {
@ -529,7 +539,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -529,7 +539,7 @@ class CglibAopProxy implements AopProxy, Serializable {
try {
oldProxy = AopContext.setCurrentProxy(proxy);
Object retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
AopContext.setCurrentProxy(oldProxy);
@ -656,7 +666,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -656,7 +666,7 @@ class CglibAopProxy implements AopProxy, Serializable {
proxy, this.target, method, args, this.targetClass, this.adviceChain, methodProxy);
// If we get here, we need to create a MethodInvocation.
Object retVal = invocation.proceed();
retVal = processReturnType(proxy, this.target, method, retVal);
retVal = processReturnType(proxy, this.target, method, args, retVal);
return retVal;
}
}
@ -706,7 +716,7 @@ class CglibAopProxy implements AopProxy, Serializable { @@ -706,7 +716,7 @@ class CglibAopProxy implements AopProxy, Serializable {
// We need to create a method invocation...
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
}
return processReturnType(proxy, target, method, retVal);
return processReturnType(proxy, target, method, args, retVal);
}
finally {
if (target != null && !targetSource.isStatic()) {

45
spring-aop/src/main/java/org/springframework/aop/framework/CoroutinesUtils.java

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.aop.framework;
import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactive.ReactiveFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.lang.Nullable;
/**
* Package-visible class designed to avoid a hard dependency on Kotlin and Coroutines dependency at runtime.
*
* @author Sebastien Deleuze
* @since 6.1.0
*/
abstract class CoroutinesUtils {
static Object asFlow(Object publisher) {
return ReactiveFlowKt.asFlow((Publisher<?>) publisher);
}
@SuppressWarnings("unchecked")
@Nullable
static Object awaitSingleOrNull(Object mono, Object continuation) {
return MonoKt.awaitSingleOrNull((Mono<?>) mono, (Continuation<Object>) continuation);
}
}

9
spring-aop/src/main/java/org/springframework/aop/framework/JdkDynamicAopProxy.java

@ -31,6 +31,8 @@ import org.springframework.aop.RawTargetAccess; @@ -31,6 +31,8 @@ import org.springframework.aop.RawTargetAccess;
import org.springframework.aop.TargetSource;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.DecoratingProxy;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@ -58,6 +60,7 @@ import org.springframework.util.ClassUtils; @@ -58,6 +60,7 @@ import org.springframework.util.ClassUtils;
* @author Rob Harrop
* @author Dave Syer
* @author Sergey Tsypanov
* @author Sebastien Deleuze
* @see java.lang.reflect.Proxy
* @see AdvisedSupport
* @see ProxyFactory
@ -80,6 +83,8 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa @@ -80,6 +83,8 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa
/** We use a static Log to avoid serialization issues. */
private static final Log logger = LogFactory.getLog(JdkDynamicAopProxy.class);
private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
/** Config used to configure this proxy. */
private final AdvisedSupport advised;
@ -258,6 +263,10 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa @@ -258,6 +263,10 @@ final class JdkDynamicAopProxy implements AopProxy, InvocationHandler, Serializa
throw new AopInvocationException(
"Null return value from advice does not match primitive return type for: " + method);
}
if (KotlinDetector.isSuspendingFunction(method)) {
return COROUTINES_FLOW_CLASS_NAME.equals(new MethodParameter(method, -1).getParameterType().getName()) ?
CoroutinesUtils.asFlow(retVal) : CoroutinesUtils.awaitSingleOrNull(retVal, args[args.length - 1]);
}
return retVal;
}
finally {

26
spring-aop/src/main/java/org/springframework/aop/support/AopUtils.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,6 +25,11 @@ import java.util.LinkedHashSet; @@ -25,6 +25,11 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.Job;
import org.reactivestreams.Publisher;
import org.springframework.aop.Advisor;
import org.springframework.aop.AopInvocationException;
import org.springframework.aop.IntroductionAdvisor;
@ -35,6 +40,8 @@ import org.springframework.aop.PointcutAdvisor; @@ -35,6 +40,8 @@ import org.springframework.aop.PointcutAdvisor;
import org.springframework.aop.SpringProxy;
import org.springframework.aop.TargetClassAware;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodIntrospector;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -53,6 +60,7 @@ import org.springframework.util.ReflectionUtils; @@ -53,6 +60,7 @@ import org.springframework.util.ReflectionUtils;
* @author Rod Johnson
* @author Juergen Hoeller
* @author Rob Harrop
* @author Sebastien Deleuze
* @see org.springframework.aop.framework.AopProxyUtils
*/
public abstract class AopUtils {
@ -340,7 +348,8 @@ public abstract class AopUtils { @@ -340,7 +348,8 @@ public abstract class AopUtils {
// Use reflection to invoke the method.
try {
ReflectionUtils.makeAccessible(method);
return method.invoke(target, args);
return KotlinDetector.isSuspendingFunction(method) ?
KotlinDelegate.invokeSuspendingFunction(method, target, args) : method.invoke(target, args);
}
catch (InvocationTargetException ex) {
// Invoked method threw a checked exception.
@ -356,4 +365,17 @@ public abstract class AopUtils { @@ -356,4 +365,17 @@ public abstract class AopUtils {
}
}
/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
private static class KotlinDelegate {
public static Publisher<?> invokeSuspendingFunction(Method method, Object target, Object... args) {
Continuation<?> continuation = (Continuation<?>) args[args.length -1];
CoroutineContext context = continuation.getContext().minusKey(Job.Key);
return CoroutinesUtils.invokeSuspendingFunction(context, method, target, args);
}
}
}

13
spring-aop/src/test/java/org/springframework/aop/support/AopUtilsTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,12 +29,14 @@ import org.springframework.aop.testfixture.interceptor.NopInterceptor; @@ -29,12 +29,14 @@ import org.springframework.aop.testfixture.interceptor.NopInterceptor;
import org.springframework.beans.testfixture.beans.TestBean;
import org.springframework.core.testfixture.io.SerializationTestUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Rod Johnson
* @author Chris Beams
* @author Sebastien Deleuze
*/
public class AopUtilsTests {
@ -88,4 +90,13 @@ public class AopUtilsTests { @@ -88,4 +90,13 @@ public class AopUtilsTests {
assertThat(SerializationTestUtils.serializeAndDeserialize(ExposeInvocationInterceptor.INSTANCE)).isSameAs(ExposeInvocationInterceptor.INSTANCE);
}
@Test
public void testInvokeJoinpointUsingReflection() throws Throwable {
String name = "foo";
TestBean testBean = new TestBean(name);
Method method = ReflectionUtils.findMethod(TestBean.class, "getName");
Object result = AopUtils.invokeJoinpointUsingReflection(testBean, method, new Object[0]);
assertThat(result).isEqualTo(name);
}
}

52
spring-aop/src/test/kotlin/org/springframework/aop/support/AopUtilsKotlinTests.kt

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.aop.support
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.delay
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.util.ReflectionUtils
import reactor.core.publisher.Mono
import kotlin.coroutines.Continuation
/**
* Tests for Kotlin support in [AopUtils].
*
* @author Sebastien Deleuze
*/
class AopUtilsKotlinTests {
@Test
fun `Invoking suspending function should return Mono`() {
val value = "foo"
val method = ReflectionUtils.findMethod(AopUtilsKotlinTests::class.java, "suspendingFunction",
String::class.java, Continuation::class.java)!!
val continuation = Continuation<Any>(CoroutineName("test")) { }
val result = AopUtils.invokeJoinpointUsingReflection(this, method, arrayOf(value, continuation))
assertThat(result).isInstanceOfSatisfying(Mono::class.java) {
assertThat(it.block()).isEqualTo(value)
}
}
@Suppress("unused")
suspend fun suspendingFunction(value: String): String {
delay(1)
return value;
}
}

19
spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

@ -26,8 +26,6 @@ import io.vavr.control.Try; @@ -26,8 +26,6 @@ import io.vavr.control.Try;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.reactive.ReactiveFlowKt;
import kotlinx.coroutines.reactor.MonoKt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
@ -370,12 +368,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -370,12 +368,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
if (corInv != null) {
callback = () -> KotlinDelegate.invokeSuspendingFunction(method, corInv);
}
Object result = txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
if (corInv != null) {
return (hasSuspendingFlowReturnType ? KotlinDelegate.asFlow((Publisher<?>) result) :
KotlinDelegate.awaitSingleOrNull((Mono<?>) result, corInv.getContinuation()));
}
return result;
return txSupport.invokeWithinTransaction(method, targetClass, callback, txAttr, rtm);
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
@ -896,16 +889,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init @@ -896,16 +889,6 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
*/
private static class KotlinDelegate {
private static Object asFlow(Publisher<?> publisher) {
return ReactiveFlowKt.asFlow(publisher);
}
@SuppressWarnings("unchecked")
@Nullable
private static Object awaitSingleOrNull(Mono<?> publisher, Object continuation) {
return MonoKt.awaitSingleOrNull(publisher, (Continuation<Object>) continuation);
}
public static Publisher<?> invokeSuspendingFunction(Method method, CoroutinesInvocationCallback callback) {
CoroutineContext coroutineContext = ((Continuation<?>) callback.getContinuation()).getContext().minusKey(Job.Key);
return CoroutinesUtils.invokeSuspendingFunction(coroutineContext, method, callback.getTarget(), callback.getArguments());

28
spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java

@ -25,11 +25,8 @@ import java.util.Map; @@ -25,11 +25,8 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactor.MonoKt;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import reactor.core.publisher.Mono;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.framework.ReflectiveMethodInvocation;
@ -301,10 +298,9 @@ public final class HttpServiceProxyFactory { @@ -301,10 +298,9 @@ public final class HttpServiceProxyFactory {
Method method = invocation.getMethod();
HttpServiceMethod httpServiceMethod = this.httpServiceMethods.get(method);
if (httpServiceMethod != null) {
if (KotlinDetector.isSuspendingFunction(method)) {
return KotlinDelegate.invokeSuspendingFunction(invocation, httpServiceMethod);
}
return httpServiceMethod.invoke(invocation.getArguments());
Object[] arguments = KotlinDetector.isSuspendingFunction(method) ?
resolveCoroutinesArguments(invocation.getArguments()) : invocation.getArguments();
return httpServiceMethod.invoke(arguments);
}
if (method.isDefault()) {
if (invocation instanceof ReflectiveMethodInvocation reflectiveMethodInvocation) {
@ -314,27 +310,13 @@ public final class HttpServiceProxyFactory { @@ -314,27 +310,13 @@ public final class HttpServiceProxyFactory {
}
throw new IllegalStateException("Unexpected method invocation: " + method);
}
}
/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
@SuppressWarnings("unchecked")
private static class KotlinDelegate {
public static Object invokeSuspendingFunction(MethodInvocation invocation, HttpServiceMethod httpServiceMethod) {
Object[] rawArguments = invocation.getArguments();
Object[] arguments = resolveArguments(rawArguments);
Continuation<Object> continuation = (Continuation<Object>) rawArguments[rawArguments.length - 1];
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
return MonoKt.awaitSingleOrNull(wrapped, continuation);
}
private static Object[] resolveArguments(Object[] args) {
private static Object[] resolveCoroutinesArguments(Object[] args) {
Object[] functionArgs = new Object[args.length - 1];
System.arraycopy(args, 0, functionArgs, 0, args.length - 1);
return functionArgs;
}
}
}

Loading…
Cancel
Save