Browse Source

Omit cancellation of transactional Monos in TransactionOperator

TransactionOperator.as(Mono) now no longer short-cuts via a Flux.next() but provides an implementation via Mono.usingWhen(…).
The short-cut previously issued a cancellation signal to the transactional Mono causing the transaction cleanup to happen without a handle for synchronization.

Using Mono.usingWhen(…) initiates transaction cleanup when the Mono completes eliminating the need for cancellation of the transactional Publisher.

This change does not fully fix gh-23304 but it softens its impact because TransactionalOperator.transactional(Mono) avoids cancellation.
pull/23676/head
Mark Paluch 6 years ago committed by Juergen Hoeller
parent
commit
393a81d4a9
  1. 8
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java
  2. 15
      spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java
  3. 15
      spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

8
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java

@ -38,6 +38,10 @@ import org.springframework.transaction.TransactionException; @@ -38,6 +38,10 @@ import org.springframework.transaction.TransactionException;
* application services utilizing this class, making calls to the low-level
* services via an inner-class callback object.
*
* <p>Transactional Publishers should avoid Subscription cancellation.
* Cancelling initiates asynchronous transaction cleanup that does not allow for
* synchronization on completion.
*
* @author Mark Paluch
* @author Juergen Hoeller
* @since 5.2
@ -64,9 +68,7 @@ public interface TransactionalOperator { @@ -64,9 +68,7 @@ public interface TransactionalOperator {
* @throws TransactionException in case of initialization, rollback, or system errors
* @throws RuntimeException if thrown by the TransactionCallback
*/
default <T> Mono<T> transactional(Mono<T> mono) {
return execute(it -> mono).next();
}
<T> Mono<T> transactional(Mono<T> mono);
/**
* Execute the action specified by the given callback object within a transaction.

15
spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java

@ -71,6 +71,21 @@ final class TransactionalOperatorImpl implements TransactionalOperator { @@ -71,6 +71,21 @@ final class TransactionalOperatorImpl implements TransactionalOperator {
return this.transactionManager;
}
@Override
public <T> Mono<T> transactional(Mono<T> mono) {
return TransactionContextManager.currentContext().flatMap(context -> {
Mono<ReactiveTransaction> status = this.transactionManager.getReactiveTransaction(this.transactionDefinition);
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
// through usingWhen.
return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono,
this.transactionManager::commit, s -> Mono.empty())
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex))));
})
.subscriberContext(TransactionContextManager.getOrCreateContext())
.subscriberContext(TransactionContextManager.getOrCreateContextHolder());
}
@Override
public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionException {

15
spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.transaction.reactive;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -46,6 +48,19 @@ public class TransactionalOperatorTests { @@ -46,6 +48,19 @@ public class TransactionalOperatorTests {
assertThat(tm.rollback).isFalse();
}
@Test
public void monoSubscriptionNotCancelled() {
AtomicBoolean cancelled = new AtomicBoolean();
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
Mono.just(true).doOnCancel(() -> cancelled.set(true)).as(operator::transactional)
.as(StepVerifier::create)
.expectNext(true)
.verifyComplete();
assertThat(tm.commit).isTrue();
assertThat(tm.rollback).isFalse();
assertThat(cancelled).isFalse();
}
@Test
public void rollbackWithMono() {
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());

Loading…
Cancel
Save