From b56b848682079052e98ecde9a15a8f6b8860c599 Mon Sep 17 00:00:00 2001 From: huxi Date: Thu, 2 Jan 2020 02:49:21 +0800 Subject: [PATCH] KAFKA-8928: Logged producer config does not always match actual configured values (#7466) Some logged producer configs(clientId, acks, retries) might not be reflected the actual values. Reviewers: Guozhang Wang --- .../kafka/clients/producer/KafkaProducer.java | 93 +++++-------------- .../clients/producer/ProducerConfig.java | 70 +++++++++++++- .../kafka/common/config/AbstractConfig.java | 2 +- .../clients/producer/KafkaProducerTest.java | 16 ++++ 4 files changed, 110 insertions(+), 71 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a5d749a4ff0..66a11033f4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -81,7 +81,6 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference; public class KafkaProducer implements Producer { private final Logger log; - private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics"; @@ -333,7 +331,7 @@ public class KafkaProducer implements Producer { String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; - this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId); + this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); LogContext logContext; if (transactionalId == null) @@ -433,19 +431,9 @@ public class KafkaProducer implements Producer { } } - private static String buildClientId(String configuredClientId, String transactionalId) { - if (!configuredClientId.isEmpty()) - return configuredClientId; - - if (transactionalId != null) - return "producer-" + transactionalId; - - return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); - } - // visible for testing Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) { - int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null); + int maxInflightRequests = configureInflightRequests(producerConfig); int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); @@ -467,8 +455,8 @@ public class KafkaProducer implements Producer { apiVersions, throttleTimeSensor, logContext); - int retries = configureRetries(producerConfig, transactionManager != null, log); - short acks = configureAcks(producerConfig, transactionManager != null, log); + int retries = configureRetries(producerConfig, log); + short acks = configureAcks(producerConfig, log); return new Sender(logContext, client, metadata, @@ -516,23 +504,13 @@ public class KafkaProducer implements Producer { TransactionManager transactionManager = null; - boolean userConfiguredIdempotence = false; - if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) - userConfiguredIdempotence = true; - - boolean userConfiguredTransactions = false; - if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) - userConfiguredTransactions = true; + boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG); + boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG); + if (userConfiguredTransactions && !userConfiguredIdempotence) + log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, + ProducerConfig.TRANSACTIONAL_ID_CONFIG); - boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG); - - if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) - throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); - - if (userConfiguredTransactions) - idempotenceEnabled = true; - - if (idempotenceEnabled) { + if (config.idempotenceEnabled()) { String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG); int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -542,63 +520,40 @@ public class KafkaProducer implements Producer { else log.info("Instantiated an idempotent producer."); } - return transactionManager; } - private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) { - boolean userConfiguredRetries = false; - if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) { - userConfiguredRetries = true; - } - if (idempotenceEnabled && !userConfiguredRetries) { - // We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make - // this the default. + private static int configureRetries(ProducerConfig config, Logger log) { + boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG); + if (config.idempotenceEnabled() && !userConfiguredRetries) { log.info("Overriding the default retries config to the recommended value of {} since the idempotent " + "producer is enabled.", Integer.MAX_VALUE); - return Integer.MAX_VALUE; - } - if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) { - throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer."); } return config.getInt(ProducerConfig.RETRIES_CONFIG); } - private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) { - if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { + private static int configureInflightRequests(ProducerConfig config) { + if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" + " to use the idempotent producer."); } return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); } - private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) { - boolean userConfiguredAcks = false; - short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)); - if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) { - userConfiguredAcks = true; - } - - if (idempotenceEnabled && !userConfiguredAcks) { - log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG); - return -1; - } + private static short configureAcks(ProducerConfig config, Logger log) { + boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG); + short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG)); - if (idempotenceEnabled && acks != -1) { - throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " + - "producer. Otherwise we cannot guarantee idempotence."); + if (config.idempotenceEnabled()) { + if (!userConfiguredAcks) + log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG); + else if (acks != -1) + throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " + + "producer. Otherwise we cannot guarantee idempotence."); } return acks; } - private static int parseAcks(String acksString) { - try { - return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim()); - } catch (NumberFormatException e) { - throw new ConfigException("Invalid configuration value for 'acks': " + acksString); - } - } - /** * Needs to be called before any other methods when the transactional.id is set in the configuration. * diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index fc1e6a731fc..75af5d64ecc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SecurityConfig; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; @@ -32,6 +33,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; @@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig { public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG; private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; + private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) .define(CLIENT_DNS_LOOKUP_CONFIG, @@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig { @Override protected Map postProcessParsedConfig(final Map parsedValues) { - return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); + Map refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); + maybeOverrideEnableIdempotence(refinedConfigs); + maybeOverrideClientId(refinedConfigs); + maybeOverrideAcksAndRetries(refinedConfigs); + return refinedConfigs; + } + + private void maybeOverrideClientId(final Map configs) { + String refinedClientId; + boolean userConfiguredClientId = this.originals().containsKey(CLIENT_ID_CONFIG); + if (userConfiguredClientId) { + refinedClientId = this.getString(CLIENT_ID_CONFIG); + } else { + String transactionalId = this.getString(TRANSACTIONAL_ID_CONFIG); + refinedClientId = "producer-" + (transactionalId != null ? transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement()); + } + configs.put(CLIENT_ID_CONFIG, refinedClientId); + } + + private void maybeOverrideEnableIdempotence(final Map configs) { + boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG); + boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG); + + if (userConfiguredTransactions && !userConfiguredIdempotence) { + configs.put(ENABLE_IDEMPOTENCE_CONFIG, true); + } + } + + private void maybeOverrideAcksAndRetries(final Map configs) { + final String acksStr = parseAcks(this.getString(ACKS_CONFIG)); + configs.put(ACKS_CONFIG, acksStr); + // For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden. + if (idempotenceEnabled()) { + boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG); + if (this.getInt(RETRIES_CONFIG) == 0) { + throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer."); + } + configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE); + + boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG); + final short acks = Short.valueOf(acksStr); + if (userConfiguredAcks && acks != (short) -1) { + throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " + + "producer. Otherwise we cannot guarantee idempotence."); + } + configs.put(ACKS_CONFIG, "-1"); + } + } + + private static String parseAcks(String acksString) { + try { + return acksString.trim().equalsIgnoreCase("all") ? "-1" : Short.parseShort(acksString.trim()) + ""; + } catch (NumberFormatException e) { + throw new ConfigException("Invalid configuration value for 'acks': " + acksString); + } } public static Map addSerializerToConfig(Map configs, @@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig { super(CONFIG, props); } + boolean idempotenceEnabled() { + boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG); + boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG); + boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG); + + if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) + throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence."); + return userConfiguredTransactions || idempotenceEnabled; + } + ProducerConfig(Map props, boolean doLog) { super(CONFIG, props, doLog); } diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 3992a4171e2..819770b2827 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -106,12 +106,12 @@ public class AbstractConfig { this.originals = resolveConfigVariables(configProviderProps, (Map) originals); this.values = definition.parse(this.originals); + this.used = Collections.synchronizedSet(new HashSet<>()); Map configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values)); for (Map.Entry update : configUpdates.entrySet()) { this.values.put(update.getKey(), update.getValue()); } definition.parse(this.values); - this.used = Collections.synchronizedSet(new HashSet<>()); this.definition = definition; if (doLog) logAll(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index eee067fa620..92ae90b20ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -113,6 +113,22 @@ public class KafkaProducerTest { Collections.emptySet(), Collections.emptySet()); + @Test + public void testOverwriteAcksAndRetriesForIdempotentProducers() { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName()); + + ProducerConfig config = new ProducerConfig(props); + assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); + assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG)))); + assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE); + assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-" + + config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG))); + } + @Test public void testMetricsReporterAutoGeneratedClientId() { Properties props = new Properties();