Browse Source

Improve Coroutines transaction API

As a follow-up of gh-22915, the purpose of this commit is to improve
Coroutines programmatic transaction API to make it more consistent with
the Java one and more idiomatic.

For suspending functions, this commit changes the
TransactionalOperator.transactional extension with a suspending lambda
parameter to a TransactionalOperator.executeAndAwait one which is
conceptually closer to TransactionalOperator.execute Java API so more
consistent.

For Flow, the TransactionalOperator.transactional extension is correct
but would be more idiomatic as a Flow extension.

This commit also adds code samples to the reference documentation.

Closes gh-23627
pull/23629/head
Sebastien Deleuze 5 years ago
parent
commit
fc6480631e
  1. 13
      spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt
  2. 8
      spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt
  3. 47
      src/docs/asciidoc/languages/kotlin.adoc

13
spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt

@ -7,23 +7,24 @@ import kotlinx.coroutines.reactive.asFlow @@ -7,23 +7,24 @@ import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
import org.springframework.transaction.ReactiveTransaction
/**
* Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter.
* Coroutines variant of [TransactionalOperator.transactional] as a [Flow] extension.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
fun <T : Any> TransactionalOperator.transactional(flow: Flow<T>): Flow<T> =
transactional(flow.asFlux()).asFlow()
fun <T : Any> Flow<T>.transactional(operator: TransactionalOperator): Flow<T> =
operator.transactional(asFlux()).asFlow()
/**
* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda
* Coroutines variant of [TransactionalOperator.execute] with a suspending lambda
* parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun <T : Any> TransactionalOperator.transactional(f: suspend () -> T?): T? =
transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull()
suspend fun <T : Any> TransactionalOperator.executeAndAwait(f: suspend (ReactiveTransaction) -> T?): T? =
execute { status -> mono(Dispatchers.Unconfined) { f(status) } }.awaitFirstOrNull()

8
spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt

@ -34,7 +34,7 @@ class TransactionalOperatorExtensionsTests { @@ -34,7 +34,7 @@ class TransactionalOperatorExtensionsTests {
fun commitWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
operator.transactional {
operator.executeAndAwait {
delay(1)
true
}
@ -48,7 +48,7 @@ class TransactionalOperatorExtensionsTests { @@ -48,7 +48,7 @@ class TransactionalOperatorExtensionsTests {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
try {
operator.transactional {
operator.executeAndAwait {
delay(1)
throw IllegalStateException()
}
@ -72,7 +72,7 @@ class TransactionalOperatorExtensionsTests { @@ -72,7 +72,7 @@ class TransactionalOperatorExtensionsTests {
emit(4)
}
runBlocking {
val list = operator.transactional(flow).toList()
val list = flow.transactional(operator).toList()
assertThat(list).hasSize(4)
}
assertThat(tm.commit).isTrue()
@ -89,7 +89,7 @@ class TransactionalOperatorExtensionsTests { @@ -89,7 +89,7 @@ class TransactionalOperatorExtensionsTests {
}
runBlocking {
try {
operator.transactional(flow).toList()
flow.transactional(operator).toList()
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()

47
src/docs/asciidoc/languages/kotlin.adoc

@ -577,8 +577,51 @@ class UserHandler(builder: WebClient.Builder) { @@ -577,8 +577,51 @@ class UserHandler(builder: WebClient.Builder) {
=== Transactions
Transactions on Coroutines are supported via the programmatic variant of the Reactive
transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional`
extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose.
transaction management provided as of Spring Framework 5.2.
For suspending functions, a `TransactionalOperator.executeAndAwait` extension is provided.
[source,kotlin,indent=0]
----
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
----
For Kotlin `Flow`, a `Flow<T>.transactional` extension is provided.
[source,kotlin,indent=0]
----
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}
----
[[kotlin-spring-projects-in-kotlin]]
== Spring Projects in Kotlin

Loading…
Cancel
Save