diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index a1e753500d..f31a742904 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -38,6 +38,10 @@ import org.springframework.transaction.TransactionException; * application services utilizing this class, making calls to the low-level * services via an inner-class callback object. * + *

Transactional Publishers should avoid Subscription cancellation. + * Cancelling initiates asynchronous transaction cleanup that does not allow for + * synchronization on completion. + * * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 @@ -64,9 +68,7 @@ public interface TransactionalOperator { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - default Mono transactional(Mono mono) { - return execute(it -> mono).next(); - } + Mono transactional(Mono mono); /** * Execute the action specified by the given callback object within a transaction. diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 3162525a4e..7e4c3fd5ab 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -71,6 +71,21 @@ final class TransactionalOperatorImpl implements TransactionalOperator { return this.transactionManager; } + @Override + public Mono transactional(Mono mono) { + return TransactionContextManager.currentContext().flatMap(context -> { + Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + // Need re-wrapping of ReactiveTransaction until we get hold of the exception + // through usingWhen. + return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono, + this.transactionManager::commit, s -> Mono.empty()) + .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); + }) + .subscriberContext(TransactionContextManager.getOrCreateContext()) + .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + } @Override public Flux execute(TransactionCallback action) throws TransactionException { diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index 12de331a8b..bc19adb58b 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -16,6 +16,8 @@ package org.springframework.transaction.reactive; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,6 +48,19 @@ public class TransactionalOperatorTests { assertThat(tm.rollback).isFalse(); } + @Test + public void monoSubscriptionNotCancelled() { + AtomicBoolean cancelled = new AtomicBoolean(); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Mono.just(true).doOnCancel(() -> cancelled.set(true)).as(operator::transactional) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + assertThat(tm.commit).isTrue(); + assertThat(tm.rollback).isFalse(); + assertThat(cancelled).isFalse(); + } + @Test public void rollbackWithMono() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());