Browse Source

Refine Coroutines support in HttpServiceProxyFactory

Closes gh-29527
pull/29946/head
Sébastien Deleuze 2 years ago
parent
commit
cb63164593
  1. 5
      spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java
  2. 1
      spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java
  3. 27
      spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java

5
spring-core/src/main/java/org/springframework/core/CoroutinesUtils.java

@ -21,7 +21,6 @@ import java.lang.reflect.Method; @@ -21,7 +21,6 @@ import java.lang.reflect.Method;
import java.util.Objects;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.JvmClassMappingKt;
import kotlin.reflect.KClass;
@ -68,10 +67,6 @@ public abstract class CoroutinesUtils { @@ -68,10 +67,6 @@ public abstract class CoroutinesUtils {
(scope, continuation) -> MonoKt.awaitSingleOrNull(source, continuation));
}
public static <T> Object awaitSingleOrNull(Mono<T> source, Continuation<T> continuation) {
return MonoKt.awaitSingleOrNull(source, continuation);
}
/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}. Uses an {@linkplain Dispatchers#getUnconfined() unconfined}

1
spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceMethod.java

@ -84,7 +84,6 @@ final class HttpServiceMethod { @@ -84,7 +84,6 @@ final class HttpServiceMethod {
if (count == 0) {
return new MethodParameter[0];
}
if (KotlinDetector.isSuspendingFunction(method)) {
count -= 1;
}

27
spring-web/src/main/java/org/springframework/web/service/invoker/HttpServiceProxyFactory.java

@ -26,13 +26,13 @@ import java.util.function.Function; @@ -26,13 +26,13 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import kotlin.coroutines.Continuation;
import kotlinx.coroutines.reactor.MonoKt;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import reactor.core.publisher.Mono;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.framework.ReflectiveMethodInvocation;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.ReactiveAdapterRegistry;
@ -268,18 +268,13 @@ public final class HttpServiceProxyFactory { @@ -268,18 +268,13 @@ public final class HttpServiceProxyFactory {
}
@Override
@SuppressWarnings({"unchecked"})
public Object invoke(MethodInvocation invocation) throws Throwable {
Method method = invocation.getMethod();
HttpServiceMethod httpServiceMethod = this.httpServiceMethods.get(method);
if (httpServiceMethod != null) {
if (KotlinDetector.isSuspendingFunction(method)) {
Object[] arguments = getSuspendedFunctionArgs(invocation.getArguments());
Continuation<Object> continuation = resolveContinuationArgument(invocation.getArguments());
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
return CoroutinesUtils.awaitSingleOrNull(wrapped, continuation);
return KotlinDelegate.invokeSuspendingFunction(invocation, httpServiceMethod);
}
return httpServiceMethod.invoke(invocation.getArguments());
}
if (method.isDefault()) {
@ -290,13 +285,23 @@ public final class HttpServiceProxyFactory { @@ -290,13 +285,23 @@ public final class HttpServiceProxyFactory {
}
throw new IllegalStateException("Unexpected method invocation: " + method);
}
}
@SuppressWarnings({"unchecked"})
private static <T> Continuation<T> resolveContinuationArgument(Object[] args) {
return (Continuation<T>) args[args.length - 1];
/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
@SuppressWarnings("unchecked")
private static class KotlinDelegate {
public static Object invokeSuspendingFunction(MethodInvocation invocation, HttpServiceMethod httpServiceMethod) {
Object[] rawArguments = invocation.getArguments();
Object[] arguments = resolveArguments(rawArguments);
Continuation<Object> continuation = (Continuation<Object>) rawArguments[rawArguments.length - 1];
Mono<Object> wrapped = (Mono<Object>) httpServiceMethod.invoke(arguments);
return MonoKt.awaitSingleOrNull(wrapped, continuation);
}
private static Object[] getSuspendedFunctionArgs(Object[] args) {
private static Object[] resolveArguments(Object[] args) {
Object[] functionArgs = new Object[args.length - 1];
System.arraycopy(args, 0, functionArgs, 0, args.length - 1);
return functionArgs;

Loading…
Cancel
Save