Browse Source

MINOR: Document that max.block.ms affects some transaction methods (#8975)

The documentation for max.block.ms said it affected only send()
and partitionsFor(), but it actually also affects initTransactions(),
abortTransaction() and commitTransaction(). So rework the
documentation to cover these methods too.

Reviewers: Boyang Chen <boyang@confluent.io>
pull/8982/head
Tom Bentley 4 years ago committed by GitHub
parent
commit
ce939e9136
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  2. 2
      clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

11
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -156,9 +156,14 @@ public class ProducerConfig extends AbstractConfig { @@ -156,9 +156,14 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.block.ms</code> */
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."
+ "These methods can be blocked either because the buffer is full or metadata unavailable."
+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
+ "<code>initTransactions()</code>, <code>commitTransaction()</code> "
+ "and <code>abortTransaction()</code> methods will block. "
+ "For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
+ "(blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
+ "For <code>partitionsFor()</code> this timeout bounds the time spent waiting for metadata if it is unavailable. "
+ "The transaction-related methods always block, but may timeout if "
+ "the transaction coordinator could not be discovered or did not respond within the timeout.";
/** <code>buffer.memory</code> */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";

2
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

@ -210,7 +210,7 @@ public final class RecordAccumulator { @@ -210,7 +210,7 @@ public final class RecordAccumulator {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.

Loading…
Cancel
Save