
KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)

Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
This is fine unless someone has already created that topic manually with the wrong partition count.

In such a case, Kafka Connect may run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.
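For operators who want to verify an existing config topic before starting a worker, the partition count can be checked with Kafka's Admin API. This is a minimal sketch, not part of the commit; the topic name "connect-configs" and the bootstrap address are placeholders:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    import java.util.Collections;
    import java.util.Properties;

    public class ConfigTopicCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                TopicDescription description = admin.describeTopics(Collections.singleton("connect-configs"))
                        .all().get().get("connect-configs");
                // The config topic must have exactly one partition so that all workers
                // see connector configurations in the same order.
                System.out.println("Partitions: " + description.partitions().size());
            }
        }
    }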

This commit adds a check when the KafkaConfigBackingStore is starting.
The check throws a ConfigException if the topic backing the store has more than one partition.

This exception is then caught upstream and logged (see the sketch after this list) by either:
- DistributedHerder#run
- ConnectStandalone#main
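For illustration, the handling in both places follows roughly this pattern; this is a sketch, not the literal code of either class:

    try {
        configBackingStore.start();  // throws ConfigException if the config topic has more than one partition
    } catch (ConfigException e) {
        // Log the misconfiguration and shut the worker down instead of
        // running against an unsafe config topic.
        log.error("Failed to start worker", e);
    }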

A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.

Author: Evelyn Bayes <evelyn@confluent.io>
Co-authored-by: Randall Hauch <rhauch@gmail.com>

Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Branch: pull/8640/head
Commit: 9a0b694a66
Changed files:
  1. connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (10 lines changed)
  2. connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java (5 lines changed)
  3. connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java (36 lines changed)

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java (10 lines changed)

@@ -265,6 +265,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
        started = true;
        log.info("Started KafkaConfigBackingStore");
    }
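For example, with a config topic named 'connect-configs' (a typical but hypothetical value for the config.storage.topic property, which is what DistributedConfig.CONFIG_TOPIC_CONFIG refers to) found to have 5 partitions, the formatted message would read:

    Topic 'connect-configs' supplied via the 'config.storage.topic' property is required to have a single partition in order to guarantee consistency of connector configurations, but found 5 partitions.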

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java (5 lines changed)

@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);

         // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }

+    public int partitionCount() {
+        return partitionCount;
+    }

     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java (36 lines changed)

@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -60,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;

 @RunWith(PowerMockRunner.class)
@@ -161,6 +163,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -208,6 +211,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -276,6 +280,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -360,6 +365,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -421,6 +427,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -474,6 +481,7 @@ public class KafkaConfigBackingStoreTest {
         // Shouldn't see any callbacks since this is during startup

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -516,6 +524,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -566,6 +575,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();

+        expectPartitionCount(1);
         expectStop();

         PowerMock.replayAll();
@@ -611,6 +621,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);

         // Shouldn't see any callbacks since this is during startup
@@ -658,6 +669,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);

         // Shouldn't see any callbacks since this is during startup
@@ -712,6 +724,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);

         // Shouldn't see any callbacks since this is during startup
@@ -759,6 +772,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);

         // Shouldn't see any callbacks since this is during startup
@@ -806,6 +820,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);

         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<>());
@@ -860,6 +875,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }

+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
             EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -868,6 +899,11 @@ public class KafkaConfigBackingStoreTest {
             .andReturn(storeLog);
     }

+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+            .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {
