From cd5dc84832376c1b0b2b2e8779d6956a91597f5e Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Tue, 23 Apr 2019 17:20:11 +0200 Subject: [PATCH] Improve WebFlux suspending handler method support This commit leverages Flux instead of Flow to support suspending handler methods returning Flow in order to avoid multiple invocations of the suspending function on every collect(). See gh-22820 --- .../springframework/core/CoroutinesUtils.kt | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt index b4e4cd8e0a..a9c10c09a9 100644 --- a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt +++ b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt @@ -23,9 +23,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactive.flow.asPublisher import kotlinx.coroutines.reactor.mono import reactor.core.publisher.Mono @@ -33,8 +32,6 @@ import reactor.core.publisher.onErrorMap import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import kotlin.reflect.full.callSuspend -import kotlin.reflect.full.isSubtypeOf -import kotlin.reflect.full.starProjectedType import kotlin.reflect.jvm.kotlinFunction /** @@ -56,7 +53,8 @@ internal fun monoToDeferred(source: Mono) = GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() } /** - * Invoke an handler method converting suspending method to [Mono] or [Flow] if necessary. + * Invoke an handler method converting suspending method to [Mono] or + * [reactor.core.publisher.Flux] if necessary. * * @author Sebastien Deleuze * @since 5.2 @@ -66,18 +64,15 @@ internal fun monoToDeferred(source: Mono) = internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): Any? { val function = method.kotlinFunction!! return if (function.isSuspend) { - if (function.returnType.isSubtypeOf(Flow::class.starProjectedType)) { - flow { - (function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) as Flow<*>).collect { - emit(it) - } - } + val mono = GlobalScope.mono(Dispatchers.Unconfined) { + function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) + .let { if (it == Unit) null else it } + }.onErrorMap(InvocationTargetException::class) { it.targetException } + if (function.returnType.classifier == Flow::class) { + mono.flatMapMany { (it as Flow).asPublisher() } } else { - GlobalScope.mono(Dispatchers.Unconfined) { - function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) - .let { if (it == Unit) null else it} - }.onErrorMap(InvocationTargetException::class) { it.targetException } + mono } } else {