|
|
@ -61,6 +61,34 @@ public class TransactionalOperatorTests { |
|
|
|
assertThat(cancelled).isFalse(); |
|
|
|
assertThat(cancelled).isFalse(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
|
|
|
public void cancellationPropagatedToMono() { |
|
|
|
|
|
|
|
AtomicBoolean cancelled = new AtomicBoolean(); |
|
|
|
|
|
|
|
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); |
|
|
|
|
|
|
|
Mono.create(sink -> sink.onCancel(() -> cancelled.set(true))).as(operator::transactional) |
|
|
|
|
|
|
|
.as(StepVerifier::create) |
|
|
|
|
|
|
|
.thenAwait() |
|
|
|
|
|
|
|
.thenCancel() |
|
|
|
|
|
|
|
.verify(); |
|
|
|
|
|
|
|
assertThat(tm.commit).isTrue(); |
|
|
|
|
|
|
|
assertThat(tm.rollback).isFalse(); |
|
|
|
|
|
|
|
assertThat(cancelled).isTrue(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
|
|
|
public void cancellationPropagatedToFlux() { |
|
|
|
|
|
|
|
AtomicBoolean cancelled = new AtomicBoolean(); |
|
|
|
|
|
|
|
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); |
|
|
|
|
|
|
|
Flux.create(sink -> sink.onCancel(() -> cancelled.set(true))).as(operator::transactional) |
|
|
|
|
|
|
|
.as(StepVerifier::create) |
|
|
|
|
|
|
|
.thenAwait() |
|
|
|
|
|
|
|
.thenCancel() |
|
|
|
|
|
|
|
.verify(); |
|
|
|
|
|
|
|
assertThat(tm.commit).isTrue(); |
|
|
|
|
|
|
|
assertThat(tm.rollback).isFalse(); |
|
|
|
|
|
|
|
assertThat(cancelled).isTrue(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
@Test |
|
|
|
public void rollbackWithMono() { |
|
|
|
public void rollbackWithMono() { |
|
|
|
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); |
|
|
|
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); |
|
|
|