From 201da0542726472d954080d54bc585b111aaf86f Mon Sep 17 00:00:00 2001 From: huxi Date: Fri, 22 Feb 2019 06:18:36 +0800 Subject: [PATCH] KAFKA-7763; Calls to commitTransaction and abortTransaction should not block indefinitely (#6066) Currently, commitTransaction and abortTransaction wait indefinitely for the respective operation to be completed. This patch uses the producer's max block time to limit the time that we will wait. If the timeout elapses, we raise a TimeoutException, which allows the user to either close the producer or retry the operation. Reviewers: Guozhang Wang , Jason Gustafson --- .../kafka/clients/producer/KafkaProducer.java | 36 ++++----- .../internals/TransactionManager.java | 68 +++++++++++------ .../internals/TransactionalRequestResult.java | 30 +++++--- .../internals/TransactionManagerTest.java | 74 ++++++++++++++++++- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../kafka/api/TransactionsTest.scala | 29 ++++++-- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 7 files changed, 186 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index bfd6bf3ec9b..5a383eaa1bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -254,7 +254,6 @@ public class KafkaProducer implements Producer { private final ProducerInterceptors interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; - private TransactionalRequestResult initTransactionsResult; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -609,20 +608,9 @@ public class KafkaProducer implements Producer { */ public void initTransactions() { throwIfNoTransactionManager(); - if (initTransactionsResult == null) { - initTransactionsResult = transactionManager.initializeTransactions(); - sender.wakeup(); - } - - try { - if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) { - initTransactionsResult = null; - } else { - throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms."); - } - } catch (InterruptedException e) { - throw new InterruptException("Initialize transactions interrupted.", e); - } + TransactionalRequestResult result = transactionManager.initializeTransactions(); + sender.wakeup(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** @@ -682,6 +670,11 @@ public class KafkaProducer implements Producer { * errors, this method will throw the last received exception immediately and the transaction will not be committed. * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed. * + * Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration + * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. + * It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction) + * since the commit may already be in the progress of completing. If not retrying, the only option is to close the producer. + * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker @@ -690,12 +683,14 @@ public class KafkaProducer implements Producer { * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error + * @throws TimeoutException if the time taken for committing the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked */ public void commitTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginCommit(); sender.wakeup(); - result.await(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** @@ -703,6 +698,11 @@ public class KafkaProducer implements Producer { * This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a * {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}. * + * Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration + * of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted. + * It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction) + * since the abort may already be in the progress of completing. If not retrying, the only option is to close the producer. + * * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker @@ -710,12 +710,14 @@ public class KafkaProducer implements Producer { * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured * transactional.id is not authorized. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error + * @throws TimeoutException if the time taken for aborting the transaction has surpassed max.block.ms. + * @throws InterruptException if the thread is interrupted while blocked */ public void abortTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginAbort(); sender.wakeup(); - result.await(); + result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index dd884c6be72..3d41a3c59dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -56,6 +56,7 @@ import java.util.Iterator; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.function.Supplier; import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH; import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; @@ -100,6 +101,7 @@ public class TransactionManager { private final Set pendingPartitionsInTransaction; private final Set partitionsInTransaction; private final Map pendingTxnOffsetCommits; + private TransactionalRequestResult pendingResult; // This is used by the TxnRequestHandlers to control how long to back off before a given request is retried. // For instance, this value is lowered by the AddPartitionsToTxnHandler when it receives a CONCURRENT_TRANSACTIONS @@ -201,14 +203,15 @@ public class TransactionManager { } public synchronized TransactionalRequestResult initializeTransactions() { - ensureTransactional(); - transitionTo(State.INITIALIZING); - setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); - this.nextSequence.clear(); - InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); - InitProducerIdHandler handler = new InitProducerIdHandler(builder); - enqueueRequest(handler); - return handler.result; + return handleCachedTransactionRequestResult(() -> { + transitionTo(State.INITIALIZING); + setProducerIdAndEpoch(ProducerIdAndEpoch.NONE); + this.nextSequence.clear(); + InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs); + InitProducerIdHandler handler = new InitProducerIdHandler(builder); + enqueueRequest(handler); + return handler.result; + }, State.INITIALIZING); } public synchronized void beginTransaction() { @@ -218,21 +221,23 @@ public class TransactionManager { } public synchronized TransactionalRequestResult beginCommit() { - ensureTransactional(); - maybeFailWithError(); - transitionTo(State.COMMITTING_TRANSACTION); - return beginCompletingTransaction(TransactionResult.COMMIT); + return handleCachedTransactionRequestResult(() -> { + maybeFailWithError(); + transitionTo(State.COMMITTING_TRANSACTION); + return beginCompletingTransaction(TransactionResult.COMMIT); + }, State.COMMITTING_TRANSACTION); } public synchronized TransactionalRequestResult beginAbort() { - ensureTransactional(); - if (currentState != State.ABORTABLE_ERROR) - maybeFailWithError(); - transitionTo(State.ABORTING_TRANSACTION); + return handleCachedTransactionRequestResult(() -> { + if (currentState != State.ABORTABLE_ERROR) + maybeFailWithError(); + transitionTo(State.ABORTING_TRANSACTION); - // We're aborting the transaction, so there should be no need to add new partitions - newPartitionsInTransaction.clear(); - return beginCompletingTransaction(TransactionResult.ABORT); + // We're aborting the transaction, so there should be no need to add new partitions + newPartitionsInTransaction.clear(); + return beginCompletingTransaction(TransactionResult.ABORT); + }, State.ABORTING_TRANSACTION); } private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) { @@ -849,6 +854,22 @@ public class TransactionManager { return new TxnOffsetCommitHandler(result, builder); } + private TransactionalRequestResult handleCachedTransactionRequestResult( + Supplier transactionalRequestResultSupplier, + State targetState) { + ensureTransactional(); + + if (pendingResult != null && currentState == targetState) { + TransactionalRequestResult result = pendingResult; + if (result.isCompleted()) + pendingResult = null; + return result; + } + + pendingResult = transactionalRequestResultSupplier.get(); + return pendingResult; + } + abstract class TxnRequestHandler implements RequestCompletionHandler { protected final TransactionalRequestResult result; private boolean isRetry = false; @@ -857,8 +878,8 @@ public class TransactionManager { this.result = result; } - TxnRequestHandler() { - this(new TransactionalRequestResult()); + TxnRequestHandler(String operation) { + this(new TransactionalRequestResult(operation)); } void fatalError(RuntimeException e) { @@ -949,6 +970,7 @@ public class TransactionManager { private final InitProducerIdRequest.Builder builder; private InitProducerIdHandler(InitProducerIdRequest.Builder builder) { + super("InitProducerId"); this.builder = builder; } @@ -991,6 +1013,7 @@ public class TransactionManager { private long retryBackoffMs; private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) { + super("AddPartitionsToTxn"); this.builder = builder; this.retryBackoffMs = TransactionManager.this.retryBackoffMs; } @@ -1094,6 +1117,7 @@ public class TransactionManager { private final FindCoordinatorRequest.Builder builder; private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) { + super("FindCoordinator"); this.builder = builder; } @@ -1150,6 +1174,7 @@ public class TransactionManager { private final EndTxnRequest.Builder builder; private EndTxnHandler(EndTxnRequest.Builder builder) { + super("EndTxn(" + builder.result() + ")"); this.builder = builder; } @@ -1199,6 +1224,7 @@ public class TransactionManager { private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder, Map offsets) { + super("AddOffsetsToTxn"); this.builder = builder; this.offsets = offsets; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java index 9c02e94c045..dc1aefb0def 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java @@ -17,21 +17,26 @@ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; + +import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public final class TransactionalRequestResult { - static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new CountDownLatch(0)); private final CountDownLatch latch; private volatile RuntimeException error = null; + private final String operation; - public TransactionalRequestResult() { - this(new CountDownLatch(1)); + public TransactionalRequestResult(String operation) { + this(new CountDownLatch(1), operation); } - private TransactionalRequestResult(CountDownLatch latch) { + private TransactionalRequestResult(CountDownLatch latch, String operation) { this.latch = latch; + this.operation = operation; } public void setError(RuntimeException error) { @@ -58,11 +63,18 @@ public final class TransactionalRequestResult { throw error(); } - public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - boolean success = latch.await(timeout, unit); - if (!isSuccessful()) - throw error(); - return success; + public void await(long timeout, TimeUnit unit) { + try { + boolean success = latch.await(timeout, unit); + if (!isSuccessful()) { + throw error(); + } + if (!success) { + throw new TimeoutException("Timeout expired after " + timeout + unit.name().toLowerCase(Locale.ROOT) + " while awaiting " + operation); + } + } catch (InterruptedException e) { + throw new InterruptException("Received interrupt while awaiting " + operation, e); + } } public RuntimeException error() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 8da76fb90fe..733541917d1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -75,6 +75,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; @@ -2281,6 +2282,69 @@ public class TransactionManagerTest { assertTrue(manager.shouldResetProducerStateAfterResolvingSequences()); } + @Test + public void testRetryAbortTransaction() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.ABORT); + } + + @Test + public void testRetryCommitTransaction() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.COMMIT); + } + + @Test(expected = KafkaException.class) + public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.ABORT); + } + + @Test(expected = KafkaException.class) + public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException { + verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT); + } + + private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult, + TransactionResult retryTransactionResult) + throws InterruptedException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT); + + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + + prepareProduceResponse(Errors.NONE, pid, epoch); + sender.run(time.milliseconds()); // send addPartitions. + sender.run(time.milliseconds()); // send produce request. + + TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ? + transactionManager.beginCommit() : transactionManager.beginAbort(); + prepareEndTxnResponse(Errors.NONE, firstTransactionResult, pid, epoch, true); + sender.run(time.milliseconds()); + assertFalse(result.isCompleted()); + + try { + result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS); + fail("Should have raised TimeoutException"); + } catch (TimeoutException e) { + } + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); + TransactionalRequestResult retryResult = retryTransactionResult == TransactionResult.COMMIT ? + transactionManager.beginCommit() : transactionManager.beginAbort(); + assertEquals(retryResult, result); // check if cached result is reused. + prepareEndTxnResponse(Errors.NONE, retryTransactionResult, pid, epoch, false); + sender.run(time.milliseconds()); + assertTrue(retryResult.isCompleted()); + assertFalse(transactionManager.hasOngoingTransaction()); + } + private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException { final long pid = 1L; final short epoch = 1; @@ -2380,7 +2444,15 @@ public class TransactionManagerTest { } private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { - client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error)); + this.prepareEndTxnResponse(error, result, pid, epoch, false); + } + + private void prepareEndTxnResponse(Errors error, + final TransactionResult result, + final long pid, + final short epoch, + final boolean shouldDisconnect) { + client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error), shouldDisconnect); } private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index f570a44113f..e0c2832e358 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -101,7 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val adminClients = Buffer[AdminClient]() producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1") - producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000") + producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000") consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) override def propertyOverrides(properties: Properties): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index e3b447eba0a..34dea70ca8c 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -28,7 +28,7 @@ import kafka.utils.TestUtils.consumeRecords import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.errors.ProducerFencedException +import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException} import org.junit.{After, Before, Test} import org.junit.Assert._ @@ -560,12 +560,25 @@ class TransactionsTest extends KafkaServerTestHarness { def testConsecutivelyRunInitTransactions(): Unit = { val producer = createTransactionalProducer(transactionalId = "normalProducer") + producer.initTransactions() + producer.initTransactions() + fail("Should have raised a KafkaException") + } + + @Test(expected = classOf[TimeoutException]) + def testCommitTransactionTimeout(): Unit = { + val producer = createTransactionalProducer("transactionalProducer", maxBlockMs = 1000) + producer.initTransactions() + producer.beginTransaction() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foobar".getBytes)) + + for (i <- 0 until servers.size) + killBroker(i) // pretend all brokers not available + try { - producer.initTransactions() - producer.initTransactions() - fail("Should have raised a KafkaException") + producer.commitTransaction() } finally { - producer.close() + producer.close(0, TimeUnit.MILLISECONDS) } } @@ -615,9 +628,11 @@ class TransactionsTest extends KafkaServerTestHarness { } private def createTransactionalProducer(transactionalId: String, - transactionTimeoutMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = { + transactionTimeoutMs: Long = 60000, + maxBlockMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = { val producer = TestUtils.createTransactionalProducer(transactionalId, servers, - transactionTimeoutMs = transactionTimeoutMs) + transactionTimeoutMs = transactionTimeoutMs, + maxBlockMs = maxBlockMs) transactionalProducers += producer producer } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ecdb8c8739c..ce0714df8cd 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1284,7 +1284,8 @@ object TestUtils extends Logging { def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer], batchSize: Int = 16384, - transactionTimeoutMs: Long = 60000) = { + transactionTimeoutMs: Long = 60000, + maxBlockMs: Long = 60000) = { val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers)) props.put(ProducerConfig.ACKS_CONFIG, "all") @@ -1292,6 +1293,7 @@ object TestUtils extends Logging { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs.toString) + props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer) }