diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
index e22c7a1dac..2bebbd4c40 100644
--- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
+++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -47,7 +47,7 @@ import org.springframework.util.Assert;
*
Note: The {@code ConnectionFactory} that this transaction manager
* operates on needs to return independent {@code Connection}s.
* The {@code Connection}s may come from a pool (the typical case), but the
- * {@code ConnectionFactory} must not return scoped scoped {@code Connection}s
+ * {@code ConnectionFactory} must not return scoped {@code Connection}s
* or the like. This transaction manager will associate {@code Connection}
* with context-bound transactions itself, according to the specified propagation
* behavior. It assumes that a separate, independent {@code Connection} can
@@ -142,8 +142,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
* transactional connection: "SET TRANSACTION READ ONLY" as understood by Oracle,
* MySQL and Postgres.
*
The exact treatment, including any SQL statement executed on the connection,
- * can be customized through through {@link #prepareTransactionalConnection}.
- * @see #prepareTransactionalConnection
+ * can be customized through {@link #prepareTransactionalConnection(Connection, TransactionDefinition)}.
+ * @see #prepareTransactionalConnection(Connection, TransactionDefinition)
*/
public void setEnforceReadOnly(boolean enforceReadOnly) {
this.enforceReadOnly = enforceReadOnly;
@@ -179,6 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
+ @SuppressWarnings("deprecation")
@Override
protected Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction,
TransactionDefinition definition) throws TransactionException {
@@ -202,27 +203,27 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
connectionMono = Mono.just(txObject.getConnectionHolder().getConnection());
}
- return connectionMono.flatMap(con -> {
- return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction()))
- .doOnSuccess(v -> {
- txObject.getConnectionHolder().setTransactionActive(true);
- Duration timeout = determineTimeout(definition);
- if (!timeout.isNegative() && !timeout.isZero()) {
- txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
- }
- // Bind the connection holder to the thread.
- if (txObject.isNewConnectionHolder()) {
- synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
- }
- }).thenReturn(con).onErrorResume(e -> {
- if (txObject.isNewConnectionHolder()) {
- return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
- .doOnTerminate(() -> txObject.setConnectionHolder(null, false))
- .then(Mono.error(e));
- }
- return Mono.error(e);
- });
- }).onErrorResume(e -> {
+ return connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction)
+ .then(Mono.from(con.beginTransaction()))
+ .then(prepareTransactionalConnection(con, definition))
+ .doOnSuccess(v -> {
+ txObject.getConnectionHolder().setTransactionActive(true);
+ Duration timeout = determineTimeout(definition);
+ if (!timeout.isNegative() && !timeout.isZero()) {
+ txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis());
+ }
+ // Bind the connection holder to the thread.
+ if (txObject.isNewConnectionHolder()) {
+ synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder());
+ }
+ }).thenReturn(con).onErrorResume(e -> {
+ if (txObject.isNewConnectionHolder()) {
+ return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory())
+ .doOnTerminate(() -> txObject.setConnectionHolder(null, false))
+ .then(Mono.error(e));
+ }
+ return Mono.error(e);
+ })).onErrorResume(e -> {
CannotCreateTransactionException ex = new CannotCreateTransactionException(
"Could not open R2DBC Connection for transaction", e);
return Mono.error(ex);
@@ -350,31 +351,17 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
}
/**
- * Prepare the transactional {@link Connection} right after transaction begin.
- * The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
- * {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the
- * transaction definition indicates a read-only transaction.
- *
The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
- * and may work with other databases as well. If you'd like to adapt this treatment,
- * override this method accordingly.
- * @param con the transactional R2DBC Connection
- * @param definition the current transaction definition
- * @param transaction the transaction object
- * @see #setEnforceReadOnly
+ * Prepare the transactional {@link Connection} right before transaction begin.
+ * @deprecated in favor of {@link #prepareTransactionalConnection(Connection, TransactionDefinition)}
+ * since this variant gets called too early (before transaction begin) for read-only customization
*/
+ @Deprecated
protected Mono prepareTransactionalConnection(
Connection con, TransactionDefinition definition, Object transaction) {
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction;
-
Mono prepare = Mono.empty();
- if (isEnforceReadOnly() && definition.isReadOnly()) {
- prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
- .flatMapMany(Result::getRowsUpdated)
- .then();
- }
-
// Apply specific isolation level, if any.
IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel());
if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
@@ -404,6 +391,29 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
return prepare;
}
+ /**
+ * Prepare the transactional {@link Connection} right after transaction begin.
+ * The default implementation executes a "SET TRANSACTION READ ONLY" statement if the
+ * {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the
+ * transaction definition indicates a read-only transaction.
+ *
The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
+ * and may work with other databases as well. If you'd like to adapt this treatment,
+ * override this method accordingly.
+ * @param con the transactional R2DBC Connection
+ * @param definition the current transaction definition
+ * @since 5.3.22
+ * @see #setEnforceReadOnly
+ */
+ protected Mono prepareTransactionalConnection(Connection con, TransactionDefinition definition) {
+ Mono prepare = Mono.empty();
+ if (isEnforceReadOnly() && definition.isReadOnly()) {
+ prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute())
+ .flatMapMany(Result::getRowsUpdated)
+ .then();
+ }
+ return prepare;
+ }
+
/**
* Resolve the {@linkplain TransactionDefinition#getIsolationLevel() isolation level constant} to a R2DBC
* {@link IsolationLevel}. If you'd like to extend isolation level translation for vendor-specific