From ea72edebf2d484e42a4251c53cd6e383743b5d1a Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 13 Feb 2020 07:42:02 +0000 Subject: [PATCH] MINOR: Do not override retries for idempotent producers (#8097) The KafkaProducer code would set infinite retries (MAX_INT) if the producer was configured with idempotence and no retries were configured by the user. This is superfluous because KIP-91 changed the retry functionality to both be time-based and the default retries config to be MAX_INT. Reviewers: Jason Gustafson --- .../apache/kafka/clients/producer/KafkaProducer.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 36af5030ec6..cb9cfe46290 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -459,7 +459,6 @@ public class KafkaProducer implements Producer { apiVersions, throttleTimeSensor, logContext); - int retries = configureRetries(producerConfig, log); short acks = configureAcks(producerConfig, log); return new Sender(logContext, client, @@ -468,7 +467,7 @@ public class KafkaProducer implements Producer { maxInflightRequests == 1, producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, - retries, + producerConfig.getInt(ProducerConfig.RETRIES_CONFIG), metricsRegistry.senderMetrics, time, requestTimeoutMs, @@ -527,15 +526,6 @@ public class KafkaProducer implements Producer { return transactionManager; } - private static int configureRetries(ProducerConfig config, Logger log) { - boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG); - if (config.idempotenceEnabled() && !userConfiguredRetries) { - log.info("Overriding the default retries config to the recommended value of {} since the idempotent " + - "producer is enabled.", Integer.MAX_VALUE); - } - return config.getInt(ProducerConfig.RETRIES_CONFIG); - } - private static int configureInflightRequests(ProducerConfig config) { if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +