@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2021 the original author or authors .
* Copyright 2002 - 2022 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -47,7 +47,7 @@ import org.springframework.util.Assert;
@@ -47,7 +47,7 @@ import org.springframework.util.Assert;
* < p > < b > Note : The { @code ConnectionFactory } that this transaction manager
* operates on needs to return independent { @code Connection } s . < / b >
* The { @code Connection } s may come from a pool ( the typical case ) , but the
* { @code ConnectionFactory } must not return scoped scoped { @code Connection } s
* { @code ConnectionFactory } must not return scoped { @code Connection } s
* or the like . This transaction manager will associate { @code Connection }
* with context - bound transactions itself , according to the specified propagation
* behavior . It assumes that a separate , independent { @code Connection } can
@ -142,8 +142,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -142,8 +142,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* transactional connection : "SET TRANSACTION READ ONLY" as understood by Oracle ,
* MySQL and Postgres .
* < p > The exact treatment , including any SQL statement executed on the connection ,
* can be customized through through { @link # prepareTransactionalConnection } .
* @see # prepareTransactionalConnection
* can be customized through { @link # prepareTransactionalConnection ( Connection , TransactionDefinition ) } .
* @see # prepareTransactionalConnection ( Connection , TransactionDefinition )
* /
public void setEnforceReadOnly ( boolean enforceReadOnly ) {
this . enforceReadOnly = enforceReadOnly ;
@ -179,6 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -179,6 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return ( txObject . hasConnectionHolder ( ) & & txObject . getConnectionHolder ( ) . isTransactionActive ( ) ) ;
}
@SuppressWarnings ( "deprecation" )
@Override
protected Mono < Void > doBegin ( TransactionSynchronizationManager synchronizationManager , Object transaction ,
TransactionDefinition definition ) throws TransactionException {
@ -202,27 +203,27 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -202,27 +203,27 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
connectionMono = Mono . just ( txObject . getConnectionHolder ( ) . getConnection ( ) ) ;
}
return connectionMono . flatMap ( con - > {
return prepareTransactionalConnection ( con , definition , transaction ) . then ( Mono . from ( con . beginTransaction ( ) ) )
. doOnSuccess ( v - > {
txObject . getConnectionHolder ( ) . setTransactionActive ( true ) ;
Duration timeout = determineTimeout ( definition ) ;
if ( ! timeout . isNegative ( ) & & ! timeout . isZero ( ) ) {
txObject . getConnectionHolder ( ) . setTimeoutInMillis ( timeout . toMill is( ) ) ;
}
// Bind the connection holder to the thread.
if ( txObject . isNewConnectionHolder ( ) ) {
synchronizationManager . bindResource ( obtainConnectionFactory ( ) , txObject . getConnectionHolder ( ) ) ;
}
} ) . thenReturn ( con ) . onErrorResume ( e - > {
if ( txObject . isNewConnectionHolder ( ) ) {
return ConnectionFactoryUtils . releaseConnection ( con , obtainConnectionFactory ( ) )
. doOnTerminate ( ( ) - > txObject . setConnectionHolder ( null , false ) )
. then ( Mono . error ( e ) ) ;
}
return Mono . error ( e ) ;
} ) ;
} ) . onErrorResume ( e - > {
return connectionMono . flatMap ( con - > prepareTransactionalConnection ( con , definition , transaction )
. then ( Mono . from ( con . beginTransaction ( ) ) )
. then ( prepareTransactionalConnection ( con , definition ) )
. doOnSuccess ( v - > {
txObject . getConnectionHolder ( ) . setTransactionActive ( true ) ;
Duration timeout = determineTimeout ( definition ) ;
if ( ! timeout . isNegative ( ) & & ! timeout . isZero ( ) ) {
txObject . getConnectionHolder ( ) . setTimeoutInMillis ( timeout . toMillis ( ) ) ;
}
// Bind the connection holder to the thread.
if ( txObject . isNewConnectionHolder ( ) ) {
synchronizationManager . bindResource ( obtainConnectionFactory ( ) , txObject . getConnectionHolder ( ) ) ;
}
} ) . thenReturn ( con ) . onErrorResume ( e - > {
if ( txObject . isNewConnectionHolder ( ) ) {
return ConnectionFactoryUtils . releaseConnection ( con , obtainConnectionFactory ( ) )
. doOnTerminate ( ( ) - > txObject . setConnectionHolder ( null , fals e) )
. then ( Mono . error ( e ) ) ;
}
return Mono . error ( e ) ;
} ) ) . onErrorResume ( e - > {
CannotCreateTransactionException ex = new CannotCreateTransactionException (
"Could not open R2DBC Connection for transaction" , e ) ;
return Mono . error ( ex ) ;
@ -350,31 +351,17 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -350,31 +351,17 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
}
/ * *
* Prepare the transactional { @link Connection } right after transaction begin .
* < p > The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
* { @link # setEnforceReadOnly "enforceReadOnly" } flag is set to { @code true } and the
* transaction definition indicates a read - only transaction .
* < p > The "SET TRANSACTION READ ONLY" is understood by Oracle , MySQL and Postgres
* and may work with other databases as well . If you ' d like to adapt this treatment ,
* override this method accordingly .
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
* @param transaction the transaction object
* @see # setEnforceReadOnly
* Prepare the transactional { @link Connection } right before transaction begin .
* @deprecated in favor of { @link # prepareTransactionalConnection ( Connection , TransactionDefinition ) }
* since this variant gets called too early ( before transaction begin ) for read - only customization
* /
@Deprecated
protected Mono < Void > prepareTransactionalConnection (
Connection con , TransactionDefinition definition , Object transaction ) {
ConnectionFactoryTransactionObject txObject = ( ConnectionFactoryTransactionObject ) transaction ;
Mono < Void > prepare = Mono . empty ( ) ;
if ( isEnforceReadOnly ( ) & & definition . isReadOnly ( ) ) {
prepare = Mono . from ( con . createStatement ( "SET TRANSACTION READ ONLY" ) . execute ( ) )
. flatMapMany ( Result : : getRowsUpdated )
. then ( ) ;
}
// Apply specific isolation level, if any.
IsolationLevel isolationLevelToUse = resolveIsolationLevel ( definition . getIsolationLevel ( ) ) ;
if ( isolationLevelToUse ! = null & & definition . getIsolationLevel ( ) ! = TransactionDefinition . ISOLATION_DEFAULT ) {
@ -404,6 +391,29 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
@@ -404,6 +391,29 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return prepare ;
}
/ * *
* Prepare the transactional { @link Connection } right after transaction begin .
* < p > The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
* { @link # setEnforceReadOnly "enforceReadOnly" } flag is set to { @code true } and the
* transaction definition indicates a read - only transaction .
* < p > The "SET TRANSACTION READ ONLY" is understood by Oracle , MySQL and Postgres
* and may work with other databases as well . If you ' d like to adapt this treatment ,
* override this method accordingly .
* @param con the transactional R2DBC Connection
* @param definition the current transaction definition
* @since 5 . 3 . 22
* @see # setEnforceReadOnly
* /
protected Mono < Void > prepareTransactionalConnection ( Connection con , TransactionDefinition definition ) {
Mono < Void > prepare = Mono . empty ( ) ;
if ( isEnforceReadOnly ( ) & & definition . isReadOnly ( ) ) {
prepare = Mono . from ( con . createStatement ( "SET TRANSACTION READ ONLY" ) . execute ( ) )
. flatMapMany ( Result : : getRowsUpdated )
. then ( ) ;
}
return prepare ;
}
/ * *
* Resolve the { @linkplain TransactionDefinition # getIsolationLevel ( ) isolation level constant } to a R2DBC
* { @link IsolationLevel } . If you ' d like to extend isolation level translation for vendor - specific