diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index a761b31db24..821c56b53a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -106,7 +106,8 @@ public class TransactionManager { case ABORTING_TRANSACTION: return source == IN_TRANSACTION || source == ABORTABLE_ERROR; case ABORTABLE_ERROR: - return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; + return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION + || source == ABORTABLE_ERROR; case FATAL_ERROR: default: // We can transition to FATAL_ERROR unconditionally. diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 22afcea9254..8d5dbe9a452 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -1432,6 +1432,71 @@ public class TransactionManagerTest { assertTrue(transactionManager.hasAbortableError()); } + @Test + public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException, ExecutionException { + final long pid = 13131L; + final short epoch = 1; + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + transactionManager.maybeAddPartitionToTransaction(tp1); + + Future firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + Future secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + + assertFalse(firstBatchResponse.isDone()); + assertFalse(secondBatchResponse.isDone()); + + Map partitionErrors = new HashMap<>(); + partitionErrors.put(tp0, Errors.NONE); + partitionErrors.put(tp1, Errors.NONE); + prepareAddPartitionsToTxn(partitionErrors); + + assertFalse(transactionManager.transactionContainsPartition(tp0)); + assertFalse(transactionManager.isSendToPartitionAllowed(tp0)); + sender.run(time.milliseconds()); // send addPartitions. + // Check that only addPartitions was sent. + assertTrue(transactionManager.transactionContainsPartition(tp0)); + assertTrue(transactionManager.transactionContainsPartition(tp1)); + assertTrue(transactionManager.isSendToPartitionAllowed(tp1)); + assertTrue(transactionManager.isSendToPartitionAllowed(tp1)); + assertFalse(firstBatchResponse.isDone()); + assertFalse(secondBatchResponse.isDone()); + + // Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained. + time.sleep(10000); + // Disconnect the target node for the pending produce request. This will ensure that sender will try to + // expire the batch. + Node clusterNode = this.cluster.nodes().get(0); + client.disconnect(clusterNode.idString()); + client.blackout(clusterNode, 100); + + sender.run(time.milliseconds()); // We should try to flush the produce, but expire it instead without sending anything. + assertTrue(firstBatchResponse.isDone()); + assertTrue(secondBatchResponse.isDone()); + + try { + // make sure the produce was expired. + firstBatchResponse.get(); + fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + try { + // make sure the produce was expired. + secondBatchResponse.get(); + fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + assertTrue(transactionManager.hasAbortableError()); + } + @Test public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException { final long pid = 13131L;