@ -24,7 +24,6 @@ import io.r2dbc.spi.IsolationLevel;
@@ -24,7 +24,6 @@ import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcBadGrammarException ;
import io.r2dbc.spi.R2dbcTimeoutException ;
import io.r2dbc.spi.Statement ;
import org.assertj.core.api.Assertions ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.Test ;
import reactor.core.publisher.Mono ;
@ -55,6 +54,7 @@ import static org.mockito.BDDMockito.when;
@@ -55,6 +54,7 @@ import static org.mockito.BDDMockito.when;
* Unit tests for { @link R2dbcTransactionManager } .
*
* @author Mark Paluch
* @author Juergen Hoeller
* /
class R2dbcTransactionManagerUnitTests {
@ -85,8 +85,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -85,8 +85,7 @@ class R2dbcTransactionManagerUnitTests {
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. flatMap ( connection - > TransactionSynchronizationManager . forCurrentTransaction ( )
. doOnNext ( synchronizationManager - > synchronizationManager . registerSynchronization (
sync ) ) )
. doOnNext ( synchronizationManager - > synchronizationManager . registerSynchronization ( sync ) ) )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
@ -118,12 +117,11 @@ class R2dbcTransactionManagerUnitTests {
@@ -118,12 +117,11 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectErrorSatisfies ( actual - > assertThat ( actual ) . isInstanceOf (
CannotCreateTransactionException . class ) . hasCauseInstanceOf (
R2dbcBadGrammarException . class ) )
CannotCreateTransactionException . class ) . hasCauseInstanceOf ( R2dbcBadGrammarException . class ) )
. verify ( ) ;
}
@ -139,8 +137,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -139,8 +137,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -164,8 +162,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -164,8 +162,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -184,8 +182,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -184,8 +182,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -232,8 +230,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -232,8 +230,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -249,7 +247,6 @@ class R2dbcTransactionManagerUnitTests {
@@ -249,7 +247,6 @@ class R2dbcTransactionManagerUnitTests {
@Test
void testCommitFails ( ) {
when ( connectionMock . commitTransaction ( ) ) . thenReturn ( Mono . defer ( ( ) - > Mono . error ( new R2dbcBadGrammarException ( "Commit should fail" ) ) ) ) ;
when ( connectionMock . rollbackTransaction ( ) ) . thenReturn ( Mono . empty ( ) ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm ) ;
@ -270,7 +267,6 @@ class R2dbcTransactionManagerUnitTests {
@@ -270,7 +267,6 @@ class R2dbcTransactionManagerUnitTests {
@Test
void testRollback ( ) {
AtomicInteger commits = new AtomicInteger ( ) ;
when ( connectionMock . commitTransaction ( ) ) . thenReturn (
Mono . fromRunnable ( commits : : incrementAndGet ) ) ;
@ -282,11 +278,9 @@ class R2dbcTransactionManagerUnitTests {
@@ -282,11 +278,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. doOnNext ( connection - > {
throw new IllegalStateException ( ) ;
} ) . as ( operator : : transactional )
. as ( StepVerifier : : create )
. verifyError ( IllegalStateException . class ) ;
. doOnNext ( connection - > { throw new IllegalStateException ( ) ; } )
. as ( operator : : transactional )
. as ( StepVerifier : : create ) . verifyError ( IllegalStateException . class ) ;
assertThat ( commits ) . hasValue ( 0 ) ;
assertThat ( rollbacks ) . hasValue ( 1 ) ;
@ -303,15 +297,11 @@ class R2dbcTransactionManagerUnitTests {
@@ -303,15 +297,11 @@ class R2dbcTransactionManagerUnitTests {
when ( connectionMock . rollbackTransaction ( ) ) . thenReturn ( Mono . defer ( ( ) - > Mono . error ( new R2dbcBadGrammarException ( "Commit should fail" ) ) ) , Mono . empty ( ) ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm ) ;
operator . execute ( reactiveTransaction - > {
reactiveTransaction . setRollbackOnly ( ) ;
return ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. doOnNext ( connection - > connection . createStatement ( "foo" ) ) . then ( ) ;
} ) . as ( StepVerifier : : create )
. verifyError ( IllegalTransactionStateException . class ) ;
} ) . as ( StepVerifier : : create ) . verifyError ( IllegalTransactionStateException . class ) ;
verify ( connectionMock ) . isAutoCommit ( ) ;
verify ( connectionMock ) . beginTransaction ( ) ;
@ -338,7 +328,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -338,7 +328,7 @@ class R2dbcTransactionManagerUnitTests {
. doOnNext ( connection - > {
throw new IllegalStateException ( "Intentional error to trigger rollback" ) ;
} ) . then ( ) ) . as ( StepVerifier : : create )
. verifyErrorSatisfies ( e - > Assertions . assertThat ( e )
. verifyErrorSatisfies ( ex - > assertThat ( ex )
. isInstanceOf ( BadSqlGrammarException . class )
. hasCause ( new R2dbcBadGrammarException ( "Rollback should fail" ) )
) ;
@ -357,19 +347,15 @@ class R2dbcTransactionManagerUnitTests {
@@ -357,19 +347,15 @@ class R2dbcTransactionManagerUnitTests {
TransactionSynchronization . STATUS_ROLLED_BACK ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm ) ;
operator . execute ( tx - > {
tx . setRollbackOnly ( ) ;
assertThat ( tx . isNewTransaction ( ) ) . isTrue ( ) ;
return TransactionSynchronizationManager . forCurrentTransaction ( ) . doOnNext (
synchronizationManager - > {
assertThat ( synchronizationManager . hasResource ( connectionFactoryMock ) ) . isTrue ( ) ;
synchronizationManager . registerSynchronization ( sync ) ;
} ) . then ( ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . isAutoCommit ( ) ;
verify ( connectionMock ) . beginTransaction ( ) ;
@ -389,20 +375,16 @@ class R2dbcTransactionManagerUnitTests {
@@ -389,20 +375,16 @@ class R2dbcTransactionManagerUnitTests {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isTrue ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NEVER ) ;
return operator . execute ( tx2 - > {
fail ( "Should have thrown IllegalTransactionStateException" ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyError ( IllegalTransactionStateException . class ) ;
} ) . as ( StepVerifier : : create ) . verifyError ( IllegalTransactionStateException . class ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
@ -414,32 +396,49 @@ class R2dbcTransactionManagerUnitTests {
@@ -414,32 +396,49 @@ class R2dbcTransactionManagerUnitTests {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_SUPPORTS ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior (
TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . commitTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
}
@Test
void testPropagationSupportsAndRequiresNewWithRollback ( ) {
when ( connectionMock . rollbackTransaction ( ) ) . thenReturn ( Mono . empty ( ) ) ;
private static class TestTransactionSynchronization
implements TransactionSynchronization {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_SUPPORTS ) ;
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
}
private static class TestTransactionSynchronization implements TransactionSynchronization {
private int status ;
@ -512,7 +511,6 @@ class R2dbcTransactionManagerUnitTests {
@@ -512,7 +511,6 @@ class R2dbcTransactionManagerUnitTests {
this . afterCompletionCalled = true ;
assertThat ( status ) . isEqualTo ( this . status ) ;
}
}
}