diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 0e7ab29a073..49a98838767 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -292,17 +292,21 @@ public final class RecordAccumulator { synchronized (deque) { RecordBatch first = deque.peekFirst(); if (first != null) { - if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request - break; - } else { - RecordBatch batch = deque.pollFirst(); - batch.records.close(); - size += batch.records.sizeInBytes(); - ready.add(batch); - batch.drainedMs = now; + boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now; + // Only drain the batch if it is not during backoff period. + if (!backoff) { + if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due + // to compression; in this case we will still eventually send this batch in a single + // request + break; + } else { + RecordBatch batch = deque.pollFirst(); + batch.records.close(); + size += batch.records.sizeInBytes(); + ready.add(batch); + batch.drainedMs = now; + } } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 05e2929c0a9..baa48e7c1b7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -203,6 +203,44 @@ public class RecordAccumulatorTest { // but have leaders with other sendable data. assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); } + + @Test + public void testRetryBackoff() throws Exception { + long lingerMs = Long.MAX_VALUE / 4; + long retryBackoffMs = Long.MAX_VALUE / 2; + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + + long now = time.milliseconds(); + accum.append(tp1, key, value, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + assertEquals("Node1 should be the only ready node.", 1, batches.size()); + assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size()); + + // Reenqueue the batch + now = time.milliseconds(); + accum.reenqueue(batches.get(0).get(0), now); + + // Put message for partition 1 into accumulator + accum.append(tp2, key, value, null); + result = accum.ready(cluster, now + lingerMs + 1); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + + // tp1 should backoff while tp2 should not + batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + assertEquals("Node1 should be the only ready node.", 1, batches.size()); + assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); + assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition); + + // Partition 0 can be drained after retry backoff + result = accum.ready(cluster, now + retryBackoffMs + 1); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1); + assertEquals("Node1 should be the only ready node.", 1, batches.size()); + assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); + assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition); + } @Test public void testFlush() throws Exception {