
MINOR: Proactively update producer topic access time. (#7672)

Changes the ProducerMetadata to no longer record a sentinel TOPIC_EXPIRY_NEEDS_UPDATE when a topic is added to the topic map, and instead to set the expiry time directly. Previously the expiry time was only updated for all touched topics after a metadata fetch was processed, which could be seconds or minutes in the future.
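
For illustration, here is a minimal standalone sketch of the behavioral difference (not the real ProducerMetadata class; the map, constant, and method names mirror the diff below, everything else is simplified and assumed):

import java.util.HashMap;
import java.util.Map;

// Sketch of the producer's topic-expiry bookkeeping before and after this change.
class TopicExpirySketch {
    static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; // old sentinel, removed by this commit

    private final Map<String, Long> topics = new HashMap<>();

    // Old behavior: add() stored a sentinel; the real deadline was only written later,
    // when the topic was retained while processing a metadata response.
    void addOld(String topic) {
        topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
    }

    void retainOld(String topic, long nowMs) {
        Long expireMs = topics.get(topic);
        if (expireMs != null && expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
            topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
    }

    // New behavior: the access time is recorded proactively, at the moment the topic is added.
    void addNew(String topic, long nowMs) {
        topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
    }
}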

Additionally propagates the current time further through the Producer, which should reduce the total number of current-time calls.
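
The time-propagation part follows a common pattern: read the clock once near the top of the send path and hand that value down, instead of re-reading it in every helper. A hedged sketch of the idea (the class and method names here are illustrative only; the real signatures are in the diff below):

// Illustrative only: one clock read per send, threaded through the helpers as nowMs.
final class SendPathSketch {
    interface Clock { long milliseconds(); }

    private final Clock time;

    SendPathSketch(Clock time) { this.time = time; }

    void doSend(String topic) {
        long nowMs = time.milliseconds();   // single clock read for this send
        waitOnMetadata(topic, nowMs);       // helpers receive nowMs...
        append(topic, nowMs);               // ...rather than calling time.milliseconds() again
    }

    private void waitOnMetadata(String topic, long nowMs) { /* use nowMs for expiry bookkeeping */ }

    private void append(String topic, long nowMs) { /* use nowMs as the batch creation time */ }
}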

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>
pull/7769/head
Brian Byrne authored 5 years ago, committed by Rajini Sivaram
commit 38fde81132
  1. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (28 changed lines)
  2. clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java (8 changed lines)
  3. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (25 changed lines)
  4. clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (2 changed lines)
  5. clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (10 changed lines)
  6. clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java (16 changed lines)
  7. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (82 changed lines)
  8. clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (203 changed lines)
  9. clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (117 changed lines)

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (28 changed lines)

@@ -880,14 +880,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
@@ -915,7 +917,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
@@ -926,8 +928,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
transactionManager.failIfNotReadyForSend();
}
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true);
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
@@ -940,9 +942,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false);
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
@@ -991,18 +993,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param nowMs The current time in ms
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
metadata.add(topic);
metadata.add(topic, nowMs);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
@@ -1010,9 +1013,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
long elapsed = 0;
// Issue metadata requests until we have metadata for the topic and the requested partition,
// or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
@@ -1022,7 +1024,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);
metadata.add(topic, nowMs + elapsed);
int version = metadata.requestUpdate();
sender.wakeup();
try {
@@ -1034,7 +1036,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
@@ -1124,7 +1126,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public List<PartitionInfo> partitionsFor(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
try {
return waitOnMetadata(topic, null, maxBlockTimeMs).cluster.partitionsForTopic(topic);
return waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs).cluster.partitionsForTopic(topic);
} catch (InterruptedException e) {
throw new InterruptException(e);
}

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java (8 changed lines)

@@ -32,7 +32,6 @@ import java.util.Objects;
import java.util.Set;
public class ProducerMetadata extends Metadata {
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
/* Topics with expiry time */
@@ -55,9 +54,9 @@ public class ProducerMetadata extends Metadata {
return new MetadataRequest.Builder(new ArrayList<>(topics.keySet()), true);
}
public synchronized void add(String topic) {
public synchronized void add(String topic, long nowMs) {
Objects.requireNonNull(topic, "topic cannot be null");
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
if (topics.put(topic, nowMs + TOPIC_EXPIRY_MS) == null) {
requestUpdateForNewTopics();
}
}
@@ -76,9 +75,6 @@ public class ProducerMetadata extends Metadata {
Long expireMs = topics.get(topic);
if (expireMs == null) {
return false;
} else if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE) {
topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
return true;
} else if (expireMs <= nowMs) {
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", topic, expireMs, nowMs);
topics.remove(topic);

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (25 changed lines)

@@ -180,8 +180,9 @@ public final class RecordAccumulator {
* @param headers the Headers for the record
* @param callback The user-supplied callback to execute when the request is complete
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds
*/
public RecordAppendResult append(TopicPartition tp,
long timestamp,
@@ -190,7 +191,8 @@ public final class RecordAccumulator {
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch) throws InterruptedException {
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
@@ -202,7 +204,7 @@ public final class RecordAccumulator {
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
@@ -212,26 +214,29 @@ public final class RecordAccumulator {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, time.milliseconds()));
callback, nowMs));
dq.addLast(batch);
incomplete.add(batch);
@@ -264,10 +269,10 @@ public final class RecordAccumulator {
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
last.closeForRecordAppends();
else
@@ -287,7 +292,7 @@ public final class RecordAccumulator {
muted.remove(tp);
return false;
}
return true;
}

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (2 changed lines)

@@ -347,7 +347,7 @@ public class Sender implements Runnable {
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java (10 changed lines)

@@ -530,12 +530,13 @@ public class KafkaProducerTest {
@SuppressWarnings("unchecked")
Serializer<String> valueSerializer = mock(serializerClassToMock);
long nowMs = Time.SYSTEM.milliseconds();
String topic = "topic";
ProducerMetadata metadata = newMetadata(0, 90000);
metadata.add(topic);
metadata.add(topic, nowMs);
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
metadata.update(initialUpdateResponse, nowMs);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
null, null, Time.SYSTEM);
@@ -596,10 +597,11 @@ public class KafkaProducerTest {
String topic = "topic";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
long nowMs = Time.SYSTEM.milliseconds();
ProducerMetadata metadata = newMetadata(0, 90000);
metadata.add(topic);
metadata.add(topic, nowMs);
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
metadata.update(initialUpdateResponse, nowMs);
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);

clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java (16 changed lines)

@@ -56,10 +56,10 @@ public class ProducerMetadataTest {
@Test
public void testMetadata() throws Exception {
long time = Time.SYSTEM.milliseconds();
String topic = "my-topic";
metadata.add(topic);
metadata.add(topic, time);
long time = Time.SYSTEM.milliseconds();
metadata.update(responseWithTopics(Collections.emptySet()), time);
assertTrue("No update needed.", metadata.timeToNextUpdate(time) > 0);
metadata.requestUpdate();
@@ -139,17 +139,17 @@ public class ProducerMetadataTest {
// New topic added to fetch set and update requested. It should allow immediate update.
metadata.update(responseWithCurrentTopics(), now);
metadata.add("new-topic");
metadata.add("new-topic", now);
assertEquals(0, metadata.timeToNextUpdate(now));
// Even though add is called, immediate update isn't necessary if the new topic set isn't
// containing a new topic,
metadata.update(responseWithCurrentTopics(), now);
metadata.add("new-topic");
metadata.add("new-topic", now);
assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
// If the new set of topics containing a new topic then it should allow immediate update.
metadata.add("another-new-topic");
metadata.add("another-new-topic", now);
assertEquals(0, metadata.timeToNextUpdate(now));
}
@@ -158,7 +158,7 @@ public class ProducerMetadataTest {
// Test that topic is expired if not used within the expiry interval
long time = 0;
String topic1 = "topic1";
metadata.add(topic1);
metadata.add(topic1, time);
metadata.update(responseWithCurrentTopics(), time);
assertTrue(metadata.containsTopic(topic1));
@@ -167,13 +167,13 @@ public class ProducerMetadataTest {
assertFalse("Unused topic not expired", metadata.containsTopic(topic1));
// Test that topic is not expired if used within the expiry interval
metadata.add("topic2");
metadata.add("topic2", time);
metadata.update(responseWithCurrentTopics(), time);
for (int i = 0; i < 3; i++) {
time += ProducerMetadata.TOPIC_EXPIRY_MS / 2;
metadata.update(responseWithCurrentTopics(), time);
assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
metadata.add("topic2");
metadata.add("topic2", time);
}
}

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (82 changed lines)

@@ -107,7 +107,7 @@ public class RecordAccumulatorTest {
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
// append to the first batch
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(1, partitionBatches.size());
@@ -118,7 +118,7 @@ public class RecordAccumulatorTest {
// this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
@@ -153,7 +153,7 @@ public class RecordAccumulatorTest {
byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -191,7 +191,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -215,7 +215,7 @@ public class RecordAccumulatorTest {
int lingerMs = 10;
RecordAccumulator accum = createTestRecordAccumulator(
1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -238,7 +238,7 @@ public class RecordAccumulatorTest {
List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
for (int i = 0; i < appends; i++)
accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
}
assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -260,7 +260,7 @@ public class RecordAccumulatorTest {
public void run() {
for (int i = 0; i < msgs; i++) {
try {
accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
} catch (Exception e) {
e.printStackTrace();
}
@@ -304,7 +304,7 @@ public class RecordAccumulatorTest {
// Partition on node1 only
for (int i = 0; i < appends; i++)
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -313,14 +313,14 @@ public class RecordAccumulatorTest {
// Add partition on node2 only
for (int i = 0; i < appends; i++)
accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
// Add data for another partition on node1, enough to make data sendable immediately
for (int i = 0; i < appends + 1; i++)
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
result = accum.ready(cluster, time.milliseconds());
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
// Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -342,7 +342,7 @@ public class RecordAccumulatorTest {
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds();
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -354,7 +354,7 @@ public class RecordAccumulatorTest {
accum.reenqueue(batches.get(0).get(0), now);
// Put message for partition 1 into accumulator
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
@@ -380,7 +380,7 @@ public class RecordAccumulatorTest {
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
for (int i = 0; i < 100; i++) {
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertTrue(accum.hasIncomplete());
}
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -418,7 +418,7 @@ public class RecordAccumulatorTest {
public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = createTestRecordAccumulator(
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
accum.beginFlush();
assertTrue(accum.flushInProgress());
@@ -447,7 +447,7 @@ public class RecordAccumulatorTest {
}
}
for (int i = 0; i < numRecords; i++)
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false);
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertFalse(result.readyNodes.isEmpty());
Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
@@ -488,7 +488,7 @@ public class RecordAccumulatorTest {
}
}
for (int i = 0; i < numRecords; i++)
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false);
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertFalse(result.readyNodes.isEmpty());
Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
@@ -527,7 +527,7 @@ public class RecordAccumulatorTest {
for (Boolean mute: muteStates) {
if (time.milliseconds() < System.currentTimeMillis())
time.setCurrentTimeMs(System.currentTimeMillis());
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("No partition should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(lingerMs);
@@ -576,11 +576,11 @@ public class RecordAccumulatorTest {
// Test batches not in retry
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
// Make the batches ready due to batch full
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
// Advance the clock to expire the batch.
@@ -610,7 +610,7 @@ public class RecordAccumulatorTest {
// Test batches in retry.
// Create a retried batch
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
time.sleep(lingerMs);
readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -634,7 +634,7 @@ public class RecordAccumulatorTest {
assertEquals("All batches should have been expired.", 0, expiredBatches.size());
// Test that when being throttled muted batches are expired before the throttle time is over.
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
time.sleep(lingerMs);
readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -667,7 +667,7 @@ public class RecordAccumulatorTest {
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10);
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
time.sleep(2000);
@@ -708,7 +708,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
}
@Test
@@ -808,7 +808,7 @@ public class RecordAccumulatorTest {
int dice = random.nextInt(100);
byte[] value = (dice < goodCompRatioPercentage) ?
bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
numSplit += result.numSplit;
numBatches += result.numBatches;
@@ -831,7 +831,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue(drained.isEmpty());
@@ -846,7 +846,7 @@ public class RecordAccumulatorTest {
//assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
// Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
time.sleep(lingerMs * 4);
// Now drain and check that accumulator picked up the drained batch because its expiry is soon.
@@ -871,7 +871,7 @@ public class RecordAccumulatorTest {
// Test batches in retry.
for (Boolean mute : muteStates) {
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
time.sleep(lingerMs);
readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -892,7 +892,7 @@ public class RecordAccumulatorTest {
assertEquals("RecordAccumulator has expired batches if the partition is not muted", mute ? 1 : 0, expiredBatches.size());
}
}
@Test
public void testStickyBatches() throws Exception {
long now = time.milliseconds();
@@ -904,19 +904,19 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(3200,
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
int expectedAppends = expectedNumAppendsNoKey(batchSize);
// Create first batch
int partition = partitioner.partition(topic, null, null, "value", value, cluster);
TopicPartition tp = new TopicPartition(topic, partition);
accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
int appends = 1;
boolean switchPartition = false;
while (!switchPartition) {
// Append to the first batch
partition = partitioner.partition(topic, null, null, "value", value, cluster);
tp = new TopicPartition(topic, partition);
RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true);
RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
@@ -931,38 +931,38 @@ public class RecordAccumulatorTest {
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
}
// Batch should be full.
assertEquals(1, accum.ready(cluster, time.milliseconds()).readyNodes.size());
assertEquals(appends, expectedAppends);
switchPartition = false;
// KafkaProducer would call this method in this case, make second batch
partitioner.onNewBatch(topic, cluster, partition);
partition = partitioner.partition(topic, null, null, "value", value, cluster);
tp = new TopicPartition(topic, partition);
accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
appends++;
// These appends all go into the second batch
while (!switchPartition) {
partition = partitioner.partition(topic, null, null, "value", value, cluster);
tp = new TopicPartition(topic, partition);
RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true);
RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
// Only two batches because the new partition is also sticky.
assertEquals(2, numBatches);
switchPartition = result.abortForNewBatch;
// We only appended if we do not retry.
if (!switchPartition) {
appends++;
}
}
// There should be two full batches now.
assertEquals(appends, 2 * expectedAppends);
}
@@ -976,7 +976,7 @@ public class RecordAccumulatorTest {
CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
// Append 20 records of 100 bytes size with poor compression ratio should make the batch too big.
for (int i = 0; i < numRecords; i++) {
accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false);
accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
}
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -1061,7 +1061,7 @@ public class RecordAccumulatorTest {
size += recordSize;
}
}
/**
* Return the offset delta when there is no key.
*/

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (203 changed lines)

@@ -152,8 +152,7 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
long offset = 0;
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -179,8 +178,7 @@ public class SenderTest {
// start off support produce request v3
apiVersions.update("0", NodeApiVersions.create());
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
// now the partition leader supports only v2
apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
@@ -218,14 +216,12 @@ public class SenderTest {
// start off support produce request v3
apiVersions.update("0", NodeApiVersions.create());
Future<RecordMetadata> future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future1 = appendToAccumulator(tp0, 0L, "key", "value");
// now the partition leader supports only v2
apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future2 = appendToAccumulator(tp1, 0L, "key", "value");
// start off support produce request v3
apiVersions.update("0", NodeApiVersions.create());
@@ -320,7 +316,7 @@ public class SenderTest {
1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
// Append a message so that topic metrics are created
accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
@@ -347,7 +343,7 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions);
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
String id = client.requests().peek().destination();
@@ -376,7 +372,7 @@ public class SenderTest {
assertEquals(0, sender.inFlightBatches(tp0).size());
// do an unsuccessful retry
future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // send produce request
assertEquals(1, sender.inFlightBatches(tp0).size());
for (int i = 0; i < maxRetries + 1; i++) {
@@ -410,7 +406,7 @@ public class SenderTest {
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp2, 0L, "key1", "value1");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
String id = client.requests().peek().destination();
@@ -423,7 +419,7 @@ public class SenderTest {
time.sleep(900);
// Now send another message to tp2
accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp2, 0L, "key2", "value2");
// Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
@@ -453,7 +449,7 @@ public class SenderTest {
if (exception instanceof TimeoutException) {
expiryCallbackCount.incrementAndGet();
try {
accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
} catch (InterruptedException e) {
throw new RuntimeException("Unexpected interruption", e);
}
@@ -462,8 +458,9 @@ public class SenderTest {
}
};
final long nowMs = time.milliseconds();
for (int i = 0; i < messagesPerBatch; i++)
accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false);
accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false, nowMs);
// Advance the clock to expire the first batch.
time.sleep(10000);
@@ -496,7 +493,7 @@ public class SenderTest {
long offset = 0;
client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0);
sender.runOnce();
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
@@ -513,7 +510,7 @@ public class SenderTest {
time.sleep(ProducerMetadata.TOPIC_EXPIRY_MS);
client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
future = appendToAccumulator(tp0);
sender.runOnce();
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
@@ -555,7 +552,7 @@ public class SenderTest {
@Test
public void testCanRetryWithoutIdempotence() throws Exception {
// do a successful retry
Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
String id = client.requests().peek().destination();
@@ -594,7 +591,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -603,7 +600,7 @@ public class SenderTest {
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
@@ -644,7 +641,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -653,11 +650,11 @@ public class SenderTest {
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
// Send third ProduceRequest
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request3 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, client.inFlightRequestCount());
@@ -672,7 +669,7 @@ public class SenderTest {
sender.runOnce(); // receive response 0
// Queue the fourth request, it shouldn't be sent until the first 3 complete.
Future<RecordMetadata> request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request4 = appendToAccumulator(tp0);
assertEquals(2, client.inFlightRequestCount());
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
@@ -744,7 +741,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -753,7 +750,7 @@ public class SenderTest {
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
@@ -803,8 +800,8 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest with multiple messages.
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -818,7 +815,7 @@ public class SenderTest {
sender.runOnce();
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
@@ -845,7 +842,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -854,7 +851,7 @@ public class SenderTest {
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
@@ -927,14 +924,14 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertFalse(request1.isDone());
@@ -995,7 +992,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
Node node = metadata.fetch().nodes().get(0);
time.sleep(10000L);
client.disconnect(node.idString());
@@ -1017,13 +1014,13 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce(); // send request
// We separate the two appends by 1 second so that the two batches
// don't expire at the same time.
time.sleep(1000L);
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, sender.inFlightBatches(tp0).size());
@@ -1045,7 +1042,7 @@ public class SenderTest {
assertEquals(0, sender.inFlightBatches(tp0).size());
// let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request3 = appendToAccumulator(tp0);
time.sleep(20);
assertFalse(request2.isDone());
@@ -1084,11 +1081,11 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce(); // send request
time.sleep(1000L);
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(2, client.inFlightRequestCount());
@@ -1105,7 +1102,7 @@ public class SenderTest {
assertFutureFailure(request1, TimeoutException.class);
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
// let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request3 = appendToAccumulator(tp0);
time.sleep(20);
assertFalse(request2.isDone());
@@ -1137,7 +1134,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // send request
sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
@@ -1176,10 +1173,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
sender.runOnce(); // connect and send.
assertEquals(1, client.inFlightRequestCount());
@@ -1218,10 +1213,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
sender.runOnce(); // connect and send.
assertEquals(1, client.inFlightRequestCount());
@@ -1257,10 +1250,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
sender.runOnce(); // connect and send.
assertEquals(1, client.inFlightRequestCount());
@@ -1293,10 +1284,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
sender.runOnce(); // connect.
sender.runOnce(); // send.
@@ -1340,7 +1329,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(nodeId), "localhost", 0);
@@ -1349,7 +1338,7 @@ public class SenderTest {
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
@@ -1393,7 +1382,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
@@ -1410,8 +1399,8 @@ public class SenderTest {
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest, a single batch with 2 records.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
appendToAccumulator(tp0);
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1451,7 +1440,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
@@ -1468,7 +1457,7 @@ public class SenderTest {
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1509,7 +1498,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
@@ -1526,14 +1515,14 @@ public class SenderTest {
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
// Send the third ProduceRequest, in parallel with the second. It should be retried even though the
// lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request3 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1598,7 +1587,7 @@ public class SenderTest {
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
@@ -1615,7 +1604,7 @@ public class SenderTest {
assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
// Send second ProduceRequest,
Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1658,8 +1647,7 @@ public class SenderTest {
assertTrue(transactionManager.hasProducerId());
// cluster authorization is a fatal error for the producer
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -1685,12 +1673,10 @@ public class SenderTest {
assertTrue(transactionManager.hasProducerId());
// cluster authorization is a fatal error for the producer
Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future1 = appendToAccumulator(tp0);
sender.runOnce();
Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future2 = appendToAccumulator(tp1);
sender.runOnce();
client.respond(new MockClient.RequestMatcher() {
@@ -1726,8 +1712,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -1751,8 +1736,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0);
client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -1782,7 +1766,7 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
@@ -1824,7 +1808,7 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect.
sender.runOnce(); // send.
String id = client.requests().peek().destination();
@@ -1863,7 +1847,7 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect.
sender.runOnce(); // send.
@@ -1926,10 +1910,11 @@ public class SenderTest {
MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
client.prepareMetadataUpdate(metadataUpdate1);
// Send the first message.
long nowMs = time.milliseconds();
Future<RecordMetadata> f1 =
accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false).future;
accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
Future<RecordMetadata> f2 =
accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false).future;
accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
sender.runOnce(); // connect
sender.runOnce(); // send produce request
@@ -1998,8 +1983,7 @@ public class SenderTest {
setupWithTransactionState(null, false, pool);
// Send first ProduceRequest
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2024,7 +2008,7 @@ public class SenderTest {
setupWithTransactionState(null, true, null);
// Send first ProduceRequest
Future<RecordMetadata> request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request = appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(1, client.inFlightRequestCount());
assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
@@ -2050,7 +2034,7 @@ public class SenderTest {
setupWithTransactionState(null, true, null);
// Send first ProduceRequest
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2058,7 +2042,7 @@ public class SenderTest {
time.sleep(deliveryTimeoutMs / 2);
// Send second ProduceRequest
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0);
sender.runOnce(); // must not send request because the partition is muted
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2081,9 +2065,7 @@ public class SenderTest {
setupWithTransactionState(null, false, null);
// Send first ProduceRequest
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce(); // send request
assertEquals(1, client.inFlightRequestCount());
time.sleep(deliverTimeoutMs);
@@ -2110,12 +2092,8 @@ public class SenderTest {
public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
long deliverTimeoutMs = 1500L;
// create a producer batch with more than one record so it is eligible for splitting
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 =
accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
// send request
sender.runOnce();
@@ -2143,8 +2121,7 @@ public class SenderTest {
setupWithTransactionState(null);
accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce();
sender.runOnce();
@@ -2168,8 +2145,8 @@ public class SenderTest {
setupWithTransactionState(null, true, null);
// Send multiple ProduceRequest across multiple partitions.
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0, time.milliseconds(), "k1", "v1");
Future<RecordMetadata> request2 = appendToAccumulator(tp1, time.milliseconds(), "k2", "v2");
// Send request.
sender.runOnce();
@@ -2327,12 +2304,8 @@ public class SenderTest {
sender.runOnce();
// create a producer batch with more than one record so it is eligible for splitting
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request2 =
accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> request1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
Future<RecordMetadata> request2 = appendToAccumulator(tp0, time.milliseconds(), "key2", "value2");
// send request
sender.runOnce();
@@ -2441,6 +2414,15 @@ public class SenderTest {
}
}
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
return appendToAccumulator(tp, time.milliseconds(), "key", "value");
}
private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
return accumulator.append(tp, timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
}
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
@@ -2479,13 +2461,12 @@ public class SenderTest {
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
metadata.add("test");
metadata.add("test", time.milliseconds());
this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
}
private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
null, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> future = appendToAccumulator(tp0);
sender.runOnce();
assertTrue(future.isDone());
try {

117
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java

@@ -141,7 +141,7 @@ public class TransactionManagerTest {
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.add("test");
this.metadata.add("test", time.milliseconds());
this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
}
@@ -154,8 +154,8 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
long nowMs = time.milliseconds();
FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(tp0, Errors.NONE);
prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -800,8 +800,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1251,8 +1250,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(unauthorizedPartition);
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
@@ -1275,8 +1273,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
responseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1309,15 +1306,13 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxn(tp0, Errors.NONE);
Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> authorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
sender.runOnce();
assertTrue(transactionManager.isPartitionAdded(tp0));
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> unauthorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
assertTrue(transactionManager.hasAbortableError());
@@ -1342,8 +1337,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
FutureRecordMetadata nextTransactionFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1376,8 +1370,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxn(tp0, Errors.NONE);
Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> authorizedTopicProduceFuture = appendToAccumulator(tp0);
sender.runOnce();
assertTrue(transactionManager.isPartitionAdded(tp0));
@@ -1389,8 +1382,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
"key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> unauthorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
sender.runOnce();
assertTrue(transactionManager.hasAbortableError());
@@ -1419,8 +1411,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
FutureRecordMetadata nextTransactionFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
sender.runOnce();
@@ -1472,8 +1463,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
@@ -1518,8 +1508,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1534,8 +1523,7 @@ public class TransactionManagerTest {
// In the meantime, the user does a second produce to a different partition
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> secondResponseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -1570,8 +1558,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1609,8 +1596,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
TransactionalRequestResult commitResult = transactionManager.beginCommit();
assertFalse(responseFuture.isDone());
@@ -1659,8 +1645,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1688,8 +1673,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1726,8 +1710,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
@@ -1776,8 +1759,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxn(tp0, Errors.NONE);
sender.runOnce();
@@ -1832,8 +1814,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
sender.runOnce(); // Send AddPartitionsRequest
@@ -1861,8 +1842,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
@@ -1894,8 +1874,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // Send AddPartitions and let it fail
assertFalse(responseFuture.isDone());
@@ -1934,8 +1913,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // Send AddPartitions
sender.runOnce(); // Send ProduceRequest and let it fail
@@ -1970,8 +1948,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
@@ -2131,12 +2108,10 @@ public class TransactionManagerTest {
transactionManager.beginTransaction();
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0);
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp1);
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp1);
assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
@@ -2188,8 +2163,7 @@ public class TransactionManagerTest {
PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp1);
Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
Integer.MAX_VALUE,
time.milliseconds());
@@ -2208,8 +2182,7 @@ public class TransactionManagerTest {
doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
// Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0);
Node node1 = new Node(0, "localhost", 1111);
PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
@@ -2235,8 +2208,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
@@ -2264,8 +2236,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
@@ -2313,10 +2284,8 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp1);
Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> firstBatchResponse = appendToAccumulator(tp0);
Future<RecordMetadata> secondBatchResponse = appendToAccumulator(tp1);
assertFalse(firstBatchResponse.isDone());
assertFalse(secondBatchResponse.isDone());
@@ -2378,8 +2347,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
@@ -2445,8 +2413,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
@@ -2632,6 +2599,12 @@ public class TransactionManagerTest {
verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
}
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
final long nowMs = time.milliseconds();
return accumulator.append(tp, nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,
null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
}
private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult,
TransactionResult retryTransactionResult)
throws InterruptedException {
@@ -2644,8 +2617,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
appendToAccumulator(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -2686,8 +2658,7 @@ public class TransactionManagerTest {
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxn(tp0, error);
sender.runOnce(); // attempt send addPartitions.
