Browse Source

MINOR: Do not override retries for idempotent producers (#8097)

The KafkaProducer code would set infinite retries (MAX_INT) if the producer was configured with idempotence and no retries were configured by the user. This is superfluous because KIP-91 changed the retry functionality to both be time-based and the default retries config to be MAX_INT.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/8045/merge
Stanislav Kozlovski 5 years ago committed by GitHub
parent
commit
ea72edebf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

12
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -459,7 +459,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -459,7 +459,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions,
throttleTimeSensor,
logContext);
int retries = configureRetries(producerConfig, log);
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
@ -468,7 +467,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -468,7 +467,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
@ -527,15 +526,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -527,15 +526,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return transactionManager;
}
private static int configureRetries(ProducerConfig config, Logger log) {
boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
if (config.idempotenceEnabled() && !userConfiguredRetries) {
log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}
private static int configureInflightRequests(ProducerConfig config) {
if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +

Loading…
Cancel
Save