Browse Source

KAFKA-7736; Consolidate Map usages in TransactionManager (#6270)

Refactors the various maps used in TransactionManager into one map to simplify bookkeeping of inflight batches, offsets and sequence numbers.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6290/head
Viktor Somogyi 6 years ago committed by Jason Gustafson
parent
commit
1db4667366
  1. 2
      clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  2. 222
      clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
  3. 146
      clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  4. 6
      clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@ -668,7 +668,7 @@ public class Sender implements Runnable { @@ -668,7 +668,7 @@ public class Sender implements Runnable {
log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
batch.producerId(),
batch.topicPartition,
transactionManager.lastAckedSequence(batch.topicPartition));
transactionManager.lastAckedSequence(batch.topicPartition).orElse(-1));
}
transactionManager.updateLastAckedOffset(response, batch);
transactionManager.removeInFlightBatch(batch);

222
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

@ -54,6 +54,8 @@ import java.util.HashMap; @@ -54,6 +54,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Supplier;
@ -66,17 +68,84 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID; @@ -66,17 +68,84 @@ import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
*/
public class TransactionManager {
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
private final Logger log;
private final String transactionalId;
private final int transactionTimeoutMs;
// The base sequence of the next batch bound for a given partition.
private final Map<TopicPartition, Integer> nextSequence;
private static class TopicPartitionBookkeeper {
// The sequence of the last record of the last ack'd batch from the given partition. When there are no
// in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
private final Map<TopicPartition, Integer> lastAckedSequence;
private final Map<TopicPartition, TopicPartitionEntry> topicPartitionBookkeeping = new HashMap<>();
public TopicPartitionEntry getPartition(TopicPartition topic) {
TopicPartitionEntry ent = topicPartitionBookkeeping.get(topic);
if (ent == null)
throw new IllegalStateException("Trying to get the sequence number for " + topic +
", but the sequence number was never set for this partition.");
return ent;
}
public void addPartition(TopicPartition topic) {
if (!topicPartitionBookkeeping.containsKey(topic))
topicPartitionBookkeeping.put(topic, new TopicPartitionEntry());
}
boolean contains(TopicPartition partition) {
return topicPartitionBookkeeping.containsKey(partition);
}
public void reset() {
topicPartitionBookkeeping.clear();
}
OptionalLong lastAckedOffset(TopicPartition partition) {
TopicPartitionEntry entry = topicPartitionBookkeeping.get(partition);
if (entry != null && entry.lastAckedOffset != ProduceResponse.INVALID_OFFSET)
return OptionalLong.of(entry.lastAckedOffset);
else
return OptionalLong.empty();
}
OptionalInt lastAckedSequence(TopicPartition partition) {
TopicPartitionEntry entry = topicPartitionBookkeeping.get(partition);
if (entry != null && entry.lastAckedSequence != NO_LAST_ACKED_SEQUENCE_NUMBER)
return OptionalInt.of(entry.lastAckedSequence);
else
return OptionalInt.empty();
}
}
private static class TopicPartitionEntry {
// The base sequence of the next batch bound for a given partition.
private int nextSequence;
// The sequence number of the last record of the last ack'd batch from the given partition. When there are no
// in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
private int lastAckedSequence;
// Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
// we continue to order batches by the sequence numbers even when the responses come back out of order during
// leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
// (either successfully or through a fatal failure).
private PriorityQueue<ProducerBatch> inflightBatchesBySequence;
// We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
// responses which are due to the retention period elapsing, and those which are due to actual lost data.
private long lastAckedOffset;
TopicPartitionEntry() {
this.nextSequence = 0;
this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
this.inflightBatchesBySequence = new PriorityQueue<>(5, Comparator.comparingInt(ProducerBatch::baseSequence));
}
}
private final TopicPartitionBookkeeper topicPartitionBookkeeper;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
// If a batch bound for a partition expired locally after being sent at least once, the partition has is considered
// to have an unresolved state. We keep track fo such partitions here, and cannot assign any more sequence numbers
@ -86,21 +155,10 @@ public class TransactionManager { @@ -86,21 +155,10 @@ public class TransactionManager {
// consequently clear this data structure as well.
private final Set<TopicPartition> partitionsWithUnresolvedSequences;
// Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
// we continue to order batches by the sequence numbers even when the responses come back out of order during
// leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
// (either successfully or through a fatal failure).
private final Map<TopicPartition, PriorityQueue<ProducerBatch>> inflightBatchesBySequence;
// We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
// responses which are due to the retention period elapsing, and those which are due to actual lost data.
private final Map<TopicPartition, Long> lastAckedOffset;
private final PriorityQueue<TxnRequestHandler> pendingRequests;
private final Set<TopicPartition> newPartitionsInTransaction;
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
private TransactionalRequestResult pendingResult;
// This is used by the TxnRequestHandlers to control how long to back off before a given request is retried.
@ -173,8 +231,6 @@ public class TransactionManager { @@ -173,8 +231,6 @@ public class TransactionManager {
public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
this.nextSequence = new HashMap<>();
this.lastAckedSequence = new HashMap<>();
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
this.transactionTimeoutMs = transactionTimeoutMs;
@ -183,19 +239,11 @@ public class TransactionManager { @@ -183,19 +239,11 @@ public class TransactionManager {
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
this.pendingRequests = new PriorityQueue<>(10, Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() {
@Override
public int compare(TxnRequestHandler o1, TxnRequestHandler o2) {
return Integer.compare(o1.priority().priority, o2.priority().priority);
}
});
this.partitionsWithUnresolvedSequences = new HashSet<>();
this.inflightBatchesBySequence = new HashMap<>();
this.lastAckedOffset = new HashMap<>();
this.retryBackoffMs = retryBackoffMs;
this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
}
TransactionManager() {
@ -206,7 +254,6 @@ public class TransactionManager { @@ -206,7 +254,6 @@ public class TransactionManager {
return handleCachedTransactionRequestResult(() -> {
transitionTo(State.INITIALIZING);
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.nextSequence.clear();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(transactionalId, transactionTimeoutMs);
InitProducerIdHandler handler = new InitProducerIdHandler(builder);
enqueueRequest(handler);
@ -273,6 +320,7 @@ public class TransactionManager { @@ -273,6 +320,7 @@ public class TransactionManager {
return;
log.debug("Begin adding new partition {} to transaction", topicPartition);
topicPartitionBookkeeper.addPartition(topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
@ -280,7 +328,7 @@ public class TransactionManager { @@ -280,7 +328,7 @@ public class TransactionManager {
return lastError;
}
public synchronized void failIfNotReadyForSend() {
synchronized void failIfNotReadyForSend() {
if (hasError())
throw new KafkaException("Cannot perform send because at least one previous transactional or " +
"idempotent request has failed with errors.", lastError);
@ -401,46 +449,31 @@ public class TransactionManager { @@ -401,46 +449,31 @@ public class TransactionManager {
throw new IllegalStateException("Cannot reset producer state for a transactional producer. " +
"You must either abort the ongoing transaction or reinitialize the transactional producer instead");
setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
this.nextSequence.clear();
this.lastAckedSequence.clear();
this.inflightBatchesBySequence.clear();
topicPartitionBookkeeper.reset();
this.partitionsWithUnresolvedSequences.clear();
this.lastAckedOffset.clear();
}
/**
* Returns the next sequence number to be written to the given TopicPartition.
*/
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
Integer currentSequenceNumber = nextSequence.get(topicPartition);
if (currentSequenceNumber == null) {
currentSequenceNumber = 0;
nextSequence.put(topicPartition, currentSequenceNumber);
}
return currentSequenceNumber;
if (!isTransactional())
topicPartitionBookkeeper.addPartition(topicPartition);
return topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
}
synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
Integer currentSequenceNumber = nextSequence.get(topicPartition);
if (currentSequenceNumber == null)
throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
Integer currentSequence = sequenceNumber(topicPartition);
currentSequenceNumber = DefaultRecordBatch.incrementSequence(currentSequenceNumber, increment);
nextSequence.put(topicPartition, currentSequenceNumber);
currentSequence = DefaultRecordBatch.incrementSequence(currentSequence, increment);
topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = currentSequence;
}
synchronized void addInFlightBatch(ProducerBatch batch) {
if (!batch.hasSequence())
throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
if (!inflightBatchesBySequence.containsKey(batch.topicPartition)) {
inflightBatchesBySequence.put(batch.topicPartition, new PriorityQueue<>(5, new Comparator<ProducerBatch>() {
@Override
public int compare(ProducerBatch o1, ProducerBatch o2) {
return o1.baseSequence() - o2.baseSequence();
}
}));
}
inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.offer(batch);
}
/**
@ -451,56 +484,54 @@ public class TransactionManager { @@ -451,56 +484,54 @@ public class TransactionManager {
* RecordBatch.NO_SEQUENCE.
*/
synchronized int firstInFlightSequence(TopicPartition topicPartition) {
PriorityQueue<ProducerBatch> inFlightBatches = inflightBatchesBySequence.get(topicPartition);
if (inFlightBatches == null)
if (!hasInflightBatches(topicPartition))
return RecordBatch.NO_SEQUENCE;
ProducerBatch firstInFlightBatch = inFlightBatches.peek();
if (firstInFlightBatch == null)
ProducerBatch first = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.peek();
if (first == null)
return RecordBatch.NO_SEQUENCE;
return firstInFlightBatch.baseSequence();
return first.baseSequence();
}
synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(topicPartition);
if (queue == null)
return null;
PriorityQueue<ProducerBatch> queue = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
return queue.peek();
}
synchronized void removeInFlightBatch(ProducerBatch batch) {
PriorityQueue<ProducerBatch> queue = inflightBatchesBySequence.get(batch.topicPartition);
if (queue == null)
return;
queue.remove(batch);
if (hasInflightBatches(batch.topicPartition)) {
PriorityQueue<ProducerBatch> queue = topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence;
queue.remove(batch);
}
}
synchronized void maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
if (sequence > lastAckedSequence(topicPartition))
lastAckedSequence.put(topicPartition, sequence);
if (sequence > lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER))
topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = sequence;
}
synchronized int lastAckedSequence(TopicPartition topicPartition) {
Integer currentLastAckedSequence = lastAckedSequence.get(topicPartition);
if (currentLastAckedSequence == null)
return -1;
return currentLastAckedSequence;
synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
return topicPartitionBookkeeper.lastAckedSequence(topicPartition);
}
synchronized long lastAckedOffset(TopicPartition topicPartition) {
Long offset = lastAckedOffset.get(topicPartition);
if (offset == null)
return ProduceResponse.INVALID_OFFSET;
return offset;
synchronized OptionalLong lastAckedOffset(TopicPartition topicPartition) {
return topicPartitionBookkeeper.lastAckedOffset(topicPartition);
}
synchronized void updateLastAckedOffset(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
return;
long lastOffset = response.baseOffset + batch.recordCount - 1;
if (lastOffset > lastAckedOffset(batch.topicPartition)) {
lastAckedOffset.put(batch.topicPartition, lastOffset);
OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition);
// It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid
// response for this. This can happen only if the producer is only idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we have to insert one.
if (!lastAckedOffset.isPresent() && !isTransactional()) {
topicPartitionBookkeeper.addPartition(batch.topicPartition);
}
if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
topicPartitionBookkeeper.getPartition(batch.topicPartition).lastAckedOffset = lastOffset;
} else {
log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset);
}
@ -512,7 +543,7 @@ public class TransactionManager { @@ -512,7 +543,7 @@ public class TransactionManager {
// This method must only be called when we know that the batch is question has been unequivocally failed by the broker,
// ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
synchronized void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!this.nextSequence.containsKey(batch.topicPartition))
if (!topicPartitionBookkeeper.contains(batch.topicPartition))
// Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
// reset due to a previous OutOfOrderSequenceException.
return;
@ -525,7 +556,7 @@ public class TransactionManager { @@ -525,7 +556,7 @@ public class TransactionManager {
setNextSequence(batch.topicPartition, currentSequence);
for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(batch.topicPartition)) {
for (ProducerBatch inFlightBatch : topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence) {
if (inFlightBatch.baseSequence() < batch.baseSequence())
continue;
int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
@ -540,7 +571,7 @@ public class TransactionManager { @@ -540,7 +571,7 @@ public class TransactionManager {
private synchronized void startSequencesAtBeginning(TopicPartition topicPartition) {
int sequence = 0;
for (ProducerBatch inFlightBatch : inflightBatchesBySequence.get(topicPartition)) {
for (ProducerBatch inFlightBatch : topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence) {
log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
inFlightBatch.baseSequence(), inFlightBatch.topicPartition, sequence);
inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(),
@ -549,11 +580,12 @@ public class TransactionManager { @@ -549,11 +580,12 @@ public class TransactionManager {
sequence += inFlightBatch.recordCount;
}
setNextSequence(topicPartition, sequence);
lastAckedSequence.remove(topicPartition);
topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
}
synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
return inflightBatchesBySequence.containsKey(topicPartition) && !inflightBatchesBySequence.get(topicPartition).isEmpty();
private synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
return topicPartitionBookkeeper.contains(topicPartition)
&& !topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.isEmpty();
}
synchronized boolean hasUnresolvedSequences() {
@ -587,7 +619,8 @@ public class TransactionManager { @@ -587,7 +619,8 @@ public class TransactionManager {
} else {
// We would enter this branch if all in flight batches were ultimately expired in the producer.
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
"Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));
"Going to reset producer state.", topicPartition,
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
return true;
}
}
@ -595,15 +628,12 @@ public class TransactionManager { @@ -595,15 +628,12 @@ public class TransactionManager {
return false;
}
synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
return sequence - lastAckedSequence(topicPartition) == 1;
private synchronized boolean isNextSequence(TopicPartition topicPartition, int sequence) {
return sequence - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
}
private synchronized void setNextSequence(TopicPartition topicPartition, int sequence) {
if (!nextSequence.containsKey(topicPartition) && sequence != 0)
throw new IllegalStateException("Trying to set the sequence number for " + topicPartition + " to " + sequence +
", but the sequence number was never set for this partition.");
nextSequence.put(topicPartition, sequence);
topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = sequence;
}
synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
@ -670,7 +700,7 @@ public class TransactionManager { @@ -670,7 +700,7 @@ public class TransactionManager {
inFlightRequestCorrelationId = correlationId;
}
void clearInFlightTransactionalRequestCorrelationId() {
private void clearInFlightCorrelationId() {
inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
}
@ -737,7 +767,7 @@ public class TransactionManager { @@ -737,7 +767,7 @@ public class TransactionManager {
// come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
// reset the sequence numbers to the beginning.
return true;
} else if (lastAckedOffset(batch.topicPartition) < response.logStartOffset) {
} else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
// The head of the log has been removed, probably due to the retention time elapsing. In this case,
// we expect to lose the producer state. Reset the sequences of all inflight batches to be from the beginning
// and retry them.
@ -915,7 +945,7 @@ public class TransactionManager { @@ -915,7 +945,7 @@ public class TransactionManager {
if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
} else {
clearInFlightTransactionalRequestCorrelationId();
clearInFlightCorrelationId();
if (response.wasDisconnected()) {
log.debug("Disconnected from {}. Will retry.", response.destination());
if (this.needsCoordinator())

146
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

@ -26,6 +26,8 @@ import java.util.Iterator; @@ -26,6 +26,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -589,14 +591,14 @@ public class SenderTest { @@ -589,14 +591,14 @@ public class SenderTest {
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
assertTrue(client.isReady(node, time.milliseconds()));
@ -606,14 +608,14 @@ public class SenderTest { @@ -606,14 +608,14 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 0
assertEquals(1, client.inFlightRequestCount());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
assertFalse(request2.isDone());
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertFalse(client.hasInFlightRequests());
assertEquals(0, sender.inFlightBatches(tp0).size());
assertTrue(request2.isDone());
@ -639,7 +641,7 @@ public class SenderTest { @@ -639,7 +641,7 @@ public class SenderTest {
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@ -651,7 +653,7 @@ public class SenderTest { @@ -651,7 +653,7 @@ public class SenderTest {
assertEquals(3, client.inFlightRequestCount());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
assertFalse(request3.isDone());
@ -664,7 +666,7 @@ public class SenderTest { @@ -664,7 +666,7 @@ public class SenderTest {
Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
assertEquals(2, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
sender.run(time.milliseconds()); // re send request 1, receive response 2
@ -672,7 +674,7 @@ public class SenderTest { @@ -672,7 +674,7 @@ public class SenderTest {
sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
sender.run(time.milliseconds()); // receive response 3
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(1, client.inFlightRequestCount());
sender.run(time.milliseconds()); // Do nothing, we are reduced to one in flight request during retries.
@ -680,11 +682,11 @@ public class SenderTest { @@ -680,11 +682,11 @@ public class SenderTest {
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
assertFalse(client.hasInFlightRequests());
@ -696,7 +698,7 @@ public class SenderTest { @@ -696,7 +698,7 @@ public class SenderTest {
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
sender.run(time.milliseconds()); // receive response 2
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertTrue(request2.isDone());
assertEquals(1, request2.get().offset());
@ -709,7 +711,7 @@ public class SenderTest { @@ -709,7 +711,7 @@ public class SenderTest {
sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
sender.run(time.milliseconds()); // receive response 3, send request 4 since we are out of 'retry' mode.
assertEquals(2, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(2), transactionManager.lastAckedSequence(tp0));
assertTrue(request3.isDone());
assertEquals(2, request3.get().offset());
assertEquals(1, client.inFlightRequestCount());
@ -717,7 +719,7 @@ public class SenderTest { @@ -717,7 +719,7 @@ public class SenderTest {
sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
sender.run(time.milliseconds()); // receive response 4
assertEquals(3, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(3), transactionManager.lastAckedSequence(tp0));
assertTrue(request4.isDone());
assertEquals(3, request4.get().offset());
}
@ -739,14 +741,14 @@ public class SenderTest { @@ -739,14 +741,14 @@ public class SenderTest {
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
assertTrue(client.isReady(node, time.milliseconds()));
@ -757,24 +759,24 @@ public class SenderTest { @@ -757,24 +759,24 @@ public class SenderTest {
assertFutureFailure(request1, RecordTooLargeException.class);
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
sender.run(time.milliseconds()); // resend request 1
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
assertTrue(request1.isDone());
@ -801,7 +803,7 @@ public class SenderTest { @@ -801,7 +803,7 @@ public class SenderTest {
// make sure the next sequence number accounts for multi-message batches.
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
sender.run(time.milliseconds());
@ -811,7 +813,7 @@ public class SenderTest { @@ -811,7 +813,7 @@ public class SenderTest {
sender.run(time.milliseconds());
assertEquals(1, client.inFlightRequestCount());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
assertFalse(request2.isDone());
@ -840,14 +842,14 @@ public class SenderTest { @@ -840,14 +842,14 @@ public class SenderTest {
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
assertTrue(client.isReady(node, time.milliseconds()));
@ -864,7 +866,7 @@ public class SenderTest { @@ -864,7 +866,7 @@ public class SenderTest {
assertEquals(1, queuedBatches.size());
assertEquals(1, queuedBatches.peekFirst().baseSequence());
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
@ -874,7 +876,7 @@ public class SenderTest { @@ -874,7 +876,7 @@ public class SenderTest {
assertEquals(2, queuedBatches.size());
assertEquals(0, queuedBatches.peekFirst().baseSequence());
assertEquals(1, queuedBatches.peekLast().baseSequence());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -884,12 +886,12 @@ public class SenderTest { @@ -884,12 +886,12 @@ public class SenderTest {
sender.run(time.milliseconds()); // don't do anything, only one inflight allowed once we are retrying.
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Make sure that the requests are sent in order, even though the previous responses were not in order.
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
sender.run(time.milliseconds()); // receive response 0
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
@ -900,7 +902,7 @@ public class SenderTest { @@ -900,7 +902,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 1
assertFalse(client.hasInFlightRequests());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertTrue(request2.isDone());
assertEquals(1, request2.get().offset());
}
@ -943,7 +945,7 @@ public class SenderTest { @@ -943,7 +945,7 @@ public class SenderTest {
assertEquals(0, queuedBatches.size());
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.REQUEST_TIMED_OUT, -1));
@ -952,20 +954,20 @@ public class SenderTest { @@ -952,20 +954,20 @@ public class SenderTest {
// Make sure we requeued both batches in the correct order.
assertEquals(1, queuedBatches.size());
assertEquals(0, queuedBatches.peekFirst().baseSequence());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
sender.run(time.milliseconds()); // resend request 0
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
// Make sure we handle the out of order successful responses correctly.
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
sender.run(time.milliseconds()); // receive response 0
assertEquals(0, queuedBatches.size());
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(0, client.inFlightRequestCount());
assertFalse(client.hasInFlightRequests());
@ -1169,8 +1171,7 @@ public class SenderTest { @@ -1169,8 +1171,7 @@ public class SenderTest {
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect.
sender.run(time.milliseconds()); // send.
sender.run(time.milliseconds()); // connect and send.
assertEquals(1, client.inFlightRequestCount());
@ -1181,9 +1182,8 @@ public class SenderTest { @@ -1181,9 +1182,8 @@ public class SenderTest {
sender.run(time.milliseconds());
assertTrue(failedResponse.isDone());
assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE);
prepareAndReceiveInitProducerId(producerId + 1, Errors.NONE); // also send request to tp1
assertEquals(producerId + 1, transactionManager.producerIdAndEpoch().producerId);
sender.run(time.milliseconds()); // send request to tp1
assertFalse(successfulResponse.isDone());
client.respond(produceResponse(tp1, 10, Errors.NONE, -1));
@ -1263,14 +1263,14 @@ public class SenderTest { @@ -1263,14 +1263,14 @@ public class SenderTest {
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
assertTrue(client.isReady(node, time.milliseconds()));
@ -1282,16 +1282,16 @@ public class SenderTest { @@ -1282,16 +1282,16 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 1
assertEquals(1000, transactionManager.lastAckedOffset(tp0));
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000), transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
client.respondToRequest(firstClientRequest, produceResponse(tp0, ProduceResponse.INVALID_OFFSET, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
sender.run(time.milliseconds()); // receive response 0
// Make sure that the last ack'd sequence doesn't change.
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(1000, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000), transactionManager.lastAckedOffset(tp0));
assertFalse(client.hasInFlightRequests());
RecordMetadata unknownMetadata = request1.get();
@ -1315,7 +1315,7 @@ public class SenderTest { @@ -1315,7 +1315,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1323,15 +1323,15 @@ public class SenderTest { @@ -1323,15 +1323,15 @@ public class SenderTest {
assertTrue(request1.isDone());
assertEquals(1000L, request1.get().offset());
assertEquals(0L, transactionManager.lastAckedSequence(tp0));
assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest, a single batch with 2 records.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1339,7 +1339,7 @@ public class SenderTest { @@ -1339,7 +1339,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 0, should be retried since the logStartOffset > lastAckedOffset.
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertFalse(request2.isDone());
assertFalse(client.hasInFlightRequests());
@ -1349,12 +1349,12 @@ public class SenderTest { @@ -1349,12 +1349,12 @@ public class SenderTest {
// resend the request. Note that the expected sequence is 0, since we have lost producer state on the broker.
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertFalse(client.hasInFlightRequests());
assertTrue(request2.isDone());
assertEquals(1012L, request2.get().offset());
assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalLong.of(1012L), transactionManager.lastAckedOffset(tp0));
}
@Test
@ -1373,7 +1373,7 @@ public class SenderTest { @@ -1373,7 +1373,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1381,14 +1381,14 @@ public class SenderTest { @@ -1381,14 +1381,14 @@ public class SenderTest {
assertTrue(request1.isDone());
assertEquals(1000L, request1.get().offset());
assertEquals(0L, transactionManager.lastAckedSequence(tp0));
assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1396,7 +1396,7 @@ public class SenderTest { @@ -1396,7 +1396,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 0, should be retried without resetting the sequence numbers since the log start offset is unknown.
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertFalse(request2.isDone());
assertFalse(client.hasInFlightRequests());
@ -1407,12 +1407,12 @@ public class SenderTest { @@ -1407,12 +1407,12 @@ public class SenderTest {
// response and hence we didn't reset the sequence numbers.
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
sender.run(time.milliseconds()); // receive response 1
assertEquals(1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertFalse(client.hasInFlightRequests());
assertTrue(request2.isDone());
assertEquals(1011L, request2.get().offset());
assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalLong.of(1011L), transactionManager.lastAckedOffset(tp0));
}
@Test
@ -1431,7 +1431,7 @@ public class SenderTest { @@ -1431,7 +1431,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1439,21 +1439,21 @@ public class SenderTest { @@ -1439,21 +1439,21 @@ public class SenderTest {
assertTrue(request1.isDone());
assertEquals(1000L, request1.get().offset());
assertEquals(0L, transactionManager.lastAckedSequence(tp0));
assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
// Send the third ProduceRequest, in parallel with the second. It should be retried even though the
// lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
assertFalse(request3.isDone());
@ -1464,7 +1464,7 @@ public class SenderTest { @@ -1464,7 +1464,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 2, should reset the sequence numbers and be retried.
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertFalse(request2.isDone());
assertFalse(request3.isDone());
@ -1479,7 +1479,7 @@ public class SenderTest { @@ -1479,7 +1479,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response 3
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
@ -1488,9 +1488,9 @@ public class SenderTest { @@ -1488,9 +1488,9 @@ public class SenderTest {
assertTrue(request2.isDone());
assertFalse(request3.isDone());
assertFalse(client.hasInFlightRequests());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(1011L, request2.get().offset());
assertEquals(1011L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalLong.of(1011L), transactionManager.lastAckedOffset(tp0));
sender.run(time.milliseconds()); // resend request 3.
assertEquals(1, client.inFlightRequestCount());
@ -1501,7 +1501,7 @@ public class SenderTest { @@ -1501,7 +1501,7 @@ public class SenderTest {
assertFalse(client.hasInFlightRequests());
assertTrue(request3.isDone());
assertEquals(1012L, request3.get().offset());
assertEquals(1012L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalLong.of(1012L), transactionManager.lastAckedOffset(tp0));
}
@Test
@ -1520,7 +1520,7 @@ public class SenderTest { @@ -1520,7 +1520,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1528,14 +1528,14 @@ public class SenderTest { @@ -1528,14 +1528,14 @@ public class SenderTest {
assertTrue(request1.isDone());
assertEquals(1000L, request1.get().offset());
assertEquals(0L, transactionManager.lastAckedSequence(tp0));
assertEquals(1000L, transactionManager.lastAckedOffset(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest,
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1724,7 +1724,7 @@ public class SenderTest { @@ -1724,7 +1724,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response
assertTrue(responseFuture.isDone());
assertEquals(0L, (long) transactionManager.lastAckedSequence(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(1L, (long) transactionManager.sequenceNumber(tp0));
}
@ -1883,7 +1883,7 @@ public class SenderTest { @@ -1883,7 +1883,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f1.isDone());
assertEquals("The next sequence number should still be 2", 2, txnManager.sequenceNumber(tp).longValue());
assertEquals("The last ack'd sequence number should be 0", 0, txnManager.lastAckedSequence(tp));
assertEquals("The last ack'd sequence number should be 0", OptionalInt.of(0), txnManager.lastAckedSequence(tp));
assertFalse("The future shouldn't have been done.", f2.isDone());
assertEquals("Offset of the first message should be 0", 0L, f1.get().offset());
sender.run(time.milliseconds()); // send the seconcd produce request
@ -1900,7 +1900,7 @@ public class SenderTest { @@ -1900,7 +1900,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive
assertTrue("The future should have been done.", f2.isDone());
assertEquals("The next sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue());
assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp));
assertEquals("The last ack'd sequence number should be 1", OptionalInt.of(1), txnManager.lastAckedSequence(tp));
assertEquals("Offset of the first message should be 1", 1L, f2.get().offset());
assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty());
assertTrue("There should be a split", (Double) (m.metrics().get(senderMetrics.batchSplitRate).metricValue()) > 0);

6
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@ -563,12 +563,6 @@ public class TransactionManagerTest { @@ -563,12 +563,6 @@ public class TransactionManagerTest {
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
}
@Test(expected = IllegalStateException.class)
public void testInvalidSequenceIncrement() {
TransactionManager transactionManager = new TransactionManager();
transactionManager.incrementSequenceNumber(tp0, 3333);
}
@Test
public void testDefaultSequenceNumber() {
TransactionManager transactionManager = new TransactionManager();

Loading…
Cancel
Save