Browse Source

KAFKA-5427; Transactional producer should allow FindCoordinator in error state

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3297 from hachikuji/KAFKA-5427
pull/3297/merge
Jason Gustafson 8 years ago
parent
commit
43e935a630
  1. 30
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  2. 124
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

30
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -97,8 +97,7 @@ public class TransactionManager { @@ -97,8 +97,7 @@ public class TransactionManager {
case INITIALIZING:
return source == UNINITIALIZED;
case READY:
return source == INITIALIZING || source == COMMITTING_TRANSACTION
|| source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case IN_TRANSACTION:
return source == READY;
case COMMITTING_TRANSACTION:
@ -106,8 +105,7 @@ public class TransactionManager { @@ -106,8 +105,7 @@ public class TransactionManager {
case ABORTING_TRANSACTION:
return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
case ABORTABLE_ERROR:
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
|| source == ABORTABLE_ERROR;
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
case FATAL_ERROR:
default:
// We can transition to FATAL_ERROR unconditionally.
@ -179,7 +177,7 @@ public class TransactionManager { @@ -179,7 +177,7 @@ public class TransactionManager {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(true);
return beginCompletingTransaction(TransactionResult.COMMIT);
}
public synchronized TransactionalRequestResult beginAbortingTransaction() {
@ -190,14 +188,12 @@ public class TransactionManager { @@ -190,14 +188,12 @@ public class TransactionManager {
// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
return beginCompletingTransaction(false);
return beginCompletingTransaction(TransactionResult.ABORT);
}
private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
@ -225,7 +221,7 @@ public class TransactionManager { @@ -225,7 +221,7 @@ public class TransactionManager {
if (currentState != State.IN_TRANSACTION)
throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;
log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
@ -286,6 +282,11 @@ public class TransactionManager { @@ -286,6 +282,11 @@ public class TransactionManager {
}
synchronized void transitionToAbortableError(RuntimeException exception) {
if (currentState == State.ABORTING_TRANSACTION) {
log.debug("Skipping transition to abortable error state since the transaction is already being " +
"aborted. Underlying exception: ", exception);
return;
}
transitionTo(State.ABORTABLE_ERROR, exception);
}
@ -504,13 +505,10 @@ public class TransactionManager { @@ -504,13 +505,10 @@ public class TransactionManager {
private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
if (hasError()) {
if (requestHandler instanceof EndTxnHandler) {
// we allow abort requests to break out of the error state. The state and the last error
// will be cleared when the request returns
EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
if (endTxnHandler.builder.result() == TransactionResult.ABORT)
if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler)
// No harm letting the FindCoordinator request go through if we're expecting to abort
return false;
}
requestHandler.fail(lastError);
return true;
}

124
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap; @@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -1058,6 +1059,71 @@ public class TransactionManagerTest { @@ -1058,6 +1059,71 @@ public class TransactionManagerTest {
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
@Test
public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
sender.run(time.milliseconds()); // Send AddPartitionsRequest
sender.run(time.milliseconds()); // Send Produce Request
TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.run(time.milliseconds()); // receive the produce response
// we do not transition to ABORTABLE_ERROR since we were already aborting
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
sender.run(time.milliseconds()); // handle the abort
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
@Test
public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
sender.run(time.milliseconds()); // Send AddPartitionsRequest
transactionManager.transitionToAbortableError(new KafkaException());
sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
sender.run(time.milliseconds()); // AddPartitions returns
assertTrue(transactionManager.hasAbortableError());
assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.run(time.milliseconds()); // FindCoordinator handled
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertTrue(transactionManager.hasAbortableError());
}
@Test
public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
final long pid = 13131L;
@ -1279,15 +1345,42 @@ public class TransactionManagerTest { @@ -1279,15 +1345,42 @@ public class TransactionManagerTest {
TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());
sender.run(time.milliseconds());
assertTrue(transactionManager.isReady());
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
}
@Test
public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
final long pid = 13131L;
final short epoch = 1;
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());
sender.run(time.milliseconds());
assertTrue(abortResult.isCompleted());
assertFalse(abortResult.isSuccessful());
assertTrue(transactionManager.hasFatalError());
}
@Test
public void testNoDrainWhenPartitionsPending() throws InterruptedException {
final long pid = 13131L;
@ -1623,8 +1716,15 @@ public class TransactionManagerTest { @@ -1623,8 +1716,15 @@ public class TransactionManagerTest {
}, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
}
private void sendProduceResponse(Errors error, final long pid, final short epoch) {
client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
}
private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
client.prepareResponse(new MockClient.RequestMatcher() {
client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
}
private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
ProduceRequest produceRequest = (ProduceRequest) body;
@ -1640,12 +1740,24 @@ public class TransactionManagerTest { @@ -1640,12 +1740,24 @@ public class TransactionManagerTest {
assertEquals(transactionalId, produceRequest.transactionalId());
return true;
}
}, produceResponse(tp0, 0, error, 0));
};
}
private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
final short epoch, final long pid) {
client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid),
new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}
private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, final short epoch, final long pid) {
client.prepareResponse(new MockClient.RequestMatcher() {
private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
final short epoch, final long pid) {
client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}
private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
final short epoch, final long pid) {
return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
@ -1655,7 +1767,7 @@ public class TransactionManagerTest { @@ -1655,7 +1767,7 @@ public class TransactionManagerTest {
assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
return true;
}
}, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
};
}
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {

Loading…
Cancel
Save