From ae70e3c81c83b7420e54d53d85904d95d33abfd5 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 13 Jul 2022 11:09:17 +0200 Subject: [PATCH] Apply read-only enforcement after R2DBC transaction begin Includes prepareTransactionalConnection variant aligned with JDBC DataSourceTransactionManager. Closes gh-28610 --- .../connection/R2dbcTransactionManager.java | 96 ++++++++++--------- 1 file changed, 53 insertions(+), 43 deletions(-) diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java index e22c7a1dac..2bebbd4c40 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,7 +47,7 @@ import org.springframework.util.Assert; *

Note: The {@code ConnectionFactory} that this transaction manager * operates on needs to return independent {@code Connection}s. * The {@code Connection}s may come from a pool (the typical case), but the - * {@code ConnectionFactory} must not return scoped scoped {@code Connection}s + * {@code ConnectionFactory} must not return scoped {@code Connection}s * or the like. This transaction manager will associate {@code Connection} * with context-bound transactions itself, according to the specified propagation * behavior. It assumes that a separate, independent {@code Connection} can @@ -142,8 +142,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager * transactional connection: "SET TRANSACTION READ ONLY" as understood by Oracle, * MySQL and Postgres. *

The exact treatment, including any SQL statement executed on the connection, - * can be customized through through {@link #prepareTransactionalConnection}. - * @see #prepareTransactionalConnection + * can be customized through {@link #prepareTransactionalConnection(Connection, TransactionDefinition)}. + * @see #prepareTransactionalConnection(Connection, TransactionDefinition) */ public void setEnforceReadOnly(boolean enforceReadOnly) { this.enforceReadOnly = enforceReadOnly; @@ -179,6 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); } + @SuppressWarnings("deprecation") @Override protected Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) throws TransactionException { @@ -202,27 +203,27 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager connectionMono = Mono.just(txObject.getConnectionHolder().getConnection()); } - return connectionMono.flatMap(con -> { - return prepareTransactionalConnection(con, definition, transaction).then(Mono.from(con.beginTransaction())) - .doOnSuccess(v -> { - txObject.getConnectionHolder().setTransactionActive(true); - Duration timeout = determineTimeout(definition); - if (!timeout.isNegative() && !timeout.isZero()) { - txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis()); - } - // Bind the connection holder to the thread. - if (txObject.isNewConnectionHolder()) { - synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder()); - } - }).thenReturn(con).onErrorResume(e -> { - if (txObject.isNewConnectionHolder()) { - return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()) - .doOnTerminate(() -> txObject.setConnectionHolder(null, false)) - .then(Mono.error(e)); - } - return Mono.error(e); - }); - }).onErrorResume(e -> { + return connectionMono.flatMap(con -> prepareTransactionalConnection(con, definition, transaction) + .then(Mono.from(con.beginTransaction())) + .then(prepareTransactionalConnection(con, definition)) + .doOnSuccess(v -> { + txObject.getConnectionHolder().setTransactionActive(true); + Duration timeout = determineTimeout(definition); + if (!timeout.isNegative() && !timeout.isZero()) { + txObject.getConnectionHolder().setTimeoutInMillis(timeout.toMillis()); + } + // Bind the connection holder to the thread. + if (txObject.isNewConnectionHolder()) { + synchronizationManager.bindResource(obtainConnectionFactory(), txObject.getConnectionHolder()); + } + }).thenReturn(con).onErrorResume(e -> { + if (txObject.isNewConnectionHolder()) { + return ConnectionFactoryUtils.releaseConnection(con, obtainConnectionFactory()) + .doOnTerminate(() -> txObject.setConnectionHolder(null, false)) + .then(Mono.error(e)); + } + return Mono.error(e); + })).onErrorResume(e -> { CannotCreateTransactionException ex = new CannotCreateTransactionException( "Could not open R2DBC Connection for transaction", e); return Mono.error(ex); @@ -350,31 +351,17 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager } /** - * Prepare the transactional {@link Connection} right after transaction begin. - *

The default implementation executes a "SET TRANSACTION READ ONLY" statement if the - * {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the - * transaction definition indicates a read-only transaction. - *

The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres - * and may work with other databases as well. If you'd like to adapt this treatment, - * override this method accordingly. - * @param con the transactional R2DBC Connection - * @param definition the current transaction definition - * @param transaction the transaction object - * @see #setEnforceReadOnly + * Prepare the transactional {@link Connection} right before transaction begin. + * @deprecated in favor of {@link #prepareTransactionalConnection(Connection, TransactionDefinition)} + * since this variant gets called too early (before transaction begin) for read-only customization */ + @Deprecated protected Mono prepareTransactionalConnection( Connection con, TransactionDefinition definition, Object transaction) { ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; - Mono prepare = Mono.empty(); - if (isEnforceReadOnly() && definition.isReadOnly()) { - prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) - .flatMapMany(Result::getRowsUpdated) - .then(); - } - // Apply specific isolation level, if any. IsolationLevel isolationLevelToUse = resolveIsolationLevel(definition.getIsolationLevel()); if (isolationLevelToUse != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { @@ -404,6 +391,29 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return prepare; } + /** + * Prepare the transactional {@link Connection} right after transaction begin. + *

The default implementation executes a "SET TRANSACTION READ ONLY" statement if the + * {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} and the + * transaction definition indicates a read-only transaction. + *

The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres + * and may work with other databases as well. If you'd like to adapt this treatment, + * override this method accordingly. + * @param con the transactional R2DBC Connection + * @param definition the current transaction definition + * @since 5.3.22 + * @see #setEnforceReadOnly + */ + protected Mono prepareTransactionalConnection(Connection con, TransactionDefinition definition) { + Mono prepare = Mono.empty(); + if (isEnforceReadOnly() && definition.isReadOnly()) { + prepare = Mono.from(con.createStatement("SET TRANSACTION READ ONLY").execute()) + .flatMapMany(Result::getRowsUpdated) + .then(); + } + return prepare; + } + /** * Resolve the {@linkplain TransactionDefinition#getIsolationLevel() isolation level constant} to a R2DBC * {@link IsolationLevel}. If you'd like to extend isolation level translation for vendor-specific