From d2d68380175905775aac0eeaf2238a280fb77f7f Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Mon, 9 Dec 2019 18:18:01 +0000 Subject: [PATCH] MINOR: Test for non-blocking send using max.block.ms=0 (#7370) Reviewers: Manikumar Reddy --- .../kafka/api/BaseProducerSendTest.scala | 7 ++- .../kafka/api/PlaintextProducerSendTest.scala | 60 ++++++++++++++++++- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 9ae095372aa..f1696115769 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -76,7 +76,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { deliveryTimeoutMs: Int = 2 * 60 * 1000, batchSize: Int = 16384, compressionType: String = "none", - maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = { + maxBlockMs: Long = 60 * 1000L, + bufferSize: Long = 1024L * 1024L): KafkaProducer[Array[Byte],Array[Byte]] = { val producer = TestUtils.createProducer(brokerList, compressionType = compressionType, securityProtocol = securityProtocol, @@ -84,7 +85,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { saslProperties = clientSaslProperties, lingerMs = lingerMs, deliveryTimeoutMs = deliveryTimeoutMs, - maxBlockMs = maxBlockMs) + maxBlockMs = maxBlockMs, + batchSize = batchSize, + bufferSize = bufferSize) registerProducer(producer) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 1dda60c589a..4b82783db4f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -18,15 +18,17 @@ package kafka.api import java.util.Properties -import java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, Future, TimeUnit} import kafka.log.LogConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException, TimeoutException} import org.apache.kafka.common.record.TimestampType import org.junit.Assert._ import org.junit.Test +import org.scalatest.Assertions.intercept + class PlaintextProducerSendTest extends BaseProducerSendTest { @@ -114,4 +116,56 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } } + // Test that producer with max.block.ms=0 can be used to send in non-blocking mode + // where requests are failed immediately without blocking if metadata is not available + // or buffer is full. + @Test + def testNonBlockingProducer(): Unit = { + + def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = { + producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000))) + } + + def sendUntilQueued(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = { + val (future, _) = TestUtils.computeUntilTrue(send(producer))(future => { + if (future.isDone) { + try { + future.get + true // Send was queued and completed successfully + } catch { + case _: ExecutionException => false + } + } else + true // Send future not yet complete, so it has been queued to be sent + }) + future + } + + def verifySendSuccess(future: Future[RecordMetadata]): Unit = { + val recordMetadata = future.get(10, TimeUnit.SECONDS) + assertEquals(topic, recordMetadata.topic) + assertEquals(0, recordMetadata.partition) + assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0) + } + + def verifySendFailure(future: Future[RecordMetadata]): Unit = { + assertTrue(future.isDone) // verify future was completed immediately + assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass) + } + + // Topic metadata not available, send should fail without blocking + val producer = createProducer(brokerList = brokerList, maxBlockMs = 0) + verifySendFailure(send(producer)) + + // Test that send starts succeeding once metadata is available + val future = sendUntilQueued(producer) + verifySendSuccess(future) + + // Verify that send fails immediately without blocking when there is no space left in the buffer + val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0, + lingerMs = 15000, batchSize = 1100, bufferSize = 1500) + val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued + verifySendFailure(send(producer2)) // should fail send since buffer is full + verifySendSuccess(future2) // previous batch should be completed and sent now + } }