Browse Source

KAFKA-2138; Fix producer to honor retry backoff; reviewed by Joel Koshy and Guozhang Wang

pull/55/merge
Jiangjie Qin 10 years ago committed by Joel Koshy
parent
commit
2166104aff
  1. 26
      clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  2. 38
      clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

26
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@@ -292,17 +292,21 @@ public final class RecordAccumulator {
                synchronized (deque) {
                    RecordBatch first = deque.peekFirst();
                    if (first != null) {
-                       if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                           // there is a rare case that a single batch size is larger than the request size due
-                           // to compression; in this case we will still eventually send this batch in a single
-                           // request
-                           break;
-                       } else {
-                           RecordBatch batch = deque.pollFirst();
-                           batch.records.close();
-                           size += batch.records.sizeInBytes();
-                           ready.add(batch);
-                           batch.drainedMs = now;
+                       boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                       // Only drain the batch if it is not during backoff period.
+                       if (!backoff) {
+                           if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                               // there is a rare case that a single batch size is larger than the request size due
+                               // to compression; in this case we will still eventually send this batch in a single
+                               // request
+                               break;
+                           } else {
+                               RecordBatch batch = deque.pollFirst();
+                               batch.records.close();
+                               size += batch.records.sizeInBytes();
+                               ready.add(batch);
+                               batch.drainedMs = now;
+                           }
                        }
                    }
                }

38
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

@@ -203,6 +203,44 @@ public class RecordAccumulatorTest {
        // but have leaders with other sendable data.
        assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs);
    }
@Test
public void testRetryBackoff() throws Exception {
long lingerMs = Long.MAX_VALUE / 4;
long retryBackoffMs = Long.MAX_VALUE / 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
long now = time.milliseconds();
accum.append(tp1, key, value, null);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
assertEquals("Node1 should be the only ready node.", 1, batches.size());
assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size());
// Reenqueue the batch
now = time.milliseconds();
accum.reenqueue(batches.get(0).get(0), now);
// Put message for partition 1 into accumulator
accum.append(tp2, key, value, null);
result = accum.ready(cluster, now + lingerMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
// tp1 should backoff while tp2 should not
batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
assertEquals("Node1 should be the only ready node.", 1, batches.size());
assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition);
// Partition 0 can be drained after retry backoff
result = accum.ready(cluster, now + retryBackoffMs + 1);
assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1);
assertEquals("Node1 should be the only ready node.", 1, batches.size());
assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
}
    @Test
    public void testFlush() throws Exception {

Loading…
Cancel
Save