diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4b7f3d2bd98..c7516392e39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -880,14 +880,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         try {
             throwIfProducerClosed();
             // first make sure the metadata for the topic is available
+            long nowMs = time.milliseconds();
             ClusterAndWaitTime clusterAndWaitTime;
             try {
-                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
             } catch (KafkaException e) {
                 if (metadata.isClosed())
                     throw new KafkaException("Producer closed while send in progress", e);
                 throw e;
             }
+            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
             long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
@@ -915,7 +917,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                     compressionType, serializedKey, serializedValue, headers);
             ensureValidRecordSize(serializedSize);
-            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
+            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
             if (log.isTraceEnabled()) {
                 log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             }
@@ -926,8 +928,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 transactionManager.failIfNotReadyForSend();
             }
             RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
-                    serializedValue, headers, interceptCallback, remainingWaitMs, true);
-
+                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
+
             if (result.abortForNewBatch) {
                 int prevPartition = partition;
                 partitioner.onNewBatch(record.topic(), cluster, prevPartition);
@@ -940,9 +942,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                 result = accumulator.append(tp, timestamp, serializedKey,
-                    serializedValue, headers, interceptCallback, remainingWaitMs, false);
+                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
             }
-
+
             if (transactionManager != null && transactionManager.isTransactional())
                 transactionManager.maybeAddPartitionToTransaction(tp);
@@ -991,18 +993,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
      * @param partition A specific partition expected to exist in metadata, or null if there's no preference
+     * @param nowMs The current time in ms
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      * @return The cluster containing topic metadata and the amount of time we waited in ms
      * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
      */
-    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
+    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
         // add topic to metadata topic list if it is not there already and reset expiry
         Cluster cluster = metadata.fetch();

         if (cluster.invalidTopics().contains(topic))
             throw new InvalidTopicException(topic);
-        metadata.add(topic);
+        metadata.add(topic, nowMs);

         Integer partitionsCount = cluster.partitionCountForTopic(topic);
         // Return cached metadata if we have it, and if the record's partition is either undefined
@@ -1010,9 +1013,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (partitionsCount != null && (partition == null || partition < partitionsCount))
             return new ClusterAndWaitTime(cluster, 0);

-        long begin = time.milliseconds();
         long remainingWaitMs = maxWaitMs;
-        long elapsed;
+        long elapsed = 0;
         // Issue metadata requests until we have metadata for the topic and the requested partition,
         // or until maxWaitMs is exceeded. This is necessary in case the metadata
         // is stale and the number of partitions for this topic has increased in the meantime.
@@ -1022,7 +1024,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             } else {
                 log.trace("Requesting metadata update for topic {}.", topic);
             }
-            metadata.add(topic);
+            metadata.add(topic, nowMs + elapsed);
             int version = metadata.requestUpdate();
             sender.wakeup();
             try {
@@ -1034,7 +1036,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         topic, maxWaitMs));
             }
             cluster = metadata.fetch();
-            elapsed = time.milliseconds() - begin;
+            elapsed = time.milliseconds() - nowMs;
             if (elapsed >= maxWaitMs) {
                 throw new TimeoutException(partitionsCount == null ?
                         String.format("Topic %s not present in metadata after %d ms.",
@@ -1124,7 +1126,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public List<PartitionInfo> partitionsFor(String topic) {
         Objects.requireNonNull(topic, "topic cannot be null");
         try {
-            return waitOnMetadata(topic, null, maxBlockTimeMs).cluster.partitionsForTopic(topic);
+            return waitOnMetadata(topic, null, time.milliseconds(), maxBlockTimeMs).cluster.partitionsForTopic(topic);
         } catch (InterruptedException e) {
            throw new InterruptException(e);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index ef53af4a177..6d88640b2df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -32,7 +32,6 @@ import java.util.Objects;
 import java.util.Set;

 public class ProducerMetadata extends Metadata {
-    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
     static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;

     /* Topics with expiry time */
@@ -55,9 +54,9 @@ public class ProducerMetadata extends Metadata {
         return new MetadataRequest.Builder(new ArrayList<>(topics.keySet()), true);
     }

-    public synchronized void add(String topic) {
+    public synchronized void add(String topic, long nowMs) {
         Objects.requireNonNull(topic, "topic cannot be null");
-        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
+        if (topics.put(topic, nowMs + TOPIC_EXPIRY_MS) == null) {
             requestUpdateForNewTopics();
         }
     }
@@ -76,9 +75,6 @@ public class ProducerMetadata extends Metadata {
         Long expireMs = topics.get(topic);
         if (expireMs == null) {
             return false;
-        } else if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE) {
-            topics.put(topic, nowMs + TOPIC_EXPIRY_MS);
-            return true;
         } else if (expireMs <= nowMs) {
             log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", topic, expireMs, nowMs);
             topics.remove(topic);
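The ProducerMetadata side replaces the TOPIC_EXPIRY_NEEDS_UPDATE sentinel with an absolute deadline computed eagerly in add(), which is what lets retainTopic() shrink to a plain comparison. A minimal sketch of the resulting bookkeeping, under an illustrative class name:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the new bookkeeping: add() stores an absolute deadline up
 * front, so retainTopic() is a plain comparison and the sentinel-resolution
 * branch the diff deletes is unnecessary. Class name is illustrative.
 */
public class TopicExpirySketch {
    static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
    private final Map<String, Long> topics = new HashMap<>();

    public synchronized void add(String topic, long nowMs) {
        topics.put(topic, nowMs + TOPIC_EXPIRY_MS); // absolute deadline, not a -1 sentinel
    }

    public synchronized boolean retainTopic(String topic, long nowMs) {
        Long expireMs = topics.get(topic);
        if (expireMs == null)
            return false;
        if (expireMs <= nowMs) {      // unused for a full expiry interval: drop it
            topics.remove(topic);
            return false;
        }
        return true;
    }
}
```

The visible behavioral difference is small: a topic's first expiry window now starts when add() is called rather than when retainTopic() first resolves the sentinel, which is why every add() caller in this diff has to supply nowMs.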
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index c02b60eae43..80d0c5fcb9c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -180,8 +180,9 @@ public final class RecordAccumulator {
      * @param headers the Headers for the record
      * @param callback The user-supplied callback to execute when the request is complete
      * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
-     * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and 
+     * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
      *                        running the partitioner's onNewBatch method before trying to append again
+     * @param nowMs The current time, in milliseconds
      */
     public RecordAppendResult append(TopicPartition tp,
                                      long timestamp,
@@ -190,7 +191,8 @@ public final class RecordAccumulator {
                                      Header[] headers,
                                      Callback callback,
                                      long maxTimeToBlock,
-                                     boolean abortOnNewBatch) throws InterruptedException {
+                                     boolean abortOnNewBatch,
+                                     long nowMs) throws InterruptedException {
         // We keep track of the number of appending threads to make sure we do not miss batches in
         // abortIncompleteBatches().
         appendsInProgress.incrementAndGet();
@@ -202,7 +204,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 if (closed)
                     throw new KafkaException("Producer closed while send in progress");
-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                 if (appendResult != null)
                     return appendResult;
             }
@@ -212,26 +214,29 @@ public final class RecordAccumulator {
                 // Return a result that will cause another call to append.
                 return new RecordAppendResult(null, false, false, true);
             }
-
+
             byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
             int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
             log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
             buffer = free.allocate(size, maxTimeToBlock);
+
+            // Update the current time in case the buffer allocation blocked above.
+            nowMs = time.milliseconds();
             synchronized (dq) {
                 // Need to check if producer is closed again after grabbing the deque lock.
                 if (closed)
                     throw new KafkaException("Producer closed while send in progress");

-                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                 if (appendResult != null) {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                     return appendResult;
                 }

                 MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
-                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
+                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                 FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
-                        callback, time.milliseconds()));
+                        callback, nowMs));

                 dq.addLast(batch);
                 incomplete.add(batch);
@@ -264,10 +269,10 @@ public final class RecordAccumulator {
      * if it is expired, or when the producer is closed.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
-                                         Callback callback, Deque<ProducerBatch> deque) {
+                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
-            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
+            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
             if (future == null)
                 last.closeForRecordAppends();
             else
@@ -287,7 +292,7 @@ public final class RecordAccumulator {
             muted.remove(tp);
             return false;
         }
-
+
         return true;
     }
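Within RecordAccumulator.append(), the caller's nowMs is trusted on the fast path, but free.allocate() can block for up to maxTimeToBlock, so the time is re-read exactly once after allocation; otherwise a batch created after a long buffer wait would carry a stale creation timestamp. The control flow, reduced to a sketch with assumed helper names:

```java
import java.util.function.LongSupplier;

/**
 * Sketch of the one place the threaded-in time must be refreshed: buffer
 * allocation can block for up to maxTimeToBlock, so a batch created after
 * the wait should not inherit the pre-allocation timestamp. Helper names
 * are illustrative stand-ins for the real RecordAccumulator members.
 */
public class AppendTimeSketch {
    private final LongSupplier clock = System::currentTimeMillis;

    void append(long nowMs) throws InterruptedException {
        if (tryAppendToLastBatch(nowMs))   // fast path: the caller's reading is fresh enough
            return;
        allocateBufferBlocking();          // may park the thread for a long time
        nowMs = clock.getAsLong();         // refresh, as the diff does after free.allocate()
        createBatch(nowMs);                // creation time reflects the allocation wait
    }

    private boolean tryAppendToLastBatch(long nowMs) {
        return false; // stand-in: append to the last open batch if it has room
    }

    private void allocateBufferBlocking() throws InterruptedException {
        // stand-in for free.allocate(size, maxTimeToBlock)
    }

    private void createBatch(long nowMs) {
        // stand-in for new ProducerBatch(tp, recordsBuilder, nowMs)
    }
}
```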
- metadata.add("another-new-topic"); + metadata.add("another-new-topic", now); assertEquals(0, metadata.timeToNextUpdate(now)); } @@ -158,7 +158,7 @@ public class ProducerMetadataTest { // Test that topic is expired if not used within the expiry interval long time = 0; String topic1 = "topic1"; - metadata.add(topic1); + metadata.add(topic1, time); metadata.update(responseWithCurrentTopics(), time); assertTrue(metadata.containsTopic(topic1)); @@ -167,13 +167,13 @@ public class ProducerMetadataTest { assertFalse("Unused topic not expired", metadata.containsTopic(topic1)); // Test that topic is not expired if used within the expiry interval - metadata.add("topic2"); + metadata.add("topic2", time); metadata.update(responseWithCurrentTopics(), time); for (int i = 0; i < 3; i++) { time += ProducerMetadata.TOPIC_EXPIRY_MS / 2; metadata.update(responseWithCurrentTopics(), time); assertTrue("Topic expired even though in use", metadata.containsTopic("topic2")); - metadata.add("topic2"); + metadata.add("topic2", time); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index b04e5ddcfe5..08b29b0996c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -107,7 +107,7 @@ public class RecordAccumulatorTest { int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false); + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); Deque partitionBatches = accum.batches().get(tp1); assertEquals(1, partitionBatches.size()); @@ -118,7 +118,7 @@ public class RecordAccumulatorTest { // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false); + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); Deque partitionBatches = accum.batches().get(tp1); assertEquals(2, partitionBatches.size()); Iterator partitionBatchesIterator = partitionBatches.iterator(); @@ -153,7 +153,7 @@ public class RecordAccumulatorTest { byte[] value = new byte[2 * batchSize]; RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false); + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); Deque batches = accum.batches().get(tp1); @@ -191,7 +191,7 @@ public class RecordAccumulatorTest { RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false); + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index a209a923b36..eee067fa620 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -530,12 +530,13 @@ public class KafkaProducerTest {
         @SuppressWarnings("unchecked")
         Serializer<String> valueSerializer = mock(serializerClassToMock);

+        long nowMs = Time.SYSTEM.milliseconds();
         String topic = "topic";
         ProducerMetadata metadata = newMetadata(0, 90000);
-        metadata.add(topic);
+        metadata.add(topic, nowMs);

         MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
-        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
+        metadata.update(initialUpdateResponse, nowMs);

         KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
                 null, null, Time.SYSTEM);
@@ -596,10 +597,11 @@ public class KafkaProducerTest {
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");

+        long nowMs = Time.SYSTEM.milliseconds();
         ProducerMetadata metadata = newMetadata(0, 90000);
-        metadata.add(topic);
+        metadata.add(topic, nowMs);
         MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
-        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
+        metadata.update(initialUpdateResponse, nowMs);

         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
         ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
index 3e29d61149e..5de333f3b2b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
@@ -56,10 +56,10 @@ public class ProducerMetadataTest {

     @Test
     public void testMetadata() throws Exception {
+        long time = Time.SYSTEM.milliseconds();
         String topic = "my-topic";
-        metadata.add(topic);
+        metadata.add(topic, time);

-        long time = Time.SYSTEM.milliseconds();
         metadata.update(responseWithTopics(Collections.emptySet()), time);
         assertTrue("No update needed.", metadata.timeToNextUpdate(time) > 0);
         metadata.requestUpdate();
@@ -139,17 +139,17 @@ public class ProducerMetadataTest {

         // New topic added to fetch set and update requested. It should allow immediate update.
         metadata.update(responseWithCurrentTopics(), now);
-        metadata.add("new-topic");
+        metadata.add("new-topic", now);
         assertEquals(0, metadata.timeToNextUpdate(now));

         // Even though add is called, an immediate update isn't necessary if the new topic set doesn't
         // contain a new topic.
         metadata.update(responseWithCurrentTopics(), now);
-        metadata.add("new-topic");
+        metadata.add("new-topic", now);
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));

         // If the new set of topics contains a new topic then it should allow immediate update.
-        metadata.add("another-new-topic");
+        metadata.add("another-new-topic", now);
         assertEquals(0, metadata.timeToNextUpdate(now));
     }

@@ -158,7 +158,7 @@ public class ProducerMetadataTest {
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
         String topic1 = "topic1";
-        metadata.add(topic1);
+        metadata.add(topic1, time);
         metadata.update(responseWithCurrentTopics(), time);
         assertTrue(metadata.containsTopic(topic1));

@@ -167,13 +167,13 @@ public class ProducerMetadataTest {
         assertFalse("Unused topic not expired", metadata.containsTopic(topic1));

         // Test that topic is not expired if used within the expiry interval
-        metadata.add("topic2");
+        metadata.add("topic2", time);
         metadata.update(responseWithCurrentTopics(), time);
         for (int i = 0; i < 3; i++) {
             time += ProducerMetadata.TOPIC_EXPIRY_MS / 2;
             metadata.update(responseWithCurrentTopics(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
-            metadata.add("topic2");
+            metadata.add("topic2", time);
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b04e5ddcfe5..08b29b0996c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -107,7 +107,7 @@ public class RecordAccumulatorTest {
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
             Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
             assertEquals(1, partitionBatches.size());
@@ -118,7 +118,7 @@ public class RecordAccumulatorTest {

         // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
@@ -153,7 +153,7 @@ public class RecordAccumulatorTest {
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = createTestRecordAccumulator(
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

         Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -191,7 +191,7 @@ public class RecordAccumulatorTest {

         RecordAccumulator accum = createTestRecordAccumulator(
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

         Deque<ProducerBatch> batches = accum.batches().get(tp1);
@@ -215,7 +215,7 @@ public class RecordAccumulatorTest {
         int lingerMs = 10;
         RecordAccumulator accum = createTestRecordAccumulator(
                 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -238,7 +238,7 @@ public class RecordAccumulatorTest {
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+                accum.append(tp, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -260,7 +260,7 @@ public class RecordAccumulatorTest {
             public void run() {
                 for (int i = 0; i < msgs; i++) {
                     try {
-                        accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+                        accum.append(new TopicPartition(topic, i % numParts), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
@@ -304,7 +304,7 @@ public class RecordAccumulatorTest {

         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -313,14 +313,14 @@ public class RecordAccumulatorTest {

         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -342,7 +342,7 @@ public class RecordAccumulatorTest {
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));

         long now = time.milliseconds();
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@@ -354,7 +354,7 @@ public class RecordAccumulatorTest {
         accum.reenqueue(batches.get(0).get(0), now);

         // Put message for partition 1 into accumulator
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
@@ -380,7 +380,7 @@ public class RecordAccumulatorTest {
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);

         for (int i = 0; i < 100; i++) {
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
             assertTrue(accum.hasIncomplete());
         }
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -418,7 +418,7 @@ public class RecordAccumulatorTest {
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
-        accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());

         accum.beginFlush();
         assertTrue(accum.flushInProgress());
@@ -447,7 +447,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < numRecords; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertFalse(result.readyNodes.isEmpty());
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
@@ -488,7 +488,7 @@ public class RecordAccumulatorTest {
             }
         }
         for (int i = 0; i < numRecords; i++)
-            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false);
+            accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds());
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertFalse(result.readyNodes.isEmpty());
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
@@ -527,7 +527,7 @@ public class RecordAccumulatorTest {
         for (Boolean mute: muteStates) {
             if (time.milliseconds() < System.currentTimeMillis())
                 time.setCurrentTimeMs(System.currentTimeMillis());
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
             assertEquals("No partition should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

             time.sleep(lingerMs);
@@ -576,11 +576,11 @@ public class RecordAccumulatorTest {

         // Test batches not in retry
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         }
         // Make the batches ready due to batch full
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
         // Advance the clock to expire the batch.
@@ -610,7 +610,7 @@ public class RecordAccumulatorTest {

         // Test batches in retry.
         // Create a retried batch
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -634,7 +634,7 @@ public class RecordAccumulatorTest {
         assertEquals("All batches should have been expired.", 0, expiredBatches.size());

         // Test that when being throttled muted batches are expired before the throttle time is over.
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -667,7 +667,7 @@ public class RecordAccumulatorTest {
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         time.sleep(2000);
@@ -708,7 +708,7 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
             new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
     }

     @Test
@@ -808,7 +808,7 @@ public class RecordAccumulatorTest {
             int dice = random.nextInt(100);
             byte[] value = (dice < goodCompRatioPercentage) ?
                     bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
-            accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false);
+            accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
             BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
             numSplit += result.numSplit;
             numBatches += result.numBatches;
@@ -831,7 +831,7 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = createTestRecordAccumulator(
             batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);

-        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertTrue(drained.isEmpty());
@@ -846,7 +846,7 @@ public class RecordAccumulatorTest {
         //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());

         // Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
-        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         time.sleep(lingerMs * 4);

         // Now drain and check that accumulator picked up the drained batch because its expiry is soon.
@@ -871,7 +871,7 @@ public class RecordAccumulatorTest {

         // Test batches in retry.
         for (Boolean mute : muteStates) {
-            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false);
+            accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
             time.sleep(lingerMs);
             readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
             assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
@@ -892,7 +892,7 @@ public class RecordAccumulatorTest {
             assertEquals("RecordAccumulator has expired batches if the partition is not muted", mute ?
                 1 : 0, expiredBatches.size());
         }
     }
-
+
     @Test
     public void testStickyBatches() throws Exception {
         long now = time.milliseconds();
@@ -904,19 +904,19 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = createTestRecordAccumulator(3200,
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
         int expectedAppends = expectedNumAppendsNoKey(batchSize);
-
+
         // Create first batch
         int partition = partitioner.partition(topic, null, null, "value", value, cluster);
         TopicPartition tp = new TopicPartition(topic, partition);
-        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         int appends = 1;
-
+
         boolean switchPartition = false;
         while (!switchPartition) {
             // Append to the first batch
             partition = partitioner.partition(topic, null, null, "value", value, cluster);
             tp = new TopicPartition(topic, partition);
-            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true);
+            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
             Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
             Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
             Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
@@ -931,38 +931,38 @@ public class RecordAccumulatorTest {
                 assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
             }
         }
-
+
         // Batch should be full.
         assertEquals(1, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         assertEquals(appends, expectedAppends);
         switchPartition = false;
-
+
         // KafkaProducer would call this method in this case, make second batch
         partitioner.onNewBatch(topic, cluster, partition);
         partition = partitioner.partition(topic, null, null, "value", value, cluster);
         tp = new TopicPartition(topic, partition);
-        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false);
+        accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
         appends++;
-
+
         // These appends all go into the second batch
         while (!switchPartition) {
             partition = partitioner.partition(topic, null, null, "value", value, cluster);
             tp = new TopicPartition(topic, partition);
-            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true);
+            RecordAccumulator.RecordAppendResult result = accum.append(tp, 0L, null, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds());
             Deque<ProducerBatch> partitionBatches1 = accum.batches().get(tp1);
             Deque<ProducerBatch> partitionBatches2 = accum.batches().get(tp2);
             Deque<ProducerBatch> partitionBatches3 = accum.batches().get(tp3);
             int numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
             // Only two batches because the new partition is also sticky.
             assertEquals(2, numBatches);
-
+
             switchPartition = result.abortForNewBatch;
             // We only appended if we do not retry.
             if (!switchPartition) {
                 appends++;
             }
         }
-
+
         // There should be two full batches now.
         assertEquals(appends, 2 * expectedAppends);
     }
@@ -976,7 +976,7 @@ public class RecordAccumulatorTest {
         CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
         // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big.
         for (int i = 0; i < numRecords; i++) {
-            accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false);
+            accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds());
         }

         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -1061,7 +1061,7 @@ public class RecordAccumulatorTest {
                 size += recordSize;
             }
         }
-
+
     /**
      * Return the offset delta when there is no key.
      */
assertEquals(appends, 2 * expectedAppends); } @@ -976,7 +976,7 @@ public class RecordAccumulatorTest { CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big. for (int i = 0; i < numRecords; i++) { - accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false); + accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds()); } RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); @@ -1061,7 +1061,7 @@ public class RecordAccumulatorTest { size += recordSize; } } - + /** * Return the offset delta when there is no key. */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 0215eae5998..0251f7be854 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -152,8 +152,7 @@ public class SenderTest { @Test public void testSimple() throws Exception { long offset = 0; - Future future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), - null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // connect sender.runOnce(); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); @@ -179,8 +178,7 @@ public class SenderTest { // start off support produce request v3 apiVersions.update("0", NodeApiVersions.create()); - Future future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), - null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future = appendToAccumulator(tp0, 0L, "key", "value"); // now the partition leader supports only v2 apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2)); @@ -218,14 +216,12 @@ public class SenderTest { // start off support produce request v3 apiVersions.update("0", NodeApiVersions.create()); - Future future1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), - null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future1 = appendToAccumulator(tp0, 0L, "key", "value"); // now the partition leader supports only v2 apiVersions.update("0", NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2)); - Future future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(), - null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future2 = appendToAccumulator(tp1, 0L, "key", "value"); // start off support produce request v3 apiVersions.update("0", NodeApiVersions.create()); @@ -320,7 +316,7 @@ public class SenderTest { 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); // Append a message so that topic metrics are created - accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false); + appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // connect sender.runOnce(); // send produce request client.respond(produceResponse(tp0, 0, Errors.NONE, 0)); @@ -347,7 +343,7 @@ public class SenderTest { Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, 
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); // do a successful retry - Future future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // connect sender.runOnce(); // send produce request String id = client.requests().peek().destination(); @@ -376,7 +372,7 @@ public class SenderTest { assertEquals(0, sender.inFlightBatches(tp0).size()); // do an unsuccessful retry - future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + future = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // send produce request assertEquals(1, sender.inFlightBatches(tp0).size()); for (int i = 0; i < maxRetries + 1; i++) { @@ -410,7 +406,7 @@ public class SenderTest { // Send the first message. TopicPartition tp2 = new TopicPartition("test", 1); - accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false); + appendToAccumulator(tp2, 0L, "key1", "value1"); sender.runOnce(); // connect sender.runOnce(); // send produce request String id = client.requests().peek().destination(); @@ -423,7 +419,7 @@ public class SenderTest { time.sleep(900); // Now send another message to tp2 - accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false); + appendToAccumulator(tp2, 0L, "key2", "value2"); // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0 MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)); @@ -453,7 +449,7 @@ public class SenderTest { if (exception instanceof TimeoutException) { expiryCallbackCount.incrementAndGet(); try { - accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false); + accumulator.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); } catch (InterruptedException e) { throw new RuntimeException("Unexpected interruption", e); } @@ -462,8 +458,9 @@ public class SenderTest { } }; + final long nowMs = time.milliseconds(); for (int i = 0; i < messagesPerBatch; i++) - accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false); + accumulator.append(tp1, 0L, key, value, null, callback, maxBlockTimeMs, false, nowMs); // Advance the clock to expire the first batch. 
time.sleep(10000); @@ -496,7 +493,7 @@ public class SenderTest { long offset = 0; client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2))); - Future future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future = appendToAccumulator(tp0); sender.runOnce(); assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic())); client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2))); @@ -513,7 +510,7 @@ public class SenderTest { time.sleep(ProducerMetadata.TOPIC_EXPIRY_MS); client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2))); assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic())); - future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + future = appendToAccumulator(tp0); sender.runOnce(); assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic())); client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2))); @@ -555,7 +552,7 @@ public class SenderTest { @Test public void testCanRetryWithoutIdempotence() throws Exception { // do a successful retry - Future future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future future = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // connect sender.runOnce(); // send produce request String id = client.requests().peek().destination(); @@ -594,7 +591,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -603,7 +600,7 @@ public class SenderTest { assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); @@ -644,7 +641,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -653,11 +650,11 @@ public class SenderTest { assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); // Send third ProduceRequest - Future request3 = accumulator.append(tp0, time.milliseconds(), 
"key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request3 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(3, client.inFlightRequestCount()); @@ -672,7 +669,7 @@ public class SenderTest { sender.runOnce(); // receive response 0 // Queue the fourth request, it shouldn't be sent until the first 3 complete. - Future request4 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request4 = appendToAccumulator(tp0); assertEquals(2, client.inFlightRequestCount()); assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); @@ -744,7 +741,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -753,7 +750,7 @@ public class SenderTest { assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); @@ -803,8 +800,8 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest with multiple messages. 
- Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; - accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false); + Future request1 = appendToAccumulator(tp0); + appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -818,7 +815,7 @@ public class SenderTest { sender.runOnce(); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); @@ -845,7 +842,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -854,7 +851,7 @@ public class SenderTest { assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); @@ -927,14 +924,14 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); assertFalse(request1.isDone()); @@ -995,7 +992,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0, 0L, "key", "value"); Node node = metadata.fetch().nodes().get(0); time.sleep(10000L); client.disconnect(node.idString()); @@ -1017,13 +1014,13 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); // send request // We separate 
the two appends by 1 second so that the two batches // don't expire at the same time. time.sleep(1000L); - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); // send request assertEquals(2, client.inFlightRequestCount()); assertEquals(2, sender.inFlightBatches(tp0).size()); @@ -1045,7 +1042,7 @@ public class SenderTest { assertEquals(0, sender.inFlightBatches(tp0).size()); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. - Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request3 = appendToAccumulator(tp0); time.sleep(20); assertFalse(request2.isDone()); @@ -1084,11 +1081,11 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); // send request time.sleep(1000L); - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); // send request assertEquals(2, client.inFlightRequestCount()); @@ -1105,7 +1102,7 @@ public class SenderTest { assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. - Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request3 = appendToAccumulator(tp0); time.sleep(20); assertFalse(request2.isDone()); @@ -1137,7 +1134,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0, 0L, "key", "value"); sender.runOnce(); // send request sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1); @@ -1176,10 +1173,8 @@ public class SenderTest { Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); - Future failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; - Future successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future failedResponse = appendToAccumulator(tp0); + Future successfulResponse = appendToAccumulator(tp1); sender.runOnce(); // connect and send. 
assertEquals(1, client.inFlightRequestCount()); @@ -1218,10 +1213,8 @@ public class SenderTest { Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); - Future failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; - Future successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future failedResponse = appendToAccumulator(tp0); + Future successfulResponse = appendToAccumulator(tp1); sender.runOnce(); // connect and send. assertEquals(1, client.inFlightRequestCount()); @@ -1257,10 +1250,8 @@ public class SenderTest { Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); - Future failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; - Future successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future failedResponse = appendToAccumulator(tp0); + Future successfulResponse = appendToAccumulator(tp1); sender.runOnce(); // connect and send. assertEquals(1, client.inFlightRequestCount()); @@ -1293,10 +1284,8 @@ public class SenderTest { Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); - Future failedResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; - Future successfulResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), - "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future failedResponse = appendToAccumulator(tp0); + Future successfulResponse = appendToAccumulator(tp1); sender.runOnce(); // connect. sender.runOnce(); // send. 
@@ -1340,7 +1329,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); String nodeId = client.requests().peek().destination(); Node node = new Node(Integer.valueOf(nodeId), "localhost", 0); @@ -1349,7 +1338,7 @@ public class SenderTest { assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, client.inFlightRequestCount()); assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); @@ -1393,7 +1382,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); @@ -1410,8 +1399,8 @@ public class SenderTest { assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0)); // Send second ProduceRequest, a single batch with 2 records. - accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false); - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + appendToAccumulator(tp0); + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); @@ -1451,7 +1440,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); @@ -1468,7 +1457,7 @@ public class SenderTest { assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0)); // Send second ProduceRequest - Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request2 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(2, transactionManager.sequenceNumber(tp0).longValue()); assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0)); @@ -1509,7 +1498,7 @@ public class SenderTest { assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest - Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future; + Future request1 = appendToAccumulator(tp0); sender.runOnce(); assertEquals(1, client.inFlightRequestCount()); @@ -1526,14 +1515,14 @@ public class SenderTest { assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0)); // Send second 
-        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
 
         // Send the third ProduceRequest, in parallel with the second. It should be retried even though the
         // lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
-        Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request3 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1598,7 +1587,7 @@ public class SenderTest {
         assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
 
         // Send first ProduceRequest
-        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
 
@@ -1615,7 +1604,7 @@ public class SenderTest {
         assertEquals(OptionalLong.of(1000L), transactionManager.lastAckedOffset(tp0));
 
         // Send second ProduceRequest,
-        Future<RecordMetadata> request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
         sender.runOnce();
         assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
         assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
@@ -1658,8 +1647,7 @@ public class SenderTest {
         assertTrue(transactionManager.hasProducerId());
 
         // cluster authorization is a fatal error for the producer
-        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future = appendToAccumulator(tp0);
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1685,12 +1673,10 @@ public class SenderTest {
         assertTrue(transactionManager.hasProducerId());
 
         // cluster authorization is a fatal error for the producer
-        Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future1 = appendToAccumulator(tp0);
         sender.runOnce();
 
-        Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future2 = appendToAccumulator(tp1);
         sender.runOnce();
 
         client.respond(new MockClient.RequestMatcher() {
@@ -1726,8 +1712,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future = appendToAccumulator(tp0);
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1751,8 +1736,7 @@ public class SenderTest {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
-        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future = appendToAccumulator(tp0);
         client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1782,7 +1766,7 @@ public class SenderTest {
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
             senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1824,7 +1808,7 @@ public class SenderTest {
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
             senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
         sender.runOnce(); // connect.
         sender.runOnce(); // send.
         String id = client.requests().peek().destination();
@@ -1863,7 +1847,7 @@ public class SenderTest {
         Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
             senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
         sender.runOnce(); // connect.
         sender.runOnce(); // send.
 
@@ -1926,10 +1910,11 @@ public class SenderTest {
         MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
         client.prepareMetadataUpdate(metadataUpdate1);
         // Send the first message.
+        long nowMs = time.milliseconds();
         Future<RecordMetadata> f1 =
-            accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false).future;
+            accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
         Future<RecordMetadata> f2 =
-            accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false).future;
+            accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
         sender.runOnce(); // connect
         sender.runOnce(); // send produce request
@@ -1998,8 +1983,7 @@ public class SenderTest {
         setupWithTransactionState(null, false, pool);
 
         // Send first ProduceRequest
-        Future<RecordMetadata> request1 =
-            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce(); // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2024,7 +2008,7 @@ public class SenderTest {
         setupWithTransactionState(null, true, null);
 
         // Send first ProduceRequest
-        Future<RecordMetadata> request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request = appendToAccumulator(tp0);
         sender.runOnce(); // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());
@@ -2050,7 +2034,7 @@ public class SenderTest {
         setupWithTransactionState(null, true, null);
 
         // Send first ProduceRequest
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0);
         sender.runOnce(); // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2058,7 +2042,7 @@ public class SenderTest {
         time.sleep(deliveryTimeoutMs / 2);
 
         // Send second ProduceRequest
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0);
         sender.runOnce(); // must not send request because the partition is muted
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
@@ -2081,9 +2065,7 @@ public class SenderTest {
         setupWithTransactionState(null, false, null);
 
         // Send first ProduceRequest
-        Future<RecordMetadata> request1 =
-            accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null,
-                MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
         sender.runOnce(); // send request
         assertEquals(1, client.inFlightRequestCount());
         time.sleep(deliverTimeoutMs);
@@ -2110,12 +2092,8 @@ public class SenderTest {
     public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
         long deliverTimeoutMs = 1500L;
         // create a producer batch with more than one record so it is eligible for splitting
-        Future<RecordMetadata> request1 =
-            accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
-                MAX_BLOCK_TIMEOUT, false).future;
-        Future<RecordMetadata> request2 =
-            accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
-                MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
 
         // send request
         sender.runOnce();
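Reviewer note (commentary, not part of the patch): the first hunk above samples the clock once into `nowMs` and passes that single value as the new trailing argument of both `append` calls. The pattern is worth calling out: read the clock once per logical operation and thread the value through, rather than re-reading it at each step. A minimal, self-contained sketch of the idea, with stand-in names rather than Kafka APIs:

```java
import java.util.function.LongSupplier;

// Sketch only: sample the clock once per operation and pass the value down,
// instead of re-reading it in every callee.
public class ClockThreadingSketch {
    // stand-in for a time source such as System.currentTimeMillis()
    private static final LongSupplier CLOCK = System::currentTimeMillis;

    public static void main(String[] args) {
        long nowMs = CLOCK.getAsLong(); // one clock read per operation
        appendFirst(nowMs);
        appendSecond(nowMs);            // same timestamp, no second read
    }

    private static void appendFirst(long nowMs) {
        System.out.println("first append at " + nowMs);
    }

    private static void appendSecond(long nowMs) {
        System.out.println("second append at " + nowMs);
    }
}
```

Besides saving clock reads on a hot path, this guarantees both appends observe the same timestamp even if the wall clock ticks between them.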
@@ -2143,8 +2121,7 @@ public class SenderTest {
         setupWithTransactionState(null);
 
-        accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null,
-            MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0, 0L, "key", "value");
 
         sender.runOnce();
         sender.runOnce();
@@ -2168,8 +2145,8 @@ public class SenderTest {
         setupWithTransactionState(null, true, null);
 
         // Send multiple ProduceRequest across multiple partitions.
-        Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
-        Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0, time.milliseconds(), "k1", "v1");
+        Future<RecordMetadata> request2 = appendToAccumulator(tp1, time.milliseconds(), "k2", "v2");
 
         // Send request.
         sender.runOnce();
@@ -2327,12 +2304,8 @@ public class SenderTest {
         sender.runOnce();
 
         // create a producer batch with more than one record so it is eligible for splitting
-        Future<RecordMetadata> request1 =
-            accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
-                MAX_BLOCK_TIMEOUT, false).future;
-        Future<RecordMetadata> request2 =
-            accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
-                MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0, time.milliseconds(), "key2", "value2");
 
         // send request
         sender.runOnce();
@@ -2441,6 +2414,15 @@ public class SenderTest {
         }
     }
 
+    private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
+        return appendToAccumulator(tp, time.milliseconds(), "key", "value");
+    }
+
+    private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
+        return accumulator.append(tp, timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
+            null, MAX_BLOCK_TIMEOUT, false, time.milliseconds()).future;
+    }
+
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
@@ -2479,13 +2461,12 @@ public class SenderTest {
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
             Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
 
-        metadata.add("test");
+        metadata.add("test", time.milliseconds());
         this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
     }
 
     private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
-        Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
-            null, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> future = appendToAccumulator(tp0);
         sender.runOnce();
         assertTrue(future.isDone());
         try {
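Reviewer note: the two `appendToAccumulator` overloads added above collapse the nine-argument `accumulator.append(...)` calls that were duplicated throughout this test class, which is why wiring in the new trailing `nowMs` argument only had to happen in one place here. For readers unfamiliar with the overload-delegation pattern, a small self-contained sketch (hypothetical names, not the patch itself):

```java
// Sketch only: a short convenience overload fills in common defaults and
// delegates to the fully parameterised form, so a future signature change
// touches one call site instead of dozens.
public class OverloadDelegationSketch {
    public static void main(String[] args) {
        System.out.println(append("tp0"));                                  // defaults
        System.out.println(append("tp1", 123L, "k2", "v2"));                // explicit
    }

    // convenience overload: default timestamp, key and value
    static String append(String partition) {
        return append(partition, System.currentTimeMillis(), "key", "value");
    }

    // full form: the single place where all arguments come together
    static String append(String partition, long timestamp, String key, String value) {
        return partition + "@" + timestamp + ": " + key + "=" + value;
    }
}
```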
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a2412d16167..e3879a47ee6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -141,7 +141,7 @@ public class TransactionManagerTest {
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.add("test");
+        this.metadata.add("test", time.milliseconds());
         this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
     }
 
@@ -154,8 +154,8 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        FutureRecordMetadata sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        long nowMs = time.milliseconds();
+        FutureRecordMetadata sendFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -800,8 +800,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1251,8 +1250,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(unauthorizedPartition);
 
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
@@ -1275,8 +1273,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        responseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1309,15 +1306,13 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
-        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> authorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
         sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
-        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
@@ -1342,8 +1337,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        FutureRecordMetadata nextTransactionFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1376,8 +1370,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
-        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
-            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> authorizedTopicProduceFuture = appendToAccumulator(tp0);
         sender.runOnce();
         assertTrue(transactionManager.isPartitionAdded(tp0));
 
@@ -1389,8 +1382,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
-        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
-            "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = appendToAccumulator(unauthorizedPartition);
         prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
         sender.runOnce();
         assertTrue(transactionManager.hasAbortableError());
@@ -1419,8 +1411,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        FutureRecordMetadata nextTransactionFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
         sender.runOnce();
@@ -1472,8 +1463,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
 
@@ -1518,8 +1508,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1534,8 +1523,7 @@ public class TransactionManagerTest {
         // In the mean time, the user does a second produce to a different partition
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp1);
-        Future<RecordMetadata> secondResponseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> secondResponseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
         prepareProduceResponse(Errors.NONE, pid, epoch);
@@ -1570,8 +1558,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1609,8 +1596,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         TransactionalRequestResult commitResult = transactionManager.beginCommit();
         assertFalse(responseFuture.isDone());
@@ -1659,8 +1645,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1688,8 +1673,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -1726,8 +1710,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         sender.runOnce();
@@ -1776,8 +1759,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxn(tp0, Errors.NONE);
         sender.runOnce();
@@ -1832,8 +1814,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
         assertFalse(responseFuture.isDone());
 
         sender.runOnce(); // Send AddPartitionsRequest
@@ -1861,8 +1842,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
 
@@ -1894,8 +1874,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         sender.runOnce(); // Send AddPartitions and let it fail
         assertFalse(responseFuture.isDone());
@@ -1934,8 +1913,7 @@ public class TransactionManagerTest {
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
         prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         sender.runOnce(); // Send AddPartitions
         sender.runOnce(); // Send ProduceRequest and let it fail
@@ -1970,8 +1948,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, epoch, pid);
@@ -2131,12 +2108,10 @@ public class TransactionManagerTest {
         transactionManager.beginTransaction();
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0);
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp1);
-        accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp1);
 
         assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
         assertFalse(transactionManager.isSendToPartitionAllowed(tp1));
@@ -2188,8 +2163,7 @@ public class TransactionManagerTest {
         PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
 
         Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
                 Collections.emptySet(), Collections.emptySet());
-        accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp1);
         Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster, Collections.singleton(node1),
                 Integer.MAX_VALUE, time.milliseconds());
@@ -2208,8 +2182,7 @@ public class TransactionManagerTest {
         doInitTransactions(pid, epoch);
         transactionManager.beginTransaction();
         // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain.
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0);
 
         Node node1 = new Node(0, "localhost", 1111);
         PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
@@ -2235,8 +2208,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
         prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
@@ -2264,8 +2236,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
 
@@ -2313,10 +2284,8 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp1);
 
-        Future<RecordMetadata> firstBatchResponse = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
-        Future<RecordMetadata> secondBatchResponse = accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> firstBatchResponse = appendToAccumulator(tp0);
+        Future<RecordMetadata> secondBatchResponse = appendToAccumulator(tp1);
 
         assertFalse(firstBatchResponse.isDone());
         assertFalse(secondBatchResponse.isDone());
@@ -2378,8 +2347,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
 
@@ -2445,8 +2413,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
 
@@ -2632,6 +2599,12 @@ public class TransactionManagerTest {
         verifyCommitOrAbortTranscationRetriable(TransactionResult.ABORT, TransactionResult.COMMIT);
     }
 
+    private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
+        final long nowMs = time.milliseconds();
+        return accumulator.append(tp, nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,
+            null, MAX_BLOCK_TIMEOUT, false, nowMs).future;
+    }
+
     private void verifyCommitOrAbortTranscationRetriable(TransactionResult firstTransactionResult,
                                                          TransactionResult retryTransactionResult) throws InterruptedException {
@@ -2644,8 +2617,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false);
+        appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -2686,8 +2658,7 @@ public class TransactionManagerTest {
         transactionManager.failIfNotReadyForSend();
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
-        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-            "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false).future;
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxn(tp0, error);
         sender.runOnce(); // attempt send addPartitions.
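Reviewer note: the two new test helpers differ subtly. The SenderTest version reads `time.milliseconds()` separately for the record timestamp and for the trailing append-time argument, while the TransactionManagerTest version captures `nowMs` once and reuses it for both. Under the tests' mock clock the two are equivalent; on a real clock only the single read is guaranteed not to straddle a tick. A small sketch of the difference (stand-in names, not Kafka code):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: two clock-reading styles. With a mock clock both behave the
// same; the single read is the safer default on a real clock.
public class SingleClockReadSketch {
    private static final AtomicLong MOCK_CLOCK = new AtomicLong(1_000L);

    public static void main(String[] args) {
        // SenderTest style: timestamp and append time sampled separately
        append(MOCK_CLOCK.get(), MOCK_CLOCK.get());

        // TransactionManagerTest style: one sample serves both arguments
        long nowMs = MOCK_CLOCK.get();
        append(nowMs, nowMs);
    }

    private static void append(long timestamp, long appendTimeMs) {
        System.out.printf("ts=%d appendedAt=%d%n", timestamp, appendTimeMs);
    }
}
```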