diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index bfd6bf3ec9b..5a383eaa1bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -254,7 +254,6 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; - private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -609,20 +608,9 @@ public class KafkaProducer implements Producer { */ public void initTransactions() { throwIfNoTransactionManager(); - if (initTransactionsResult == null) { - initTransactionsResult = transactionManager.initializeTransactions(); - sender.wakeup(); - } - - try { - if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { - initTransactionsResult = null; - } else { - throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); - } - } catch (InterruptedException e) { - throw new InterruptException("Initialize transactions interrupted.", e); - } + TransactionalRequestResult result = transactionManager.initializeTransactions(); + sender.wakeup(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** @@ -682,6 +670,11 @@ public class KafkaProducer implements Producer { * errors, this method will throw the last received exception immediately and the transaction will not be committed. * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed. * + * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration + * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. + * It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction) + * since the commit may already be in the progress of completing. If not retrying, the only option is to close the producer. + * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker @@ -690,12 +683,14 @@ public class KafkaProducer implements Producer { * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error + * @throws TimeoutException if the time taken for committing the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked */ public void commitTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginCommit(); sender.wakeup(); - result.await(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** @@ -703,6 +698,11 @@ public class KafkaProducer implements Producer { * This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a * {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}. * + * Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration + * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. + * It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction) + * since the abort may already be in the progress of completing. If not retrying, the only option is to close the producer. + * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker @@ -710,12 +710,14 @@ public class KafkaProducer implements Producer { * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for aborting the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked */ public void abortTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginAbort(); sender.wakeup(); - result.await(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index dd884c6be72..3d41a3c59dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -56,6 +56,7 @@ import java.util.Iterator; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.function.Supplier; import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH; import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; @@ -100,6 +101,7 @@ public class TransactionManager { private final Set pendingPartitionsInTransaction; private final Set partitionsInTransaction; private final Map pendingTxnOffsetCommits; + private TransactionalRequestResult pendingResult; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. // For instance, this value is lowered by the AddPartitionsToTxnHandler when it receives a CONCURRENT_TRANSACTIONS @@ -201,14 +203,15 @@ public class TransactionManager { } public synchronized TransactionalRequestResult initializeTransactions() { - ensureTransactional(); - transitionTo(State.INITIALIZING); - setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); - this.nextSequence.clear(); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); - InitProducerIdHandler handler = new InitProducerIdHandler(builder); - enqueueRequest(handler); - return handler.result; + return handleCachedTransactionRequestResult(() -> { + transitionTo(State.INITIALIZING); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); + this.nextSequence.clear(); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(builder); + enqueueRequest(handler); + return handler.result; + }, State.INITIALIZING); } public synchronized void beginTransaction() { @@ -218,21 +221,23 @@ public class TransactionManager { } public synchronized TransactionalRequestResult beginCommit() { - ensureTransactional(); - maybeFailWithError(); - transitionTo(State.COMMITTING_TRANSACTION); - return beginCompletingTransaction(TransactionResult.COMMIT); + return handleCachedTransactionRequestResult(() -> { + maybeFailWithError(); + transitionTo(State.COMMITTING_TRANSACTION); + return beginCompletingTransaction(TransactionResult.COMMIT); + }, State.COMMITTING_TRANSACTION); } public synchronized TransactionalRequestResult beginAbort() { - ensureTransactional(); - if (currentState != State.ABORTABLE_ERROR) - maybeFailWithError(); - transitionTo(State.ABORTING_TRANSACTION); + return handleCachedTransactionRequestResult(() -> { + if (currentState != State.ABORTABLE_ERROR) + maybeFailWithError(); + transitionTo(State.ABORTING_TRANSACTION); - // We're aborting the transaction, so there should be no need to add new partitions - newPartitionsInTransaction.clear(); - return beginCompletingTransaction(TransactionResult.ABORT); + // We're aborting the transaction, so there should be no need to add new partitions + newPartitionsInTransaction.clear(); + return beginCompletingTransaction(TransactionResult.ABORT); + }, State.ABORTING_TRANSACTION); } private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) { @@ -849,6 +854,22 @@ public class TransactionManager { return new TxnOffsetCommitHandler(result, builder); } + private TransactionalRequestResult handleCachedTransactionRequestResult( + Supplier transactionalRequestResultSupplier, + State targetState) { + ensureTransactional(); + + if (pendingResult != null && currentState == targetState) { + TransactionalRequestResult result = pendingResult; + if (result.isCompleted()) + pendingResult = null; + return result; + } + + pendingResult = transactionalRequestResultSupplier.get(); + return pendingResult; + } + abstract class TxnRequestHandler implements RequestCompletionHandler { protected final TransactionalRequestResult result; private boolean isRetry = false; @@ -857,8 +878,8 @@ public class TransactionManager { this.result = result; } - TxnRequestHandler() { - this(new TransactionalRequestResult()); + TxnRequestHandler(String operation) { + this(new TransactionalRequestResult(operation)); } void fatalError(RuntimeException e) { @@ -949,6 +970,7 @@ public class TransactionManager { private final InitProducerIdRequest.Builder builder; private InitProducerIdHandler(InitProducerIdRequest.Builder builder) { + super("InitProducerId"); this.builder = builder; } @@ -991,6 +1013,7 @@ public class TransactionManager { private long retryBackoffMs; private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) { + super("AddPartitionsToTxn"); this.builder = builder; this.retryBackoffMs = TransactionManager.this.retryBackoffMs; } @@ -1094,6 +1117,7 @@ public class TransactionManager { private final FindCoordinatorRequest.Builder builder; private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) { + super("FindCoordinator"); this.builder = builder; } @@ -1150,6 +1174,7 @@ public class TransactionManager { private final EndTxnRequest.Builder builder; private EndTxnHandler(EndTxnRequest.Builder builder) { + super("EndTxn(" + builder.result() + ")"); this.builder = builder; } @@ -1199,6 +1224,7 @@ public class TransactionManager { private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder, Map offsets) { + super("AddOffsetsToTxn"); this.builder = builder; this.offsets = offsets; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index 9c02e94c045..dc1aefb0def 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -17,21 +17,26 @@ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; + +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public final class TransactionalRequestResult { - static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new CountDownLatch(0)); private final CountDownLatch latch; private volatile RuntimeException error = null; + private final String operation; - public TransactionalRequestResult() { - this(new CountDownLatch(1)); + public TransactionalRequestResult(String operation) { + this(new CountDownLatch(1), operation); } - private TransactionalRequestResult(CountDownLatch latch) { + private TransactionalRequestResult(CountDownLatch latch, String operation) { this.latch = latch; + this.operation = operation; } public void setError(RuntimeException error) { @@ -58,11 +63,18 @@ public final class TransactionalRequestResult { throw error(); } - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - boolean success = latch.await(timeout, unit); - if (!isSuccessful()) - throw error(); - return success; + public void await(long timeout, TimeUnit unit) { + try { + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) { + throw error(); + } + if (!success) { + throw new TimeoutException("Timeout expired after " + timeout + unit.name().toLowerCase(Locale.ROOT) + " while awaiting " + operation); + } + } catch (InterruptedException e) { + throw new InterruptException("Received interrupt while awaiting " + operation, e); + } } public RuntimeException error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8da76fb90fe..733541917d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; @@ -2281,6 +2282,69 @@ public class TransactionManagerTest { assertTrue(manager.shouldResetProducerStateAfterResolvingSequences()); } + @Test + public void testRetryAbortTransaction() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.ABORT); + } + + @Test + public void testRetryCommitTransaction() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.COMMIT); + } + + @Test(expected = KafkaException.class) + public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.ABORT); + } + + @Test(expected = KafkaException.class) + public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT); + } + + private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult, + TransactionResult retryTransactionResult) + throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); // send addPartitions. + sender.run(time.milliseconds()); // send produce request. + + TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ? + transactionManager.beginCommit() : transactionManager.beginAbort(); + prepareEndTxnResponse(Errors.NONE, firstTransactionResult, pid, epoch, true); + sender.run(time.milliseconds()); + assertFalse(result.isCompleted()); + + try { + result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS); + fail("Should have raised TimeoutException"); + } catch (TimeoutException e) { + } + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); + TransactionalRequestResult retryResult = retryTransactionResult == TransactionResult.COMMIT ? + transactionManager.beginCommit() : transactionManager.beginAbort(); + assertEquals(retryResult, result); // check if cached result is reused. + prepareEndTxnResponse(Errors.NONE, retryTransactionResult, pid, epoch, false); + sender.run(time.milliseconds()); + assertTrue(retryResult.isCompleted()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; @@ -2380,7 +2444,15 @@ public class TransactionManagerTest { } private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { - client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error)); + this.prepareEndTxnResponse(error, result, pid, epoch, false); + } + + private void prepareEndTxnResponse(Errors error, + final TransactionResult result, + final long pid, + final short epoch, + final boolean shouldDisconnect) { + client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error), shouldDisconnect); } private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index f570a44113f..e0c2832e358 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -101,7 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val adminClients = Buffer[AdminClient]() producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") - producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000") + producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000") consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) override def propertyOverrides(properties: Properties): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index e3b447eba0a..34dea70ca8c 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -28,7 +28,7 @@ import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.ProducerFencedException +import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -560,12 +560,25 @@ class TransactionsTest extends KafkaServerTestHarness { def testConsecutivelyRunInitTransactions(): Unit = { val producer = createTransactionalProducer(transactionalId = "normalProducer") + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } + + @Test(expected = classOf[TimeoutException]) + def testCommitTransactionTimeout(): Unit = { + val producer = createTransactionalProducer("transactionalProducer", maxBlockMs = 1000) + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foobar".getBytes)) + + for (i <- 0 until servers.size) + killBroker(i) // pretend all brokers not available + try { - producer.initTransactions() - producer.initTransactions() - fail("Should have raised a KafkaException") + producer.commitTransaction() } finally { - producer.close() + producer.close(0, TimeUnit.MILLISECONDS) } } @@ -615,9 +628,11 @@ class TransactionsTest extends KafkaServerTestHarness { } private def createTransactionalProducer(transactionalId: String, - transactionTimeoutMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = { + transactionTimeoutMs: Long = 60000, + maxBlockMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = { val producer = TestUtils.createTransactionalProducer(transactionalId, servers, - transactionTimeoutMs = transactionTimeoutMs) + transactionTimeoutMs = transactionTimeoutMs, + maxBlockMs = maxBlockMs) transactionalProducers += producer producer } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ecdb8c8739c..ce0714df8cd 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1284,7 +1284,8 @@ object TestUtils extends Logging { def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer], batchSize: Int = 16384, - transactionTimeoutMs: Long = 60000) = { + transactionTimeoutMs: Long = 60000, + maxBlockMs: Long = 60000) = { val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers)) props.put(ProducerConfig.ACKS_CONFIG, "all") @@ -1292,6 +1293,7 @@ object TestUtils extends Logging { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs.toString) + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer) }