From 974e10379a4bce1b012e85fa18ca22c0df4b8eb4 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Mon, 5 Jun 2023 10:45:47 +0200 Subject: [PATCH] Add nested propagation support to R2dbcTransactionManager Closes gh-30134 --- .../r2dbc/connection/ConnectionHolder.java | 22 ++- .../connection/R2dbcTransactionManager.java | 137 +++++++++------- .../R2dbcTransactionManagerUnitTests.java | 151 +++++++++++++++--- 3 files changed, 232 insertions(+), 78 deletions(-) diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java index 5ad913d0b8..dd1b9c36c7 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/ConnectionHolder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,11 +41,20 @@ import org.springframework.util.Assert; */ public class ConnectionHolder extends ResourceHolderSupport { + /** + * Prefix for savepoint names. + * @since 6.0.10 + */ + static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_"; + + @Nullable private Connection currentConnection; private boolean transactionActive; + private int savepointCounter = 0; + /** * Create a new ConnectionHolder for the given R2DBC {@link Connection}, @@ -112,6 +121,17 @@ public class ConnectionHolder extends ResourceHolderSupport { return this.currentConnection; } + /** + * Create a new savepoint for the current {@link Connection}, + * using generated savepoint names that are unique for the Connection. + * @return the name of the new savepoint + * @since 6.0.10 + */ + String nextSavepoint() { + this.savepointCounter++; + return SAVEPOINT_NAME_PREFIX + this.savepointCounter; + } + /** * Releases the current {@link Connection}. */ diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java index 1143aa1a7f..9fe2e1140e 100644 --- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java +++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java @@ -24,7 +24,6 @@ import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Option; import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.Result; -import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import org.springframework.beans.factory.InitializingBean; @@ -38,46 +37,42 @@ import org.springframework.transaction.reactive.TransactionSynchronizationManage import org.springframework.util.Assert; /** - * {@link org.springframework.transaction.ReactiveTransactionManager} - * implementation for a single R2DBC {@link ConnectionFactory}. This class is - * capable of working in any environment with any R2DBC driver, as long as the - * setup uses a {@code ConnectionFactory} as its {@link Connection} factory - * mechanism. Binds a R2DBC {@code Connection} from the specified - * {@code ConnectionFactory} to the current subscriber context, potentially - * allowing for one context-bound {@code Connection} per {@code ConnectionFactory}. + * {@link org.springframework.transaction.ReactiveTransactionManager} implementation + * for a single R2DBC {@link ConnectionFactory}. This class is capable of working + * in any environment with any R2DBC driver, as long as the setup uses a + * {@code ConnectionFactory} as its {@link Connection} factory mechanism. + * Binds a R2DBC {@code Connection} from the specified {@code ConnectionFactory} + * to the current subscriber context, potentially allowing for one context-bound + * {@code Connection} per {@code ConnectionFactory}. * - *

Note: The {@code ConnectionFactory} that this transaction manager - * operates on needs to return independent {@code Connection}s. - * The {@code Connection}s may come from a pool (the typical case), but the - * {@code ConnectionFactory} must not return scoped {@code Connection}s - * or the like. This transaction manager will associate {@code Connection} - * with context-bound transactions itself, according to the specified propagation - * behavior. It assumes that a separate, independent {@code Connection} can - * be obtained even during an ongoing transaction. + *

Note: The {@code ConnectionFactory} that this transaction manager operates + * on needs to return independent {@code Connection}s. The {@code Connection}s + * typically come from a connection pool but the {@code ConnectionFactory} must not + * return specifically scoped or constrained {@code Connection}s. This transaction + * manager will associate {@code Connection} with context-bound transactions, + * according to the specified propagation behavior. It assumes that a separate, + * independent {@code Connection} can be obtained even during an ongoing transaction. * *

Application code is required to retrieve the R2DBC Connection via * {@link ConnectionFactoryUtils#getConnection(ConnectionFactory)} * instead of a standard R2DBC-style {@link ConnectionFactory#create()} call. * Spring classes such as {@code DatabaseClient} use this strategy implicitly. * If not used in combination with this transaction manager, the - * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the - * native {@code ConnectionFactory} lookup; it can thus be used in a portable fashion. + * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the native + * {@code ConnectionFactory} lookup; it can thus be used in a portable fashion. * - *

Alternatively, you can allow application code to work with the standard - * R2DBC lookup pattern {@link ConnectionFactory#create()}, for example for code - * that is not aware of Spring at all. In that case, define a - * {@link TransactionAwareConnectionFactoryProxy} for your target {@code ConnectionFactory}, - * and pass that proxy {@code ConnectionFactory} to your DAOs, which will automatically - * participate in Spring-managed transactions when accessing it. + *

Alternatively, you can allow application code to work with the lookup pattern + * {@link ConnectionFactory#create()}, for example for code not aware of Spring. + * In that case, define a {@link TransactionAwareConnectionFactoryProxy} for your + * target {@code ConnectionFactory}, and pass that proxy {@code ConnectionFactory} + * to your DAOs which will automatically participate in Spring-managed transactions + * when accessing it. * - *

This transaction manager triggers flush callbacks on registered transaction - * synchronizations (if synchronization is generally active), assuming resources - * operating on the underlying R2DBC {@code Connection}. - * - *

Spring's {@code TransactionDefinition} attributes are carried forward to R2DBC drivers - * using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. Subclasses may - * override {@link #createTransactionDefinition(TransactionDefinition)} to customize - * transaction definitions for vendor-specific attributes. + *

Spring's {@code TransactionDefinition} attributes are carried forward to + * R2DBC drivers using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. + * Subclasses may override {@link #createTransactionDefinition(TransactionDefinition)} + * to customize transaction definitions for vendor-specific attributes. As of 6.0.10, + * this transaction manager supports nested transactions via R2DBC savepoints as well. * * @author Mark Paluch * @author Juergen Hoeller @@ -97,7 +92,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager /** * Create a new {@code R2dbcTransactionManager} instance. - * A ConnectionFactory has to be set to be able to use it. + * A {@code ConnectionFactory} has to be set to be able to use it. * @see #setConnectionFactory */ public R2dbcTransactionManager() {} @@ -114,12 +109,13 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager /** - * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions for. - *

This will typically be a locally defined {@code ConnectionFactory}, for example a connection pool. - *

The {@code ConnectionFactory} passed in here needs to return independent {@link Connection}s. - * The {@code Connection}s may come from a pool (the typical case), but the {@code ConnectionFactory} - * must not return scoped {@code Connection}s or the like. - * @see TransactionAwareConnectionFactoryProxy + * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions + * for. This will typically be a locally defined {@code ConnectionFactory}, for example + * an R2DBC connection pool. + *

The {@code ConnectionFactory} passed in here needs to return independent + * {@link Connection}s. The {@code Connection}s typically come from a connection + * pool but the {@code ConnectionFactory} must not return specifically scoped or + * constrained {@code Connection}s. */ public void setConnectionFactory(@Nullable ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; @@ -183,8 +179,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager @Override protected boolean isExistingTransaction(Object transaction) { - ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; - return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); + return ((ConnectionFactoryTransactionObject) transaction).isTransactionActive(); } @Override @@ -193,6 +188,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) transaction; + if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED && + txObject.isTransactionActive()) { + return txObject.createSavepoint(); + } + return Mono.defer(() -> { Mono connectionMono; @@ -210,7 +210,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager connectionMono = Mono.just(txObject.getConnectionHolder().getConnection()); } - return connectionMono.flatMap(con -> Mono.from(doBegin(definition, con)) + return connectionMono.flatMap(con -> doBegin(definition, con) .then(prepareTransactionalConnection(con, definition)) .doOnSuccess(v -> { txObject.getConnectionHolder().setTransactionActive(true); @@ -234,12 +234,12 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager }).then(); } - private Publisher doBegin(TransactionDefinition definition, Connection con) { + private Mono doBegin(TransactionDefinition definition, Connection con) { io.r2dbc.spi.TransactionDefinition transactionDefinition = createTransactionDefinition(definition); if (logger.isDebugEnabled()) { logger.debug("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]"); } - return con.beginTransaction(transactionDefinition); + return Mono.from(con.beginTransaction(transactionDefinition)); } /** @@ -300,12 +300,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager GenericReactiveTransaction status) throws TransactionException { ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction(); - Connection connection = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { - logger.debug("Committing R2DBC transaction on Connection [" + connection + "]"); + logger.debug("Committing R2DBC transaction on Connection [" + + txObject.getConnectionHolder().getConnection() + "]"); } - return Mono.from(connection.commitTransaction()) - .onErrorMap(R2dbcException.class, ex -> translateException("R2DBC commit", ex)); + return txObject.commit().onErrorMap(R2dbcException.class, ex -> translateException("R2DBC commit", ex)); } @Override @@ -313,12 +312,11 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager GenericReactiveTransaction status) throws TransactionException { ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject) status.getTransaction(); - Connection connection = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { - logger.debug("Rolling back R2DBC transaction on Connection [" + connection + "]"); + logger.debug("Rolling back R2DBC transaction on Connection [" + + txObject.getConnectionHolder().getConnection() + "]"); } - return Mono.from(connection.rollbackTransaction()) - .onErrorMap(R2dbcException.class, ex -> translateException("R2DBC rollback", ex)); + return txObject.rollback().onErrorMap(R2dbcException.class, ex -> translateException("R2DBC rollback", ex)); } @Override @@ -496,6 +494,9 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager private boolean newConnectionHolder; + @Nullable + private String savepointName; + void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) { setConnectionHolder(connectionHolder); this.newConnectionHolder = newConnectionHolder; @@ -505,10 +506,6 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager return this.newConnectionHolder; } - void setRollbackOnly() { - getConnectionHolder().setRollbackOnly(); - } - public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder) { this.connectionHolder = connectionHolder; } @@ -521,6 +518,34 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager public boolean hasConnectionHolder() { return (this.connectionHolder != null); } + + public boolean isTransactionActive() { + return (this.connectionHolder != null && this.connectionHolder.isTransactionActive()); + } + + public Mono createSavepoint() { + ConnectionHolder holder = getConnectionHolder(); + this.savepointName = holder.nextSavepoint(); + return Mono.from(holder.getConnection().createSavepoint(this.savepointName)); + } + + public Mono commit() { + Connection connection = getConnectionHolder().getConnection(); + return (this.savepointName != null ? + Mono.from(connection.releaseSavepoint(this.savepointName)) : + Mono.from(connection.commitTransaction())); + } + + public Mono rollback() { + Connection connection = getConnectionHolder().getConnection(); + return (this.savepointName != null ? + Mono.from(connection.rollbackTransactionToSavepoint(this.savepointName)) : + Mono.from(connection.rollbackTransaction())); + } + + public void setRollbackOnly() { + getConnectionHolder().setRollbackOnly(); + } } } diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java index b428d8ff7d..89ccc9fe2d 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/R2dbcTransactionManagerUnitTests.java @@ -231,7 +231,6 @@ class R2dbcTransactionManagerUnitTests { @Test void testCommitFails() { when(connectionMock.commitTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail")))); - when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); @@ -252,7 +251,6 @@ class R2dbcTransactionManagerUnitTests { @Test void testRollback() { - AtomicInteger commits = new AtomicInteger(); when(connectionMock.commitTransaction()).thenReturn( Mono.fromRunnable(commits::incrementAndGet)); @@ -284,11 +282,8 @@ class R2dbcTransactionManagerUnitTests { when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Commit should fail"))), Mono.empty()); TransactionalOperator operator = TransactionalOperator.create(tm); - operator.execute(reactiveTransaction -> { - reactiveTransaction.setRollbackOnly(); - return ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> connection.createStatement("foo")).then(); }).as(StepVerifier::create) @@ -306,11 +301,9 @@ class R2dbcTransactionManagerUnitTests { @SuppressWarnings("unchecked") void testConnectionReleasedWhenRollbackFails() { when(connectionMock.rollbackTransaction()).thenReturn(Mono.defer(() -> Mono.error(new R2dbcBadGrammarException("Rollback should fail"))), Mono.empty()); - - TransactionalOperator operator = TransactionalOperator.create(tm); - when(connectionMock.setTransactionIsolationLevel(any())).thenReturn(Mono.empty()); + TransactionalOperator operator = TransactionalOperator.create(tm); operator.execute(reactiveTransaction -> ConnectionFactoryUtils.getConnection(connectionFactoryMock) .doOnNext(connection -> { throw new IllegalStateException("Intentional error to trigger rollback"); @@ -333,12 +326,9 @@ class R2dbcTransactionManagerUnitTests { TransactionSynchronization.STATUS_ROLLED_BACK); TransactionalOperator operator = TransactionalOperator.create(tm); - operator.execute(tx -> { - tx.setRollbackOnly(); assertThat(tx.isNewTransaction()).isTrue(); - return TransactionSynchronizationManager.forCurrentTransaction().doOnNext( synchronizationManager -> { assertThat(synchronizationManager.hasResource(connectionFactoryMock)).isTrue(); @@ -364,15 +354,12 @@ class R2dbcTransactionManagerUnitTests { DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - TransactionalOperator operator = TransactionalOperator.create(tm, definition); + TransactionalOperator operator = TransactionalOperator.create(tm, definition); operator.execute(tx1 -> { - assertThat(tx1.isNewTransaction()).isTrue(); - definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NEVER); return operator.execute(tx2 -> { - fail("Should have thrown IllegalTransactionStateException"); return Mono.empty(); }); @@ -384,24 +371,121 @@ class R2dbcTransactionManagerUnitTests { } @Test - void testPropagationSupportsAndRequiresNew() { + void testPropagationNestedWithExistingTransaction() { + when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); + when(connectionMock.releaseSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isTrue(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); + return operator.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + return Mono.empty(); + }); + }).as(StepVerifier::create) + .verifyComplete(); + + verify(connectionMock).createSavepoint("SAVEPOINT_1"); + verify(connectionMock).releaseSavepoint("SAVEPOINT_1"); + verify(connectionMock).commitTransaction(); + verify(connectionMock).close(); + } + + @Test + void testPropagationNestedWithExistingTransactionAndRollback() { + when(connectionMock.createSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); + when(connectionMock.rollbackTransactionToSavepoint("SAVEPOINT_1")).thenReturn(Mono.empty()); + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isTrue(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); + return operator.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + tx2.setRollbackOnly(); + return Mono.empty(); + }); + }).as(StepVerifier::create) + .verifyComplete(); + + verify(connectionMock).createSavepoint("SAVEPOINT_1"); + verify(connectionMock).rollbackTransactionToSavepoint("SAVEPOINT_1"); + verify(connectionMock).commitTransaction(); + verify(connectionMock).close(); + } + + @Test + void testPropagationSupportsAndNested() { when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isFalse(); + DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); + TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); + return inner.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + return Mono.empty(); + }); + }).as(StepVerifier::create) + .verifyComplete(); + + verify(connectionMock).commitTransaction(); + verify(connectionMock).close(); + } + + @Test + void testPropagationSupportsAndNestedWithRollback() { + when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isFalse(); + DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_NESTED); + TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); + return inner.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + tx2.setRollbackOnly(); + return Mono.empty(); + }); + }).as(StepVerifier::create) + .verifyComplete(); - assertThat(tx1.isNewTransaction()).isFalse(); + verify(connectionMock).rollbackTransaction(); + verify(connectionMock).close(); + } + + @Test + void testPropagationSupportsAndRequiresNew() { + when(connectionMock.commitTransaction()).thenReturn(Mono.empty()); + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isFalse(); DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); - innerDef.setPropagationBehavior( - TransactionDefinition.PROPAGATION_REQUIRES_NEW); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); - return inner.execute(tx2 -> { - assertThat(tx2.isNewTransaction()).isTrue(); return Mono.empty(); }); @@ -412,6 +496,31 @@ class R2dbcTransactionManagerUnitTests { verify(connectionMock).close(); } + @Test + void testPropagationSupportsAndRequiresNewWithRollback() { + when(connectionMock.rollbackTransaction()).thenReturn(Mono.empty()); + + DefaultTransactionDefinition definition = new DefaultTransactionDefinition(); + definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS); + + TransactionalOperator operator = TransactionalOperator.create(tm, definition); + operator.execute(tx1 -> { + assertThat(tx1.isNewTransaction()).isFalse(); + DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition(); + innerDef.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); + TransactionalOperator inner = TransactionalOperator.create(tm, innerDef); + return inner.execute(tx2 -> { + assertThat(tx2.isNewTransaction()).isTrue(); + tx2.setRollbackOnly(); + return Mono.empty(); + }); + }).as(StepVerifier::create) + .verifyComplete(); + + verify(connectionMock).rollbackTransaction(); + verify(connectionMock).close(); + } + private static class TestTransactionSynchronization implements TransactionSynchronization {