Browse Source

MINOR: Test for non-blocking send using max.block.ms=0 (#7370)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
pull/7807/head
Rajini Sivaram 5 years ago committed by GitHub
parent
commit
d2d6838017
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
  2. 60
      core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala

7
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala

@ -76,7 +76,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -76,7 +76,8 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
deliveryTimeoutMs: Int = 2 * 60 * 1000,
batchSize: Int = 16384,
compressionType: String = "none",
maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = {
maxBlockMs: Long = 60 * 1000L,
bufferSize: Long = 1024L * 1024L): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createProducer(brokerList,
compressionType = compressionType,
securityProtocol = securityProtocol,
@ -84,7 +85,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @@ -84,7 +85,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
saslProperties = clientSaslProperties,
lingerMs = lingerMs,
deliveryTimeoutMs = deliveryTimeoutMs,
maxBlockMs = maxBlockMs)
maxBlockMs = maxBlockMs,
batchSize = batchSize,
bufferSize = bufferSize)
registerProducer(producer)
}

60
core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala

@ -18,15 +18,17 @@ @@ -18,15 +18,17 @@
package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import java.util.concurrent.{ExecutionException, Future, TimeUnit}
import kafka.log.LogConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.TimestampType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept
class PlaintextProducerSendTest extends BaseProducerSendTest {
@ -114,4 +116,56 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { @@ -114,4 +116,56 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
}
// Test that producer with max.block.ms=0 can be used to send in non-blocking mode
// where requests are failed immediately without blocking if metadata is not available
// or buffer is full.
@Test
def testNonBlockingProducer(): Unit = {
def send(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = {
producer.send(new ProducerRecord(topic, 0, "key".getBytes, new Array[Byte](1000)))
}
def sendUntilQueued(producer: KafkaProducer[Array[Byte],Array[Byte]]): Future[RecordMetadata] = {
val (future, _) = TestUtils.computeUntilTrue(send(producer))(future => {
if (future.isDone) {
try {
future.get
true // Send was queued and completed successfully
} catch {
case _: ExecutionException => false
}
} else
true // Send future not yet complete, so it has been queued to be sent
})
future
}
def verifySendSuccess(future: Future[RecordMetadata]): Unit = {
val recordMetadata = future.get(10, TimeUnit.SECONDS)
assertEquals(topic, recordMetadata.topic)
assertEquals(0, recordMetadata.partition)
assertTrue(s"Invalid offset $recordMetadata", recordMetadata.offset >= 0)
}
def verifySendFailure(future: Future[RecordMetadata]): Unit = {
assertTrue(future.isDone) // verify future was completed immediately
assertEquals(classOf[TimeoutException], intercept[ExecutionException](future.get).getCause.getClass)
}
// Topic metadata not available, send should fail without blocking
val producer = createProducer(brokerList = brokerList, maxBlockMs = 0)
verifySendFailure(send(producer))
// Test that send starts succeeding once metadata is available
val future = sendUntilQueued(producer)
verifySendSuccess(future)
// Verify that send fails immediately without blocking when there is no space left in the buffer
val producer2 = createProducer(brokerList = brokerList, maxBlockMs = 0,
lingerMs = 15000, batchSize = 1100, bufferSize = 1500)
val future2 = sendUntilQueued(producer2) // wait until metadata is available and one record is queued
verifySendFailure(send(producer2)) // should fail send since buffer is full
verifySendSuccess(future2) // previous batch should be completed and sent now
}
}

Loading…
Cancel
Save