From fe27d8f787f38428e0add36edeac9d694f16af53 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 24 May 2016 09:13:40 +0100
Subject: [PATCH] KAFKA-3747; Close `RecordBatch.records` when append to batch fails

With this change, `test_producer_throughput` with message_size=10000,
compression_type=snappy and a snappy buffer size of 32k can be executed in a
heap of 192m in a local environment (768m is needed without this change).

Author: Ismael Juma

Reviewers: Guozhang Wang

Closes #1418 from ijuma/kafka-3747-close-record-batch-when-append-fails
---
 .../producer/internals/RecordAccumulator.java | 63 ++++++++++++-------
 .../kafka/common/record/MemoryRecords.java    |  5 ++
 .../internals/RecordAccumulatorTest.java      | 18 +++++-
 3 files changed, 60 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 5339096efa7..a73d8822265 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -74,7 +74,6 @@ public final class RecordAccumulator {
     private final Set<TopicPartition> muted;
     private int drainIndex;
 
-
     /**
      * Create a new record accumulator
      *
@@ -104,11 +103,11 @@ public final class RecordAccumulator {
         this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
-        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
+        this.batches = new CopyOnWriteMap<>();
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
         this.incomplete = new IncompleteRecordBatches();
-        this.muted = new HashSet<TopicPartition>();
+        this.muted = new HashSet<>();
         this.time = time;
         registerMetrics(metrics, metricGrpName);
     }
@@ -171,12 +170,9 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
-                RecordBatch last = dq.peekLast();
-                if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-                    if (future != null)
-                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
-                }
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                if (appendResult != null)
+                    return appendResult;
             }
 
             // we don't have an in-progress record batch try to allocate a new batch
@@ -187,14 +183,12 @@ public final class RecordAccumulator {
                 // Need to check if producer is closed again after grabbing the dequeue lock.
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
-                RecordBatch last = dq.peekLast();
-                if (last != null) {
-                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-                    if (future != null) {
-                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
-                        free.deallocate(buffer);
-                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
-                    }
+
+                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
+                if (appendResult != null) {
+                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
+                    free.deallocate(buffer);
+                    return appendResult;
                 }
                 MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                 RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
@@ -209,12 +203,28 @@ public final class RecordAccumulator {
         }
     }
 
+    /**
+     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
+     * resources (like compression streams buffers).
+     */
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
+        RecordBatch last = deque.peekLast();
+        if (last != null) {
+            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
+            if (future == null)
+                last.records.close();
+            else
+                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
+        }
+        return null;
+    }
+
     /**
      * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
      * due to metadata being unavailable
      */
     public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
-        List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
+        List<RecordBatch> expiredBatches = new ArrayList<>();
         int count = 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> dq = entry.getValue();
@@ -245,7 +255,7 @@ public final class RecordAccumulator {
                 }
             }
         }
-        if (expiredBatches.size() > 0)
+        if (!expiredBatches.isEmpty())
             log.trace("Expired {} batches in accumulator", count);
 
         return expiredBatches;
@@ -287,7 +297,7 @@ public final class RecordAccumulator {
      *
      */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
-        Set<Node> readyNodes = new HashSet<Node>();
+        Set<Node> readyNodes = new HashSet<>();
         long nextReadyCheckDelayMs = Long.MAX_VALUE;
         boolean unknownLeadersExist = false;
 
@@ -333,7 +343,7 @@ public final class RecordAccumulator {
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             Deque<RecordBatch> deque = entry.getValue();
             synchronized (deque) {
-                if (deque.size() > 0)
+                if (!deque.isEmpty())
                     return true;
             }
         }
@@ -357,11 +367,11 @@ public final class RecordAccumulator {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
-        Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
         for (Node node : nodes) {
             int size = 0;
             List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
-            List<RecordBatch> ready = new ArrayList<RecordBatch>();
+            List<RecordBatch> ready = new ArrayList<>();
             /* to make starvation less likely this loop doesn't start at 0 */
             int start = drainIndex = drainIndex % parts.size();
             do {
@@ -436,6 +446,11 @@ public final class RecordAccumulator {
     boolean flushInProgress() {
         return flushesInProgress.get() > 0;
     }
+
+    /* Visible for testing */
+    Map<TopicPartition, Deque<RecordBatch>> batches() {
+        return Collections.unmodifiableMap(batches);
+    }
 
     /**
      * Initiate the flushing of data from the accumulator...this makes all requests immediately ready
@@ -569,7 +584,7 @@ public final class RecordAccumulator {
 
         public Iterable<RecordBatch> all() {
             synchronized (incomplete) {
-                return new ArrayList<RecordBatch>(this.incomplete);
+                return new ArrayList<>(this.incomplete);
             }
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index fcf7f446a45..603f74b18e0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -213,6 +213,11 @@ public class MemoryRecords implements Records {
         return builder.toString();
     }
 
+    /** Visible for testing */
+    public boolean isWritable() {
+        return writable;
+    }
+
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
         private final ByteBuffer buffer;
         private final DataInputStream stream;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b3a5a049a82..43ac15a09a4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -75,14 +76,27 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
-        int appends = 1024 / msgSize;
+        int batchSize = 1024;
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
+        int appends = batchSize / msgSize;
         for (int i = 0; i < appends; i++) {
+            // append to the first batch
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+            Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+            assertEquals(1, partitionBatches.size());
+            assertTrue(partitionBatches.peekFirst().records.isWritable());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
+
+        // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
+        Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+        assertEquals(2, partitionBatches.size());
+        Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
+        assertFalse(partitionBatchesIterator.next().records.isWritable());
+        assertTrue(partitionBatchesIterator.next().records.isWritable());
+
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
+
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
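
Note on the pattern the patch applies: the new `RecordAccumulator.tryAppend` calls `last.records.close()` as soon as `RecordBatch.tryAppend` returns null, so a full batch's compression stream gives up its buffers immediately instead of holding them until the batch is drained. The sketch below illustrates that close-on-full pattern in isolation; it is not Kafka code. The `MiniBatch` and `CloseOnFullDemo` names are hypothetical, and the JDK's `GZIPOutputStream` stands in for the snappy stream the commit message refers to.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical analogue of RecordBatch + MemoryRecords: appends flow through a
// compressor whose internal buffers stay allocated for as long as it is open.
class MiniBatch {
    private final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    private final int sizeLimit;
    private GZIPOutputStream compressor; // non-null while the batch is writable
    private int appended = 0;

    MiniBatch(int sizeLimit) throws IOException {
        this.sizeLimit = sizeLimit;
        this.compressor = new GZIPOutputStream(compressed);
    }

    // Mirrors RecordBatch.tryAppend: a false return signals "batch full".
    boolean tryAppend(byte[] record) throws IOException {
        if (compressor == null || appended + record.length > sizeLimit)
            return false;
        compressor.write(record);
        appended += record.length;
        return true;
    }

    // Mirrors MemoryRecords.close: flush and drop the compressor so its
    // buffers become garbage-collectable before the batch is sent.
    void close() throws IOException {
        if (compressor != null) {
            compressor.close();
            compressor = null;
        }
    }

    boolean isWritable() {
        return compressor != null;
    }
}

public class CloseOnFullDemo {
    public static void main(String[] args) throws IOException {
        MiniBatch batch = new MiniBatch(1024);
        byte[] record = new byte[400];
        while (batch.tryAppend(record)) {
            // keep appending until the batch reports it is full
        }
        // Caller-side pattern from the patch: the moment an append fails,
        // close the batch rather than leaving the compressor open until drain.
        batch.close(); // analogous to last.records.close() in tryAppend
        System.out.println("writable after close: " + batch.isWritable()); // false
    }
}

Dropping the compressor reference as soon as the batch fills is what lets its temporary buffers be reclaimed early; the javadoc of the patched tryAppend describes the same goal ("release temporary resources (like compression streams buffers)").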