diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 286387ba4bb..f8123895d60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -270,8 +270,8 @@ public class KafkaProducer implements Producer { this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs); - this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs); + this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); + this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.transactionManager = configureTransactionState(config); int retries = configureRetries(config, transactionManager != null); int maxInflightRequests = configureInflightRequests(config, transactionManager != null); @@ -335,47 +335,6 @@ public class KafkaProducer implements Producer { return serializer instanceof ExtendedSerializer ? (ExtendedSerializer) serializer : new ExtendedSerializer.Wrapper<>(serializer); } - private static long configureMaxBlockTime(ProducerConfig config, Map userProvidedConfigs) { - /* check for user defined settings. - * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. - * This should be removed with release 0.9 when the deprecated configs are removed. - */ - if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { - log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); - if (blockOnBufferFull) { - return Long.MAX_VALUE; - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - } else { - return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - } - } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + - "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); - return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - } else { - return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); - } - } - - private static int configureRequestTimeout(ProducerConfig config, Map userProvidedConfigs) { - /* check for user defined settings. - * If the TIME_OUT config is set use that for request timeout. - * This should be removed with release 0.9 - */ - if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { - log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " + - ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - return config.getInt(ProducerConfig.TIMEOUT_CONFIG); - } else { - return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - } - } - private static TransactionManager configureTransactionState(ProducerConfig config) { TransactionManager transactionManager = null; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 4bceb9528b9..12e8c64644b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -49,16 +49,6 @@ public class ProducerConfig extends AbstractConfig { /** bootstrap.servers */ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; - /** metadata.fetch.timeout.ms */ - /** - * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG} - */ - @Deprecated - public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers " - + "host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch " - + "to succeed before throwing an exception back to the client."; - /** metadata.max.age.ms */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; @@ -94,18 +84,6 @@ public class ProducerConfig extends AbstractConfig { + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica" + " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."; - /** timeout.ms */ - - /** - * @deprecated This config will be removed in a future release. Please use {@link #REQUEST_TIMEOUT_MS_CONFIG} - */ - @Deprecated - public static final String TIMEOUT_CONFIG = "timeout.ms"; - private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " - + "meet the acknowledgment requirements the producer has specified with the acks configuration. If the " - + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout " - + "is measured on the server side and does not include the network latency of the request."; - /** linger.ms */ public static final String LINGER_MS_CONFIG = "linger.ms"; private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " @@ -143,19 +121,6 @@ public class ProducerConfig extends AbstractConfig { + "These methods can be blocked either because the buffer is full or metadata unavailable." + "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout."; - /** block.on.buffer.full */ - /** - * @deprecated This config will be removed in a future release. Please use {@link #MAX_BLOCK_MS_CONFIG}. - */ - @Deprecated - public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; - private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. " - + "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the " + MAX_BLOCK_MS_CONFIG + " " - + "value to block, after which it will throw a TimeoutException. Setting this property to true will set the " + MAX_BLOCK_MS_CONFIG + " to Long.MAX_VALUE. " - + "Also if this property is set to true, parameter " + METADATA_FETCH_TIMEOUT_CONFIG + " is no longer honored." - + "

This parameter is deprecated and will be removed in a future release. " - + "Parameter " + MAX_BLOCK_MS_CONFIG + " should be used instead."; - /** buffer.memory */ public static final String BUFFER_MEMORY_CONFIG = "buffer.memory"; private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " @@ -256,7 +221,6 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) @@ -267,16 +231,9 @@ public class ProducerConfig extends AbstractConfig { atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) - .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, false, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) - .define(METADATA_FETCH_TIMEOUT_CONFIG, - Type.LONG, - 60 * 1000, - atLeast(0), - Importance.LOW, - METADATA_FETCH_TIMEOUT_DOC) .define(MAX_BLOCK_MS_CONFIG, Type.LONG, 60 * 1000, diff --git a/docs/upgrade.html b/docs/upgrade.html index 227c7281da0..116d2ff7694 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -55,6 +55,8 @@

  • Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to to retain the previous behavior should set the broker config unclean.leader.election.enabled to false.
  • +
  • Producer configs block.on.buffer.full, metadata.fetch.timeout.ms and timeout.ms have been + removed. They were initially deprecated in Kafka 0.9.0.0.
  • The offsets.topic.replication.factor broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.