
KAFKA-3720: Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms (#8399)

Refactored BufferExhaustedException to be a subclass of TimeoutException so that existing code which catches TimeoutException keeps working.

Added handling to count these exceptions in the "buffer-exhausted-records" metric.

Test Strategy
There were existing test cases covering this behavior, which I refactored.
I then added an extra case to check that the expected exception is actually thrown, which the existing tests did not cover.
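Because BufferExhaustedException now subclasses TimeoutException, existing catch-clauses keep matching while new code can handle buffer exhaustion specifically. Below is a minimal sketch of such calling code; the class, the method, and the producer setup are illustrative and not part of this patch:

    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.BufferExhaustedException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;

    public class SendErrorHandling {
        // 'producer' and 'record' are assumed to be created and configured elsewhere.
        static void sendAndHandle(KafkaProducer<byte[], byte[]> producer,
                                  ProducerRecord<byte[], byte[]> record) throws InterruptedException {
            try {
                producer.send(record).get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof BufferExhaustedException) {
                    // New, specific signal: no buffer memory freed up within max.block.ms.
                } else if (cause instanceof TimeoutException) {
                    // Handlers written before this change still work: without the
                    // branch above, a BufferExhaustedException would match here too.
                }
            }
        }
    }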

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Sönke Liebau authored 5 years ago, committed by GitHub
commit e032a36070
Changed files (number of changed lines in parentheses):

  1. clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java (12)
  2. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (5)
  3. clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (11)
  4. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (7)
  5. clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java (26)
  6. core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala (13)
clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java (12 changed lines)

@@ -16,13 +16,17 @@
  */
 package org.apache.kafka.clients.producer;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.TimeoutException;
 
 /**
- * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
- * which data can be sent for long enough for the allocated buffer to be exhausted.
+ * This exception is thrown if the producer cannot allocate memory for a record within max.block.ms due to the buffer
+ * being too full.
+ *
+ * In earlier versions a TimeoutException was thrown instead of this. To keep existing catch-clauses working
+ * this class extends TimeoutException.
+ *
  */
-public class BufferExhaustedException extends KafkaException {
+public class BufferExhaustedException extends TimeoutException {
 
     private static final long serialVersionUID = 1L;

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (5 changed lines)

@@ -966,11 +966,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);
             throw new InterruptException(e);
-        } catch (BufferExhaustedException e) {
-            this.errors.record();
-            this.metrics.sensor("buffer-exhausted-records").record();
-            this.interceptors.onSendError(record, tp, e);
-            throw e;
         } catch (KafkaException e) {
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);

clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (11 changed lines)

@@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Meter;
@@ -83,6 +83,12 @@ public class BufferPool {
         MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
                                                         metricGrpName,
                                                         "The total time an appender waits for space allocation.");
+
+        Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
+        MetricName bufferExhaustedRateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
+        MetricName bufferExhaustedTotalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
+        bufferExhaustedRecordSensor.add(new Meter(bufferExhaustedRateMetricName, bufferExhaustedTotalMetricName));
+
         this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
         this.closed = false;
     }
@@ -151,7 +157,8 @@ public class BufferPool {
                         throw new KafkaException("Producer closed while allocating memory");
 
                     if (waitingTimeElapsed) {
-                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
+                        this.metrics.sensor("buffer-exhausted-records").record();
+                        throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                     }
 
                     remainingTimeToBlockNs -= timeNs;
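The sensor registered above backs two metrics in the producer's metric group, "buffer-exhausted-rate" and "buffer-exhausted-total". Below is a sketch of how an application could poll the total from a live producer; the helper class is hypothetical, and the group name "producer-metrics" is an assumption:

    import java.util.Map;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class BufferExhaustionMonitor {
        // Returns the cumulative number of record sends dropped due to buffer exhaustion.
        static double bufferExhaustedTotal(KafkaProducer<?, ?> producer) {
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                MetricName name = entry.getKey();
                if ("buffer-exhausted-total".equals(name.name()) && "producer-metrics".equals(name.group()))
                    return (double) entry.getValue().metricValue();
            }
            return 0.0; // metric not found; not expected once the producer is constructed
        }
    }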

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (7 changed lines)

@@ -43,8 +43,6 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
 import org.apache.kafka.common.record.CompressionType;
@@ -160,11 +158,6 @@ public final class RecordAccumulator {
             }
         };
         metrics.addMetric(metricName, availableBytes);
-
-        Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
-        MetricName rateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
-        MetricName totalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
-        bufferExhaustedRecordSensor.add(new Meter(rateMetricName, totalMetricName));
     }
 
     /**
/**

clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java (26 changed lines)

@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.BufferExhaustedException;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -152,8 +152,18 @@ public class BufferPoolTest {
     }
 
     /**
-     * Test if Timeout exception is thrown when there is not enough memory to allocate and the elapsed time is greater than the max specified block time.
-     * And verify that the allocation attempt finishes soon after the maxBlockTimeMs.
+     * Test if BufferExhausted exception is thrown when there is not enough memory to allocate and the elapsed
+     * time is greater than the max specified block time.
      */
+    @Test(expected = BufferExhaustedException.class)
+    public void testBufferExhaustedExceptionIsThrown() throws Exception {
+        BufferPool pool = new BufferPool(2, 1, metrics, time, metricGroup);
+        pool.allocate(1, maxBlockTimeMs);
+        pool.allocate(2, maxBlockTimeMs);
+    }
+
+    /**
+     * Verify that a failed allocation attempt due to not enough memory finishes soon after the maxBlockTimeMs.
+     */
     @Test
     public void testBlockTimeout() throws Exception {
@@ -171,14 +181,14 @@ public class BufferPoolTest {
         try {
             pool.allocate(10, maxBlockTimeMs);
             fail("The buffer allocated more memory than its maximum value 10");
-        } catch (TimeoutException e) {
+        } catch (BufferExhaustedException e) {
            // this is good
         }
         // Thread scheduling sometimes means that deallocation varies by this point
         assertTrue("available memory " + pool.availableMemory(), pool.availableMemory() >= 8 && pool.availableMemory() <= 10);
         long durationMs = Time.SYSTEM.milliseconds() - beginTimeMs;
-        assertTrue("TimeoutException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs);
-        assertTrue("TimeoutException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000);
+        assertTrue("BufferExhaustedException should not throw before maxBlockTimeMs", durationMs >= maxBlockTimeMs);
+        assertTrue("BufferExhaustedException should throw soon after maxBlockTimeMs", durationMs < maxBlockTimeMs + 1000);
     }
 
     /**
@@ -191,7 +201,7 @@ public class BufferPoolTest {
         try {
             pool.allocate(2, maxBlockTimeMs);
             fail("The buffer allocated more memory than its maximum value 2");
-        } catch (TimeoutException e) {
+        } catch (BufferExhaustedException e) {
             // this is good
         }
         assertEquals(0, pool.queued());
@@ -266,7 +276,7 @@ public class BufferPoolTest {
         try {
             pool.allocate(2, maxBlockTimeMs);
             fail("The buffer allocated more memory than its maximum value 2");
-        } catch (TimeoutException e) {
+        } catch (BufferExhaustedException e) {
             // this is good
         } catch (InterruptedException e) {
             // this can be neglected

core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala (13 changed lines)

@@ -23,7 +23,7 @@ import java.util.concurrent.{ExecutionException, Future, TimeUnit}
 import kafka.log.LogConfig
 import kafka.server.Defaults
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{BufferExhaustedException, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
 import org.apache.kafka.common.serialization.ByteArraySerializer
@@ -150,14 +150,19 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
       assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0)
     }
 
-    def verifySendFailure(future: Future[RecordMetadata]): Unit = {
+    def verifyMetadataNotAvailable(future: Future[RecordMetadata]): Unit = {
       assertTrue(future.isDone) // verify future was completed immediately
       assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass)
     }
 
+    def verifyBufferExhausted(future: Future[RecordMetadata]): Unit = {
+      assertTrue(future.isDone) // verify future was completed immediately
+      assertEquals(classOf[BufferExhaustedException], intercept[ExecutionException](future.get).getCause.getClass)
+    }
+
     // Topic metadata not available, send should fail without blocking
     val producer = createProducer(brokerList = brokerList, maxBlockMs = 0)
-    verifySendFailure(send(producer))
+    verifyMetadataNotAvailable(send(producer))
 
     // Test that send starts succeeding once metadata is available
     val future = sendUntilQueued(producer)
@@ -167,7 +172,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0,
       lingerMs = 15000, batchSize = 1100, bufferSize = 1500)
     val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued
-    verifySendFailure(send(producer2)) // should fail send since buffer is full
+    verifyBufferExhausted(send(producer2)) // should fail send since buffer is full
     verifySendSuccess(future2) // previous batch should be completed and sent now
