diff --git a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt index b4e4cd8e0a..a9c10c09a9 100644 --- a/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt +++ b/spring-core-coroutines/src/main/kotlin/org/springframework/core/CoroutinesUtils.kt @@ -23,9 +23,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.flow import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactive.flow.asPublisher import kotlinx.coroutines.reactor.mono import reactor.core.publisher.Mono @@ -33,8 +32,6 @@ import reactor.core.publisher.onErrorMap import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import kotlin.reflect.full.callSuspend -import kotlin.reflect.full.isSubtypeOf -import kotlin.reflect.full.starProjectedType import kotlin.reflect.jvm.kotlinFunction /** @@ -56,7 +53,8 @@ internal fun monoToDeferred(source: Mono) = GlobalScope.async(Dispatchers.Unconfined) { source.awaitFirstOrNull() } /** - * Invoke an handler method converting suspending method to [Mono] or [Flow] if necessary. + * Invoke an handler method converting suspending method to [Mono] or + * [reactor.core.publisher.Flux] if necessary. * * @author Sebastien Deleuze * @since 5.2 @@ -66,18 +64,15 @@ internal fun monoToDeferred(source: Mono) = internal fun invokeHandlerMethod(method: Method, bean: Any, vararg args: Any?): Any? { val function = method.kotlinFunction!! return if (function.isSuspend) { - if (function.returnType.isSubtypeOf(Flow::class.starProjectedType)) { - flow { - (function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) as Flow<*>).collect { - emit(it) - } - } + val mono = GlobalScope.mono(Dispatchers.Unconfined) { + function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) + .let { if (it == Unit) null else it } + }.onErrorMap(InvocationTargetException::class) { it.targetException } + if (function.returnType.classifier == Flow::class) { + mono.flatMapMany { (it as Flow).asPublisher() } } else { - GlobalScope.mono(Dispatchers.Unconfined) { - function.callSuspend(bean, *args.sliceArray(0..(args.size-2))) - .let { if (it == Unit) null else it} - }.onErrorMap(InvocationTargetException::class) { it.targetException } + mono } } else {