Browse Source

KAFKA-7763; Calls to commitTransaction and abortTransaction should not block indefinitely (#6066)

Currently, commitTransaction and abortTransaction wait indefinitely for the respective operation to be completed. This patch uses the producer's max block time to limit the time that we will wait. If the timeout elapses, we raise a TimeoutException, which allows the user to either close the producer or retry the operation.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>
pull/6227/head
huxi 6 years ago committed by Jason Gustafson
parent
commit
201da05427
  1. 36
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  2. 68
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  3. 30
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java
  4. 74
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
  5. 2
      core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
  6. 29
      core/src/test/scala/integration/kafka/api/TransactionsTest.scala
  7. 4
      core/src/test/scala/unit/kafka/utils/TestUtils.scala

36
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -254,7 +254,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -254,7 +254,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerInterceptors<K, V> interceptors;
private final ApiVersions apiVersions;
private final TransactionManager transactionManager;
private TransactionalRequestResult initTransactionsResult;
/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@ -609,20 +608,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -609,20 +608,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
public void initTransactions() {
throwIfNoTransactionManager();
if (initTransactionsResult == null) {
initTransactionsResult = transactionManager.initializeTransactions();
sender.wakeup();
}
try {
if (initTransactionsResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
initTransactionsResult = null;
} else {
throw new TimeoutException("Timeout expired while initializing transactional state in " + maxBlockTimeMs + "ms.");
}
} catch (InterruptedException e) {
throw new InterruptException("Initialize transactions interrupted.", e);
}
TransactionalRequestResult result = transactionManager.initializeTransactions();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
/**
@ -682,6 +670,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -682,6 +670,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* errors, this method will throw the last received exception immediately and the transaction will not be committed.
* So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
*
* Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration
* of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
* It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction)
* since the commit may already be in the process of completing. If not retrying, the only option is to close the producer.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
@ -690,12 +683,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -690,12 +683,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
* @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
/**
@ -703,6 +698,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -703,6 +698,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a
* {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}.
*
* Note that this method will raise {@link TimeoutException} if the transaction cannot be aborted before expiration
* of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
* It is safe to retry in either case, but it is not possible to attempt a different operation (such as commitTransaction)
* since the abort may already be in the process of completing. If not retrying, the only option is to close the producer.
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
@ -710,12 +710,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -710,12 +710,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for aborting the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
public void abortTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
/**

68
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -56,6 +56,7 @@ import java.util.Iterator; @@ -56,6 +56,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Supplier;
import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
@ -100,6 +101,7 @@ public class TransactionManager { @@ -100,6 +101,7 @@ public class TransactionManager {
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
private TransactionalRequestResult pendingResult;
// This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
// For instance, this value is lowered by the AddPartitionsToTxnHandler when it receives a CONCURRENT_TRANSACTIONS
@ -201,14 +203,15 @@ public class TransactionManager { @@ -201,14 +203,15 @@ public class TransactionManager {
}
public synchronized TransactionalRequestResult initializeTransactions() {
ensureTransactional();
transitionTo(State.INITIALIZING);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.nextSequence.clear();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
InitProducerIdHandler handler = new InitProducerIdHandler(builder);
enqueueRequest(handler);
return handler.result;
return handleCachedTransactionRequestResult(() -> {
transitionTo(State.INITIALIZING);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.nextSequence.clear();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
InitProducerIdHandler handler = new InitProducerIdHandler(builder);
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING);
}
public synchronized void beginTransaction() {
@ -218,21 +221,23 @@ public class TransactionManager { @@ -218,21 +221,23 @@ public class TransactionManager {
}
public synchronized TransactionalRequestResult beginCommit() {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(TransactionResult.COMMIT);
return handleCachedTransactionRequestResult(() -> {
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(TransactionResult.COMMIT);
}, State.COMMITTING_TRANSACTION);
}
public synchronized TransactionalRequestResult beginAbort() {
ensureTransactional();
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
transitionTo(State.ABORTING_TRANSACTION);
return handleCachedTransactionRequestResult(() -> {
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
transitionTo(State.ABORTING_TRANSACTION);
// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
return beginCompletingTransaction(TransactionResult.ABORT);
// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
return beginCompletingTransaction(TransactionResult.ABORT);
}, State.ABORTING_TRANSACTION);
}
private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
@ -849,6 +854,22 @@ public class TransactionManager { @@ -849,6 +854,22 @@ public class TransactionManager {
return new TxnOffsetCommitHandler(result, builder);
}
/**
 * Returns the result of an in-flight transactional operation when the caller is retrying it,
 * otherwise starts a new operation through the supplier and remembers its result.
 *
 * A call counts as a retry when a pending result exists and the manager is still in
 * {@code targetState}. In that case the supplier is not invoked; the pending result is handed
 * back and, if it has already completed, it is forgotten so the next call starts afresh.
 *
 * @param transactionalRequestResultSupplier starts the operation and yields its result
 * @param targetState the state the manager is in while this operation is outstanding
 * @return the cached result for a retry, or the freshly supplied result otherwise
 */
private TransactionalRequestResult handleCachedTransactionRequestResult(
        Supplier<TransactionalRequestResult> transactionalRequestResultSupplier,
        State targetState) {
    ensureTransactional();

    boolean retryingPendingOperation = pendingResult != null && currentState == targetState;
    if (!retryingPendingOperation) {
        // Not a retry: run the operation and cache whatever result it produced.
        pendingResult = transactionalRequestResultSupplier.get();
        return pendingResult;
    }

    TransactionalRequestResult cached = pendingResult;
    if (cached.isCompleted()) {
        pendingResult = null;
    }
    return cached;
}
abstract class TxnRequestHandler implements RequestCompletionHandler {
protected final TransactionalRequestResult result;
private boolean isRetry = false;
@ -857,8 +878,8 @@ public class TransactionManager { @@ -857,8 +878,8 @@ public class TransactionManager {
this.result = result;
}
TxnRequestHandler() {
this(new TransactionalRequestResult());
TxnRequestHandler(String operation) {
this(new TransactionalRequestResult(operation));
}
void fatalError(RuntimeException e) {
@ -949,6 +970,7 @@ public class TransactionManager { @@ -949,6 +970,7 @@ public class TransactionManager {
private final InitProducerIdRequest.Builder builder;
private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
super("InitProducerId");
this.builder = builder;
}
@ -991,6 +1013,7 @@ public class TransactionManager { @@ -991,6 +1013,7 @@ public class TransactionManager {
private long retryBackoffMs;
private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) {
super("AddPartitionsToTxn");
this.builder = builder;
this.retryBackoffMs = TransactionManager.this.retryBackoffMs;
}
@ -1094,6 +1117,7 @@ public class TransactionManager { @@ -1094,6 +1117,7 @@ public class TransactionManager {
private final FindCoordinatorRequest.Builder builder;
private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) {
super("FindCoordinator");
this.builder = builder;
}
@ -1150,6 +1174,7 @@ public class TransactionManager { @@ -1150,6 +1174,7 @@ public class TransactionManager {
private final EndTxnRequest.Builder builder;
private EndTxnHandler(EndTxnRequest.Builder builder) {
super("EndTxn(" + builder.result() + ")");
this.builder = builder;
}
@ -1199,6 +1224,7 @@ public class TransactionManager { @@ -1199,6 +1224,7 @@ public class TransactionManager {
private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder,
Map<TopicPartition, OffsetAndMetadata> offsets) {
super("AddOffsetsToTxn");
this.builder = builder;
this.offsets = offsets;
}

30
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionalRequestResult.java

@ -17,21 +17,26 @@ @@ -17,21 +17,26 @@
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public final class TransactionalRequestResult {
static final TransactionalRequestResult COMPLETE = new TransactionalRequestResult(new CountDownLatch(0));
private final CountDownLatch latch;
private volatile RuntimeException error = null;
private final String operation;
public TransactionalRequestResult() {
this(new CountDownLatch(1));
public TransactionalRequestResult(String operation) {
this(new CountDownLatch(1), operation);
}
private TransactionalRequestResult(CountDownLatch latch) {
private TransactionalRequestResult(CountDownLatch latch, String operation) {
this.latch = latch;
this.operation = operation;
}
public void setError(RuntimeException error) {
@ -58,11 +63,18 @@ public final class TransactionalRequestResult { @@ -58,11 +63,18 @@ public final class TransactionalRequestResult {
throw error();
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
boolean success = latch.await(timeout, unit);
if (!isSuccessful())
throw error();
return success;
/**
 * Waits up to the given timeout for this request to complete.
 *
 * After the wait returns (whether or not the latch was released), a recorded error takes
 * precedence: if the result is not successful, that error is thrown. Otherwise, if the wait
 * elapsed without completion, a {@link TimeoutException} is raised.
 *
 * @param timeout the maximum time to wait
 * @param unit the unit of {@code timeout}
 * @throws RuntimeException the recorded error, if the result is not successful
 * @throws TimeoutException if the timeout elapsed before the request completed
 * @throws InterruptException if the waiting thread was interrupted
 */
public void await(long timeout, TimeUnit unit) {
    try {
        boolean completed = latch.await(timeout, unit);
        if (!isSuccessful()) {
            throw error();
        }
        if (!completed) {
            // Keep a space between the number and the unit name so the message reads
            // "1000 milliseconds" rather than "1000milliseconds".
            throw new TimeoutException("Timeout expired after " + timeout + " "
                    + unit.name().toLowerCase(Locale.ROOT) + " while awaiting " + operation);
        }
    } catch (InterruptedException e) {
        throw new InterruptException("Received interrupt while awaiting " + operation, e);
    }
}
public RuntimeException error() {

74
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -75,6 +75,7 @@ import java.util.Map; @@ -75,6 +75,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@ -2281,6 +2282,69 @@ public class TransactionManagerTest { @@ -2281,6 +2282,69 @@ public class TransactionManagerTest {
assertTrue(manager.shouldResetProducerStateAfterResolvingSequences());
}
// An abort that timed out is retried as an abort; the retry is expected to complete.
@Test
public void testRetryAbortTransaction() throws InterruptedException {
verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.ABORT);
}
// A commit that timed out is retried as a commit; the retry is expected to complete.
@Test
public void testRetryCommitTransaction() throws InterruptedException {
verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.COMMIT);
}
// Switching to abort after a timed-out commit is expected to raise KafkaException.
@Test(expected = KafkaException.class)
public void testRetryAbortTransactionAfterCommitTimeout() throws InterruptedException {
verifyCommitOrAbortTranscationRetriable(TransactionResult.COMMIT, TransactionResult.ABORT);
}
// Switching to commit after a timed-out abort is expected to raise KafkaException.
@Test(expected = KafkaException.class)
public void testRetryCommitTransactionAfterAbortTimeout() throws InterruptedException {
verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
}
/**
 * Drives a transaction up to its completion request, lets the first commit/abort attempt time
 * out, then retries and checks that the retry returns the same result object and completes.
 *
 * @param firstTransactionResult the operation (commit or abort) attempted first
 * @param retryTransactionResult the operation attempted after the first one timed out
 */
private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult,
TransactionResult retryTransactionResult)
throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
// Start a transaction with a single record appended for partition tp0.
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.run(time.milliseconds()); // send addPartitions.
sender.run(time.milliseconds()); // send produce request.
// First attempt at completing the transaction.
TransactionalRequestResult result = firstTransactionResult == TransactionResult.COMMIT ?
transactionManager.beginCommit() : transactionManager.beginAbort();
// The EndTxn response is prepared with shouldDisconnect = true, so the first attempt
// is left incomplete after the sender runs (asserted just below).
prepareEndTxnResponse(Errors.NONE, firstTransactionResult, pid, epoch, true);
sender.run(time.milliseconds());
assertFalse(result.isCompleted());
try {
result.await(MAX_BLOCK_TIMEOUT, TimeUnit.MILLISECONDS);
fail("Should have raised TimeoutException");
} catch (TimeoutException e) {
// Expected: the result has not completed, so the bounded wait times out.
}
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.run(time.milliseconds());
// Retry. When the retried operation differs from the first one, the callers of this helper
// expect a KafkaException - presumably raised by this call; confirm against TransactionManager.
TransactionalRequestResult retryResult = retryTransactionResult == TransactionResult.COMMIT ?
transactionManager.beginCommit() : transactionManager.beginAbort();
assertEquals(retryResult, result); // check if cached result is reused.
// This time the EndTxn response is prepared with shouldDisconnect = false.
prepareEndTxnResponse(Errors.NONE, retryTransactionResult, pid, epoch, false);
sender.run(time.milliseconds());
assertTrue(retryResult.isCompleted());
assertFalse(transactionManager.hasOngoingTransaction());
}
private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
final long pid = 1L;
final short epoch = 1;
@ -2380,7 +2444,15 @@ public class TransactionManagerTest { @@ -2380,7 +2444,15 @@ public class TransactionManagerTest {
}
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
this.prepareEndTxnResponse(error, result, pid, epoch, false);
}
/**
 * Queues an EndTxn response carrying the given error for the next request matched by
 * {@code endTxnMatcher(result, pid, epoch)}.
 *
 * @param error the error code placed in the prepared EndTxnResponse
 * @param result the transaction result (commit or abort) passed to the matcher
 * @param pid the producer id passed to the matcher
 * @param epoch the producer epoch passed to the matcher
 * @param shouldDisconnect forwarded to {@code client.prepareResponse}; presumably makes the
 *                         mock client drop the connection instead of delivering the response
 *                         (TODO confirm against the mock client)
 */
private void prepareEndTxnResponse(Errors error,
final TransactionResult result,
final long pid,
final short epoch,
final boolean shouldDisconnect) {
client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error), shouldDisconnect);
}
private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {

2
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

@ -101,7 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @@ -101,7 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val adminClients = Buffer[AdminClient]()
producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "1")
producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000")
producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "50000")
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def propertyOverrides(properties: Properties): Unit = {

29
core/src/test/scala/integration/kafka/api/TransactionsTest.scala

@ -28,7 +28,7 @@ import kafka.utils.TestUtils.consumeRecords @@ -28,7 +28,7 @@ import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException}
import org.junit.{After, Before, Test}
import org.junit.Assert._
@ -560,12 +560,25 @@ class TransactionsTest extends KafkaServerTestHarness { @@ -560,12 +560,25 @@ class TransactionsTest extends KafkaServerTestHarness {
def testConsecutivelyRunInitTransactions(): Unit = {
val producer = createTransactionalProducer(transactionalId = "normalProducer")
producer.initTransactions()
producer.initTransactions()
fail("Should have raised a KafkaException")
}
@Test(expected = classOf[TimeoutException])
def testCommitTransactionTimeout(): Unit = {
val producer = createTransactionalProducer("transactionalProducer", maxBlockMs = 1000)
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foobar".getBytes))
for (i <- 0 until servers.size)
killBroker(i) // pretend all brokers not available
try {
producer.initTransactions()
producer.initTransactions()
fail("Should have raised a KafkaException")
producer.commitTransaction()
} finally {
producer.close()
producer.close(0, TimeUnit.MILLISECONDS)
}
}
@ -615,9 +628,11 @@ class TransactionsTest extends KafkaServerTestHarness { @@ -615,9 +628,11 @@ class TransactionsTest extends KafkaServerTestHarness {
}
private def createTransactionalProducer(transactionalId: String,
transactionTimeoutMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = {
transactionTimeoutMs: Long = 60000,
maxBlockMs: Long = 60000): KafkaProducer[Array[Byte], Array[Byte]] = {
val producer = TestUtils.createTransactionalProducer(transactionalId, servers,
transactionTimeoutMs = transactionTimeoutMs)
transactionTimeoutMs = transactionTimeoutMs,
maxBlockMs = maxBlockMs)
transactionalProducers += producer
producer
}

4
core/src/test/scala/unit/kafka/utils/TestUtils.scala

@ -1284,7 +1284,8 @@ object TestUtils extends Logging { @@ -1284,7 +1284,8 @@ object TestUtils extends Logging {
def createTransactionalProducer(transactionalId: String,
servers: Seq[KafkaServer],
batchSize: Int = 16384,
transactionTimeoutMs: Long = 60000) = {
transactionTimeoutMs: Long = 60000,
maxBlockMs: Long = 60000) = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromServers(servers))
props.put(ProducerConfig.ACKS_CONFIG, "all")
@ -1292,6 +1293,7 @@ object TestUtils extends Logging { @@ -1292,6 +1293,7 @@ object TestUtils extends Logging {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs.toString)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
}

Loading…
Cancel
Save