
KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=10000, compression_type=snappy and a snappy buffer size of 32k can be executed in a heap of 192m in a local environment (768m is needed without this change).

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1418 from ijuma/kafka-3747-close-record-batch-when-append-fails
Ismael Juma 9 years ago
parent commit fe27d8f787
  1. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (63 changed lines)
  2. clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (5 changed lines)
  3. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (18 changed lines)
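
In outline, the change makes the accumulator close a batch's MemoryRecords as soon as an append no longer fits, instead of only when the batch is later drained for sending, so the compressor's temporary buffers can be reclaimed earlier. A minimal sketch of that pattern, using hypothetical Batch/Accumulator stand-ins rather than the real RecordBatch/RecordAccumulator classes:

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-ins for RecordBatch/MemoryRecords; not the Kafka client classes.
class Batch {
    private final int capacity;
    private int used;
    private boolean writable = true;

    Batch(int capacity) { this.capacity = capacity; }

    // Mirrors RecordBatch.tryAppend: returns false when the record does not fit.
    boolean tryAppend(int recordSize) {
        if (!writable || used + recordSize > capacity)
            return false;
        used += recordSize;
        return true;
    }

    // Mirrors MemoryRecords.close: seals the batch so its compression buffers can be reclaimed.
    void close() { writable = false; }

    boolean isWritable() { return writable; }
}

class Accumulator {
    private final Deque<Batch> deque = new ArrayDeque<>();
    private final int batchCapacity;

    Accumulator(int batchCapacity) { this.batchCapacity = batchCapacity; }

    void append(int recordSize) {
        Batch last = deque.peekLast();
        if (last != null) {
            if (last.tryAppend(recordSize))
                return;       // fits in the in-progress batch
            last.close();     // the fix: release temporary buffers as soon as the batch is full
        }
        Batch fresh = new Batch(batchCapacity);
        fresh.tryAppend(recordSize);
        deque.addLast(fresh);
    }

    public static void main(String[] args) {
        Accumulator accum = new Accumulator(1024);
        accum.append(600);
        accum.append(600);    // does not fit: the first batch is closed and a second one is created
        accum.deque.forEach(b -> System.out.println("writable = " + b.isWritable()));
        // prints "writable = false" for the sealed batch, then "writable = true" for the new one
    }
}

This is the shape the new private RecordAccumulator.tryAppend helper gives both append paths below, and the behaviour the updated testFull asserts through the new batches() and isWritable() test hooks.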

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (63 changed lines)

@@ -74,7 +74,6 @@ public final class RecordAccumulator {
private final Set<TopicPartition> muted;
private int drainIndex;
/**
* Create a new record accumulator
*
@@ -104,11 +103,11 @@ public final class RecordAccumulator {
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
this.batches = new CopyOnWriteMap<>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches();
this.muted = new HashSet<TopicPartition>();
this.muted = new HashSet<>();
this.time = time;
registerMetrics(metrics, metricGrpName);
}
@@ -171,12 +170,9 @@ public final class RecordAccumulator {
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
@@ -187,14 +183,12 @@ public final class RecordAccumulator {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
@@ -209,12 +203,28 @@ public final class RecordAccumulator {
}
}
/**
* If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
* resources (like compression streams buffers).
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
RecordBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}
/**
* Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
* due to metadata being unavailable
*/
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
List<RecordBatch> expiredBatches = new ArrayList<>();
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
@@ -245,7 +255,7 @@ public final class RecordAccumulator {
}
}
}
if (expiredBatches.size() > 0)
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", count);
return expiredBatches;
@@ -287,7 +297,7 @@ public final class RecordAccumulator {
* </ol>
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
boolean unknownLeadersExist = false;
@@ -333,7 +343,7 @@ public final class RecordAccumulator {
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> deque = entry.getValue();
synchronized (deque) {
if (deque.size() > 0)
if (!deque.isEmpty())
return true;
}
}
@@ -357,11 +367,11 @@ public final class RecordAccumulator {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
Map<Integer, List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<RecordBatch>();
List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
@@ -436,6 +446,11 @@ public final class RecordAccumulator {
boolean flushInProgress() {
return flushesInProgress.get() > 0;
}
/* Visible for testing */
Map<TopicPartition, Deque<RecordBatch>> batches() {
return Collections.unmodifiableMap(batches);
}
/**
* Initiate the flushing of data from the accumulator...this makes all requests immediately ready
@@ -569,7 +584,7 @@ public final class RecordAccumulator {
public Iterable<RecordBatch> all() {
synchronized (incomplete) {
return new ArrayList<RecordBatch>(this.incomplete);
return new ArrayList<>(this.incomplete);
}
}
}
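
For intuition on why closing the records frees memory at all, here is a generic stand-in sketch using the JDK's gzip stream in place of the snappy path owned by MemoryRecords (an assumed analogy, not the Kafka code): an open compression stream keeps working buffers reachable, and only closing it lets the GC reclaim them while the already-compressed output lives on.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

class CloseReleasesBuffers {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // 32k internal buffer, echoing the snappy buffer size mentioned in the commit message.
        GZIPOutputStream compressed = new GZIPOutputStream(sink, 32 * 1024);
        compressed.write(new byte[10_000]); // roughly one 10KB record, as in the throughput test
        // Until close(), the stream and its working buffers must stay reachable.
        compressed.close(); // finishes the stream; its internal buffers become collectable
        // The compressed bytes in `sink` remain usable, analogous to a full RecordBatch
        // waiting in the accumulator to be drained after its records have been closed.
        System.out.println("compressed size: " + sink.size() + " bytes");
    }
}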

clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java (5 changed lines)

@@ -213,6 +213,11 @@ public class MemoryRecords implements Records {
return builder.toString();
}
/** Visible for testing */
public boolean isWritable() {
return writable;
}
public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
private final DataInputStream stream;

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (18 changed lines)

@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -75,14 +76,27 @@ public class RecordAccumulatorTest {
@Test
public void testFull() throws Exception {
long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = 1024 / msgSize;
int batchSize = 1024;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = batchSize / msgSize;
for (int i = 0; i < appends; i++) {
// append to the first batch
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(1, partitionBatches.size());
assertTrue(partitionBatches.peekFirst().records.isWritable());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
// this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
assertFalse(partitionBatchesIterator.next().records.isWritable());
assertTrue(partitionBatchesIterator.next().records.isWritable());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
