Browse Source

KAFKA-13759: Disable idempotence by default in producers instantiated by Connect (#11933)

With AK 3.0, idempotence was enabled by default in Kafka producers. However, if idempotence is enabled, Connect won't be able to communicate via its producers with Kafka brokers older than version 0.11. Perhaps more importantly, for brokers older than version 2.8 the IDEMPOTENT_WRITE ACL is required to be granted to the principal of the Connect worker.

Therefore this commit disables producer idempotence by default to all the producers instantiated by Connect. Users can still choose to enable producer idempotence by explicitly setting the right worker and/or connector properties.

The changes were tested via existing unit, integration and system tests.

Reviewers: Randall Hauch <rhauch@gmail.com>
pull/11948/head
Konstantine Karantasis 3 years ago committed by GitHub
parent
commit
6ce69021fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
  2. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
  3. 6
      connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
  4. 7
      connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
  5. 6
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
  6. 10
      docs/upgrade.html

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@ -648,6 +648,12 @@ public class Worker { @@ -648,6 +648,12 @@ public class Worker {
// These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

6
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

@ -511,6 +511,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { @@ -511,6 +511,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);

6
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java

@ -91,6 +91,12 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore { @@ -91,6 +91,12 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);

7
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java

@ -170,7 +170,12 @@ public class KafkaStatusBackingStore implements StatusBackingStore { @@ -170,7 +170,12 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); // disable idempotence since retries is force to 0
// By default, Connect disables idempotent behavior for all producers, even though idempotence became
// default for Kafka producers. This is to ensure Connect continues to work with many Kafka broker versions, including older brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); // disable idempotence since retries is force to 0
ConnectUtils.addMetricsContextProperties(producerProps, config, clusterId);
Map<String, Object> consumerProps = new HashMap<>(originals);

6
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

@ -213,6 +213,12 @@ public class WorkerTest extends ThreadedTest { @@ -213,6 +213,12 @@ public class WorkerTest extends ThreadedTest {
defaultProducerConfigs.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
// By default, producers that are instantiated and used by Connect have idempotency disabled even after idempotency became
// default for Kafka producers. This is chosen to avoid breaking changes when Connect contacts Kafka brokers that do not support
// idempotent producers or require explicit steps to enable them (e.g. adding the IDEMPOTENT_WRITE ACL to brokers older than 2.8).
// These settings might change when https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent
// gets approved and scheduled for release.
defaultProducerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

10
docs/upgrade.html

@ -25,6 +25,11 @@ @@ -25,6 +25,11 @@
which meant that idempotence remained disabled unless the user had explicitly set <code>enable.idempotence</code> to true
(See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details).
This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0.</li>
<li>A notable exception is Connect that by default disables idempotent behavior for all of its
producers in order to uniformly support using a wide range of Kafka broker versions.
Users can change this behavior to enable idempotence for some or all producers
via Connect worker and/or connector configuration. Connect may enable idempotent producers
by default in a future major release.</li>
</ul>
<h4><a id="upgrade_3_1_0" href="#upgrade_3_1_0">Upgrading to 3.1.0 from any version 0.8.x through 3.0.x</a></h4>
@ -75,6 +80,11 @@ @@ -75,6 +80,11 @@
A bug prevented the producer idempotence default from being applied which meant that it remained disabled unless the user had explicitly set
<code>enable.idempotence</code> to true. See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a>for more details.
This issue was fixed and the default is properly applied.</li>
<li>A notable exception is Connect that by default disables idempotent behavior for all of its
producers in order to uniformly support using a wide range of Kafka broker versions.
Users can change this behavior to enable idempotence for some or all producers
via Connect worker and/or connector configuration. Connect may enable idempotent producers
by default in a future major release.</li>
</ul>
<h5><a id="upgrade_310_notable" href="#upgrade_310_notable">Notable changes in 3.1.0</a></h5>

Loading…
Cancel
Save