Browse Source

KAFKA-3353; Remove deprecated producer configs

These configs have been deprecated since 0.9.0.0:
block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2987 from ijuma/kafka-3353-remove-deprecated-producer-configs
pull/2899/merge
Ismael Juma 8 years ago committed by Jason Gustafson
parent
commit
3bcadbfb47
  1. 45
      clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
  2. 43
      clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
  3. 2
      docs/upgrade.html

45
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -270,8 +270,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -335,47 +335,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
}
private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
return Long.MAX_VALUE;
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
}
private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
} else {
return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
}
private static TransactionManager configureTransactionState(ProducerConfig config) {
TransactionManager transactionManager = null;

43
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig { @@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig {
/** <code>bootstrap.servers</code> */
public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
/** <code>metadata.fetch.timeout.ms</code> */
/**
* @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}
*/
@Deprecated
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers "
+ "host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch "
+ "to succeed before throwing an exception back to the client.";
/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig { @@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig {
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.";
/** <code>timeout.ms</code> */
/**
* @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG}
*/
@Deprecated
public static final String TIMEOUT_CONFIG = "timeout.ms";
private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to "
+ "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
+ "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
+ "is measured on the server side and does not include the network latency of the request.";
/** <code>linger.ms</code> */
public static final String LINGER_MS_CONFIG = "linger.ms";
private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. "
@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig { @@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig {
+ "These methods can be blocked either because the buffer is full or metadata unavailable."
+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";
/** <code>block.on.buffer.full</code> */
/**
* @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}.
*/
@Deprecated
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. "
+ "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> "
+ "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. "
+ "<em>Also if this property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + "</code> is no longer honored.</em>"
+ "<p>This parameter is deprecated and will be removed in a future release. "
+ "Parameter <code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead.";
/** <code>buffer.memory</code> */
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are "
@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig { @@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig {
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig { @@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
.define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(METADATA_FETCH_TIMEOUT_CONFIG,
Type.LONG,
60 * 1000,
atLeast(0),
Importance.LOW,
METADATA_FETCH_TIMEOUT_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
60 * 1000,

2
docs/upgrade.html

@ -55,6 +55,8 @@ @@ -55,6 +55,8 @@
<ul>
<li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
to retain the previous behavior should set the broker config <code>unclean.leader.election.enabled</code> to <code>false</code>.</li>
<li>Producer configs <code>block.on.buffer.full</code>, <code>metadata.fetch.timeout.ms</code> and <code>timeout.ms</code> have been
removed. They were initially deprecated in Kafka 0.9.0.0.</li>
<li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
replication factor requirement.</li>

Loading…
Cancel
Save