Browse Source

KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298)

Currently, the replication factor is hardcoded to a value of 3. This means that we cannot use a DLQ in any cluster setup with less than three brokers. It is better to have the user specify this value if the default value does meet the requirements.

Testing: A unit test is added.

Signed-off-by: Arjun Satish <arjunconfluent.io>

Author: Arjun Satish <arjun@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #5145 from wicknicks/KAFKA-7002
pull/5164/head
Arjun Satish 7 years ago committed by Ewen Cheslack-Postava
parent
commit
22612be46d
  1. 12
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
  2. 9
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
  3. 9
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java

12
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java

@ -52,10 +52,16 @@ public class SinkConnectorConfig extends ConnectorConfig { @@ -52,10 +52,16 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final String DLQ_TOPIC_DEFAULT = "";
private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name";
public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor";
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist.";
public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor";
static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY);
.define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
public static ConfigDef configDef() {
return config;
@ -97,4 +103,8 @@ public class SinkConnectorConfig extends ConnectorConfig { @@ -97,4 +103,8 @@ public class SinkConnectorConfig extends ConnectorConfig {
public String dlqTopicName() {
return getString(DLQ_TOPIC_NAME_CONFIG);
}
public short dlqTopicReplicationFactor() {
return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
}
}

9
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java

@ -44,7 +44,6 @@ public class DeadLetterQueueReporter implements ErrorReporter { @@ -44,7 +44,6 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class);
private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3;
private static final int DLQ_NUM_DESIRED_PARTITIONS = 1;
private final SinkConnectorConfig connConfig;
@ -53,13 +52,13 @@ public class DeadLetterQueueReporter implements ErrorReporter { @@ -53,13 +52,13 @@ public class DeadLetterQueueReporter implements ErrorReporter {
private ErrorHandlingMetrics errorHandlingMetrics;
public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
SinkConnectorConfig connConfig, Map<String, Object> producerProps) {
String topic = connConfig.dlqTopicName();
SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) {
String topic = sinkConfig.dlqTopicName();
try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR);
NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
admin.createTopics(singleton(schemaTopicRequest)).all().get();
}
} catch (InterruptedException e) {
@ -71,7 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter { @@ -71,7 +70,7 @@ public class DeadLetterQueueReporter implements ErrorReporter {
}
KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
return new DeadLetterQueueReporter(dlqProducer, connConfig);
return new DeadLetterQueueReporter(dlqProducer, sinkConfig);
}
/**

9
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java

@ -180,6 +180,15 @@ public class ErrorReporterTest { @@ -180,6 +180,15 @@ public class ErrorReporterTest {
"partition=5, offset=100}.", msg);
}
@Test
public void testSetDLQConfigs() {
SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC));
assertEquals(configuration.dlqTopicName(), DLQ_TOPIC);
configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7"));
assertEquals(configuration.dlqTopicReplicationFactor(), 7);
}
private ProcessingContext processingContext() {
ProcessingContext context = new ProcessingContext();
context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new byte[]{'x'}));

Loading…
Cancel
Save