Browse Source

KAFKA-8928: Logged producer config does not always match actual configured values (#7466)

Some logged producer configs(clientId, acks, retries) might not be reflected the actual values.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
pull/7892/head
huxi 5 years ago committed by Guozhang Wang
parent
commit
b56b848682
  1. 87
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  2. 70
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  3. 2
      clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
  4. 16
      clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

87
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -81,7 +81,6 @@ import java.util.Properties; @@ -81,7 +81,6 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference; @@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
@ -333,7 +331,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -333,7 +331,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
@ -433,19 +431,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -433,19 +431,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
private static String buildClientId(String configuredClientId, String transactionalId) {
if (!configuredClientId.isEmpty())
return configuredClientId;
if (transactionalId != null)
return "producer-" + transactionalId;
return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@ -467,8 +455,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -467,8 +455,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions,
throttleTimeSensor,
logContext);
int retries = configureRetries(producerConfig, transactionManager != null, log);
short acks = configureAcks(producerConfig, transactionManager != null, log);
int retries = configureRetries(producerConfig, log);
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
@ -516,23 +504,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -516,23 +504,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = false;
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
userConfiguredIdempotence = true;
boolean userConfiguredTransactions = false;
if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
userConfiguredTransactions = true;
boolean userConfiguredIdempotence = config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
if (userConfiguredTransactions && !userConfiguredIdempotence)
log.info("Overriding the default {} to true since {} is specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
ProducerConfig.TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
if (userConfiguredTransactions)
idempotenceEnabled = true;
if (idempotenceEnabled) {
if (config.idempotenceEnabled()) {
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
@ -542,63 +520,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -542,63 +520,40 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
else
log.info("Instantiated an idempotent producer.");
}
return transactionManager;
}
private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
boolean userConfiguredRetries = false;
if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
userConfiguredRetries = true;
}
if (idempotenceEnabled && !userConfiguredRetries) {
// We recommend setting infinite retries when the idempotent producer is enabled, so it makes sense to make
// this the default.
private static int configureRetries(ProducerConfig config, Logger log) {
boolean userConfiguredRetries = config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
if (config.idempotenceEnabled() && !userConfiguredRetries) {
log.info("Overriding the default retries config to the recommended value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
return Integer.MAX_VALUE;
}
if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}
private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
if (idempotenceEnabled && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
private static int configureInflightRequests(ProducerConfig config) {
if (config.idempotenceEnabled() && 5 < config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}
private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
boolean userConfiguredAcks = false;
short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
userConfiguredAcks = true;
}
private static short configureAcks(ProducerConfig config, Logger log) {
boolean userConfiguredAcks = config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
short acks = Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
if (idempotenceEnabled && !userConfiguredAcks) {
if (config.idempotenceEnabled()) {
if (!userConfiguredAcks)
log.info("Overriding the default {} to all since idempotence is enabled.", ProducerConfig.ACKS_CONFIG);
return -1;
}
if (idempotenceEnabled && acks != -1) {
else if (acks != -1)
throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}
return acks;
}
private static int parseAcks(String acksString) {
try {
return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
} catch (NumberFormatException e) {
throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
}
}
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
*

70
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig; @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@ -32,6 +33,7 @@ import java.util.HashMap; @@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig { @@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig { @@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig {
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
maybeOverrideEnableIdempotence(refinedConfigs);
maybeOverrideClientId(refinedConfigs);
maybeOverrideAcksAndRetries(refinedConfigs);
return refinedConfigs;
}
private void maybeOverrideClientId(final Map<String, Object> configs) {
String refinedClientId;
boolean userConfiguredClientId = this.originals().containsKey(CLIENT_ID_CONFIG);
if (userConfiguredClientId) {
refinedClientId = this.getString(CLIENT_ID_CONFIG);
} else {
String transactionalId = this.getString(TRANSACTIONAL_ID_CONFIG);
refinedClientId = "producer-" + (transactionalId != null ? transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
}
configs.put(CLIENT_ID_CONFIG, refinedClientId);
}
private void maybeOverrideEnableIdempotence(final Map<String, Object> configs) {
boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
if (userConfiguredTransactions && !userConfiguredIdempotence) {
configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
}
}
private void maybeOverrideAcksAndRetries(final Map<String, Object> configs) {
final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
configs.put(ACKS_CONFIG, acksStr);
// For idempotence producers, values for `RETRIES_CONFIG` and `ACKS_CONFIG` might need to be overridden.
if (idempotenceEnabled()) {
boolean userConfiguredRetries = this.originals().containsKey(RETRIES_CONFIG);
if (this.getInt(RETRIES_CONFIG) == 0) {
throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
}
configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
boolean userConfiguredAcks = this.originals().containsKey(ACKS_CONFIG);
final short acks = Short.valueOf(acksStr);
if (userConfiguredAcks && acks != (short) -1) {
throw new ConfigException("Must set " + ACKS_CONFIG + " to all in order to use the idempotent " +
"producer. Otherwise we cannot guarantee idempotence.");
}
configs.put(ACKS_CONFIG, "-1");
}
}
private static String parseAcks(String acksString) {
try {
return acksString.trim().equalsIgnoreCase("all") ? "-1" : Short.parseShort(acksString.trim()) + "";
} catch (NumberFormatException e) {
throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
}
}
public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig { @@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig {
super(CONFIG, props);
}
boolean idempotenceEnabled() {
boolean userConfiguredIdempotence = this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
boolean userConfiguredTransactions = this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
boolean idempotenceEnabled = userConfiguredIdempotence && this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
return userConfiguredTransactions || idempotenceEnabled;
}
ProducerConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}

2
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

@ -106,12 +106,12 @@ public class AbstractConfig { @@ -106,12 +106,12 @@ public class AbstractConfig {
this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
this.values = definition.parse(this.originals);
this.used = Collections.synchronizedSet(new HashSet<>());
Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
this.values.put(update.getKey(), update.getValue());
}
definition.parse(this.values);
this.used = Collections.synchronizedSet(new HashSet<>());
this.definition = definition;
if (doLog)
logAll();

16
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@ -113,6 +113,22 @@ public class KafkaProducerTest { @@ -113,6 +113,22 @@ public class KafkaProducerTest {
Collections.emptySet(),
Collections.emptySet());
@Test
public void testOverwriteAcksAndRetriesForIdempotentProducers() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
ProducerConfig config = new ProducerConfig(props);
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each -> each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) == Integer.MAX_VALUE);
assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-" +
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
}
@Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();

Loading…
Cancel
Save