diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index bc19adb58b..8a78557376 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -61,6 +61,34 @@ public class TransactionalOperatorTests { assertThat(cancelled).isFalse(); } + @Test + public void cancellationPropagatedToMono() { + AtomicBoolean cancelled = new AtomicBoolean(); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Mono.create(sink -> sink.onCancel(() -> cancelled.set(true))).as(operator::transactional) + .as(StepVerifier::create) + .thenAwait() + .thenCancel() + .verify(); + assertThat(tm.commit).isTrue(); + assertThat(tm.rollback).isFalse(); + assertThat(cancelled).isTrue(); + } + + @Test + public void cancellationPropagatedToFlux() { + AtomicBoolean cancelled = new AtomicBoolean(); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Flux.create(sink -> sink.onCancel(() -> cancelled.set(true))).as(operator::transactional) + .as(StepVerifier::create) + .thenAwait() + .thenCancel() + .verify(); + assertThat(tm.commit).isTrue(); + assertThat(tm.rollback).isFalse(); + assertThat(cancelled).isTrue(); + } + @Test public void rollbackWithMono() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());