Browse Source

KAFKA-5422; Handle multiple transitions to ABORTABLE_ERROR correctly

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3285 from apurvam/KAFKA-5422-allow-multiple-transitions-to-abortable-error
pull/3286/merge
Apurva Mehta 8 years ago committed by Jason Gustafson
parent
commit
8e7839f610
  1. 3
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  2. 65
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

3
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -106,7 +106,8 @@ public class TransactionManager { @@ -106,7 +106,8 @@ public class TransactionManager {
case ABORTING_TRANSACTION:
return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
case ABORTABLE_ERROR:
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
|| source == ABORTABLE_ERROR;
case FATAL_ERROR:
default:
// We can transition to FATAL_ERROR unconditionally.

65
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -1432,6 +1432,71 @@ public class TransactionManagerTest { @@ -1432,6 +1432,71 @@ public class TransactionManagerTest {
assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException, ExecutionException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(firstBatchResponse.isDone());
assertFalse(secondBatchResponse.isDone());
Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
partitionErrors.put(tp0, Errors.NONE);
partitionErrors.put(tp1, Errors.NONE);
prepareAddPartitionsToTxn(partitionErrors);
assertFalse(transactionManager.transactionContainsPartition(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
sender.run(time.milliseconds()); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.transactionContainsPartition(tp1));
assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
assertFalse(firstBatchResponse.isDone());
assertFalse(secondBatchResponse.isDone());
// Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
Node clusterNode = this.cluster.nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
sender.run(time.milliseconds()); // We should try to flush the produce, but expire it instead without sending anything.
assertTrue(firstBatchResponse.isDone());
assertTrue(secondBatchResponse.isDone());
try {
// make sure the produce was expired.
firstBatchResponse.get();
fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
try {
// make sure the produce was expired.
secondBatchResponse.get();
fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException {
final long pid = 13131L;

Loading…
Cancel
Save