
KAFKA-8134: `linger.ms` must be a long

Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
Branch: pull/6656/head
Authored by Dhruvil Shah, committed by Colin Patrick McCabe
Commit: b4532a65f7
9 changed files, 106 lines changed:

  1. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (13)
  2. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (2)
  3. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (8)
  4. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (51)
  5. clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (8)
  6. clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (4)
  7. core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala (13)
  8. core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala (6)
  9. core/src/test/scala/unit/kafka/server/FetchRequestTest.scala (1)

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java (13 lines changed)

@@ -395,7 +395,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.accumulator = new RecordAccumulator(logContext,
                     config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.compressionType,
-                    config.getInt(ProducerConfig.LINGER_MS_CONFIG),
+                    lingerMs(config),
                     retryBackoffMs,
                     deliveryTimeoutMs,
                     metrics,
@@ -475,12 +475,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     apiVersions);
     }

+    private static int lingerMs(ProducerConfig config) {
+        return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE);
+    }
+
     private static int configureDeliveryTimeout(ProducerConfig config, Logger log) {
         int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
-        int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG);
+        int lingerMs = lingerMs(config);
         int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
+        int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
+        if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
             if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
                 // throw an exception if the user explicitly set an inconsistent value
                 throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
@@ -488,7 +493,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                         + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
             } else {
                 // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility
-                deliveryTimeoutMs = lingerMs + requestTimeoutMs;
+                deliveryTimeoutMs = lingerAndRequestTimeoutMs;
                 log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.",
                     ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG,
                     ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
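
The saturating sum introduced above is the subtle part of this fix: once `linger.ms` can be as large as `Integer.MAX_VALUE`, the old `int` expression `lingerMs + requestTimeoutMs` can wrap around to a negative number, silently breaking both the consistency check and the backward-compatible default. A minimal standalone sketch (not Kafka code) of the wraparound being guarded against:

    // Standalone sketch (not Kafka code) of the int overflow this change guards against.
    public class LingerOverflowDemo {
        public static void main(String[] args) {
            int lingerMs = Integer.MAX_VALUE;  // linger.ms clamped to Integer.MAX_VALUE
            int requestTimeoutMs = 30_000;

            // Plain int addition wraps around to a large negative number ...
            int naiveSum = lingerMs + requestTimeoutMs;
            System.out.println(naiveSum);      // prints -2147453649

            // ... so the patch widens to long and saturates at Integer.MAX_VALUE.
            int saturatedSum = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE);
            System.out.println(saturatedSum);  // prints 2147483647
        }
    }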

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java (2 lines changed)

@@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig {
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
-                                .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
                                 .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
                                 .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
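
Switching the definition from `Type.INT` to `Type.LONG` makes the config accept the long values its documentation already promised; in particular, a `Long` object passed for `linger.ms` no longer fails `ConfigDef` validation. A hedged usage sketch follows; the broker address and serializers are illustrative assumptions, not part of this patch:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class LingerMsAsLongExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Passing linger.ms as a Long now parses under Type.LONG; the producer
            // clamps values above Integer.MAX_VALUE internally (see lingerMs(config) above).
            props.put(ProducerConfig.LINGER_MS_CONFIG, 100L);
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send records as usual
            }
        }
    }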

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (8 lines changed)

@@ -72,9 +72,9 @@ public final class RecordAccumulator {
     private final AtomicInteger appendsInProgress;
     private final int batchSize;
     private final CompressionType compression;
-    private final long lingerMs;
+    private final int lingerMs;
     private final long retryBackoffMs;
-    private final long deliveryTimeoutMs;
+    private final int deliveryTimeoutMs;
     private final BufferPool free;
     private final Time time;
     private final ApiVersions apiVersions;
@@ -106,9 +106,9 @@ public final class RecordAccumulator {
     public RecordAccumulator(LogContext logContext,
                              int batchSize,
                              CompressionType compression,
-                             long lingerMs,
+                             int lingerMs,
                              long retryBackoffMs,
-                             long deliveryTimeoutMs,
+                             int deliveryTimeoutMs,
                              Metrics metrics,
                              String metricGrpName,
                              Time time,
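
Note that the types move in opposite directions: the public config widens to `long`, while these internal fields narrow from `long` to `int`. That is safe because `KafkaProducer` clamps the configured value before constructing the accumulator, so nothing above `Integer.MAX_VALUE` reaches this class. A one-method sketch of the invariant, restating `lingerMs(config)` from above:

    // Whatever long the user configures, RecordAccumulator only ever sees a
    // non-negative int: atLeast(0) validates the lower bound, Math.min the upper.
    static int effectiveLingerMs(long configuredLingerMs) {
        return (int) Math.min(configuredLingerMs, Integer.MAX_VALUE);
    }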

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (51 lines changed)

@@ -103,7 +103,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
@@ -152,7 +152,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         byte[] value = new byte[2 * batchSize];
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -191,7 +191,7 @@ public class RecordAccumulatorTest {
                 new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
         RecordAccumulator accum = createTestRecordAccumulator(
-                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -213,7 +213,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
-        long lingerMs = 10L;
+        int lingerMs = 10;
         RecordAccumulator accum = createTestRecordAccumulator(
                 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -234,7 +234,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L);
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -254,7 +254,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = createTestRecordAccumulator(
-                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -293,7 +293,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
-        long lingerMs = 10L;
+        int lingerMs = 10;
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
@@ -331,10 +331,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testRetryBackoff() throws Exception {
-        long lingerMs = Integer.MAX_VALUE / 16;
+        int lingerMs = Integer.MAX_VALUE / 16;
         long retryBackoffMs = Integer.MAX_VALUE / 8;
-        int requestTimeoutMs = Integer.MAX_VALUE / 4;
-        long deliveryTimeoutMs = Integer.MAX_VALUE;
+        int deliveryTimeoutMs = Integer.MAX_VALUE;
         long totalSize = 10 * 1024;
         int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
         String metricGrpName = "producer-metrics";
@@ -377,7 +376,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
-        long lingerMs = Integer.MAX_VALUE;
+        int lingerMs = Integer.MAX_VALUE;
         final RecordAccumulator accum = createTestRecordAccumulator(
                 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
@@ -419,7 +418,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
-                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
+                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Integer.MAX_VALUE);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         accum.beginFlush();
@@ -515,8 +514,8 @@ public class RecordAccumulatorTest {
         assertTrue(accum.hasIncomplete());
     }

-    private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException {
-        long lingerMs = 300L;
+    private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedException {
+        int lingerMs = 300;
         List<Boolean> muteStates = Arrays.asList(false, true);
         Set<Node> readyNodes = null;
         List<ProducerBatch> expiredBatches = new ArrayList<>();
@@ -554,20 +553,20 @@ public class RecordAccumulatorTest {
     @Test
     public void testExpiredBatchSingle() throws InterruptedException {
-        doExpireBatchSingle(3200L);
+        doExpireBatchSingle(3200);
     }

     @Test
     public void testExpiredBatchSingleMaxValue() throws InterruptedException {
-        doExpireBatchSingle(Long.MAX_VALUE);
+        doExpireBatchSingle(Integer.MAX_VALUE);
     }

     @Test
     public void testExpiredBatches() throws InterruptedException {
         long retryBackoffMs = 100L;
-        long lingerMs = 30L;
+        int lingerMs = 30;
         int requestTimeout = 60;
-        long deliveryTimeoutMs = 3200L;
+        int deliveryTimeoutMs = 3200;

         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
@@ -700,9 +699,8 @@ public class RecordAccumulatorTest {
         // Simulate talking to an older broker, ie. one which supports a lower magic.
         ApiVersions apiVersions = new ApiVersions();
         int batchSize = 1025;
-        int requestTimeoutMs = 1600;
-        long deliveryTimeoutMs = 3200L;
-        long lingerMs = 10L;
+        int deliveryTimeoutMs = 3200;
+        int lingerMs = 10;
         long retryBackoffMs = 100L;
         long totalSize = 10 * batchSize;
         String metricGrpName = "producer-metrics";
@@ -777,7 +775,7 @@ public class RecordAccumulatorTest {
         // First set the compression ratio estimation to be good.
         CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
-        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0);
         int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
         assertTrue("There should be some split batches", numSplitBatches > 0);
         // Drain all the split batches.
@@ -829,7 +827,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
-        long lingerMs = 500L;
+        int lingerMs = 500;
         int batchSize = 1025;

         RecordAccumulator accum = createTestRecordAccumulator(
@@ -993,17 +991,16 @@ public class RecordAccumulatorTest {
     }

-    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
-        long deliveryTimeoutMs = 3200L;
+    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
+        int deliveryTimeoutMs = 3200;
         return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
     }

     /**
      * Return a test RecordAccumulator instance
      */
-    private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) {
+    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
         long retryBackoffMs = 100L;
         int requestTimeoutMs = 1600;
         String metricGrpName = "producer-metrics";
         return new RecordAccumulator(

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (8 lines changed)

@@ -1910,14 +1910,14 @@ public class SenderTest {
                                                TopicPartition tp) throws Exception {
         int maxRetries = 1;
         String topic = tp.topic();
-        long deliveryTimeoutMs = 3000L;
+        int deliveryTimeoutMs = 3000;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
             accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
-                    0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
+                    0, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager,
                     new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
@@ -2410,14 +2410,14 @@ public class SenderTest {
     }

     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
-        long deliveryTimeoutMs = 1500L;
+        int deliveryTimeoutMs = 1500;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;

-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
                 deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,

clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java (4 lines changed)

@@ -118,7 +118,7 @@ public class TransactionManagerTest {
         Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("client-id", CLIENT_ID);
         int batchSize = 16 * 1024;
-        long deliveryTimeoutMs = 3000L;
+        int deliveryTimeoutMs = 3000;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
@@ -128,7 +128,7 @@ public class TransactionManagerTest {
         Metrics metrics = new Metrics(metricConfig, time);
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);

-        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0, 0L,
                 deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager,
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,

core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala (13 lines changed)

@@ -72,6 +72,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   protected def createProducer(brokerList: String,
                                lingerMs: Int = 0,
+                               deliveryTimeoutMs: Int = 2 * 60 * 1000,
                                batchSize: Int = 16384,
                                compressionType: String = "none",
                                maxBlockMs: Long = 60 * 1000L): KafkaProducer[Array[Byte],Array[Byte]] = {
@@ -81,6 +82,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       trustStoreFile = trustStoreFile,
       saslProperties = clientSaslProperties,
       lingerMs = lingerMs,
+      deliveryTimeoutMs = deliveryTimeoutMs,
       maxBlockMs = maxBlockMs)
     registerProducer(producer)
   }
@@ -172,13 +174,14 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   def testSendCompressedMessageWithCreateTime() {
     val producer = createProducer(brokerList = brokerList,
       compressionType = "gzip",
-      lingerMs = Int.MaxValue)
+      lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }

   @Test
   def testSendNonCompressedMessageWithCreateTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
@@ -414,7 +417,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testFlush() {
-    val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     try {
       createTopic(topic, 2, 2)
       val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
@@ -443,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
       producer.close(Duration.ZERO)
@@ -483,7 +486,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       }
     }
     for (i <- 0 until 50) {
-      val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
+      val producer = createProducer(brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
       try {
         // send message to partition 0
         // Only send the records in the first callback since we close the producer in the callback and no records

core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala (6 lines changed)

@@ -45,6 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testBatchSizeZero() {
     val producer = createProducer(brokerList = brokerList,
       lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue,
       batchSize = 0)
     sendAndVerify(producer)
   }
@@ -53,13 +54,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   def testSendCompressedMessageWithLogAppendTime() {
     val producer = createProducer(brokerList = brokerList,
       compressionType = "gzip",
-      lingerMs = Int.MaxValue)
+      lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }

   @Test
   def testSendNonCompressedMessageWithLogAppendTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue)
+    val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
     sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
   }

core/src/test/scala/unit/kafka/server/FetchRequestTest.scala (1 line changed)

@@ -257,6 +257,7 @@ class FetchRequestTest extends BaseRequestTest {
     val batchSize = 4 * msgValueLen
     val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
       lingerMs = Int.MaxValue,
+      deliveryTimeoutMs = Int.MaxValue,
       batchSize = batchSize,
       keySerializer = new StringSerializer,
       valueSerializer = new ByteArraySerializer)
