
KAFKA-3845: KIP-75: Add per-connector converters

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Shikhar Bhushan, Gwen Shapira

Closes #1721 from ewencp/kafka-3845-per-connector-converters
Authored by Ewen Cheslack-Postava 9 years ago; committed by Gwen Shapira
parent commit 05ed54bf2b
  1. build.gradle (2 lines changed)
  2. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java (12 lines changed)
  3. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (31 lines changed)
  4. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java (6 lines changed)
  5. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java (2 lines changed)
  6. connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java (15 lines changed)
  7. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (113 lines changed)
  8. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java (20 lines changed)
  9. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java (117 lines changed)
  10. docs/connect.html (16 lines changed)
  11. tests/kafkatest/tests/connect/connect_rest_test.py (4 lines changed)
  12. tests/kafkatest/tests/connect/connect_test.py (13 lines changed)
  13. tests/kafkatest/tests/connect/templates/connect-file-sink.properties (10 lines changed)
  14. tests/kafkatest/tests/connect/templates/connect-file-source.properties (10 lines changed)
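Before the diffs, a quick illustration of what the patch enables: a connector configuration may now carry its own key.converter and value.converter, shadowing the worker-wide settings. The Java sketch below is illustrative only — the connector name and converter choices are hypothetical, and it assumes connect-runtime plus the named converter classes are on the classpath — but it mirrors the map built in WorkerTest.testConverterOverrides later in this diff.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.runtime.ConnectorConfig;

public class PerConnectorConverterExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(ConnectorConfig.NAME_CONFIG, "local-file-source");
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
                  "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");

        // New in this patch: per-connector overrides of the worker's converters.
        // Leaving these unset keeps the worker defaults, because their ConfigDef
        // default is null (see the ConnectorConfig change below).
        props.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
                  "org.apache.kafka.connect.storage.StringConverter");
        props.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
                  "org.apache.kafka.connect.json.JsonConverter");

        ConnectorConfig connConfig = new ConnectorConfig(props);
        System.out.println(connConfig.getClass(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG));
    }
}

In standalone or distributed mode the same keys simply go into the connector's properties file or REST payload.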

2
build.gradle

@@ -874,8 +874,8 @@ project(':connect:runtime') {
testCompile libs.junit
testCompile libs.powermock
testCompile libs.powermockEasymock
testCompile project(":connect:json")
testRuntime project(":connect:json")
testRuntime libs.slf4jlog4j
}

12
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java

@@ -53,6 +53,14 @@ public class ConnectorConfig extends AbstractConfig {
" or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
public static final int TASKS_MAX_DEFAULT = 1;
@@ -64,7 +72,9 @@ public class ConnectorConfig extends AbstractConfig {
return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY);
}
public ConnectorConfig() {

31
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java

@@ -66,8 +66,8 @@ public class Worker {
private final Time time;
private final String workerId;
private final WorkerConfig config;
private final Converter keyConverter;
private final Converter valueConverter;
private final Converter defaultKeyConverter;
private final Converter defaultValueConverter;
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
private final OffsetBackingStore offsetBackingStore;
@@ -85,10 +85,10 @@ public class Worker {
this.workerId = workerId;
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -302,11 +302,13 @@ public class Worker {
* Add a new task.
* @param id Globally unique ID for this task.
* @param taskConfig the parsed task configuration
* @param connConfig the parsed connector configuration
* @param statusListener listener for notifications of task status changes
* @param initialState the initial target state that the task should be initialized to
*/
public void startTask(ConnectorTaskId id,
TaskConfig taskConfig,
ConnectorConfig connConfig,
TaskStatus.Listener statusListener,
TargetState initialState) {
log.info("Creating task {}", id);
@@ -322,7 +324,18 @@ public class Worker {
final Task task = instantiateTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState);
Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
if (keyConverter != null)
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
else
keyConverter = defaultKeyConverter;
Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
if (valueConverter != null)
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
else
valueConverter = defaultValueConverter;
final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
// Start the task before adding or modifying any state; any exceptions are caught higher up the
// call chain and there's no cleanup to do here
@@ -339,7 +352,9 @@ public class Worker {
private WorkerTask buildWorkerTask(ConnectorTaskId id,
Task task,
TaskStatus.Listener statusListener,
TargetState initialState) {
TargetState initialState,
Converter keyConverter,
Converter valueConverter) {
// Decide which type of worker task we need based on the type of task.
if (task instanceof SourceTask) {
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
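The net effect of the startTask() change above: instantiate the connector-level converter when one is configured, configure it with the prefixed overrides, and otherwise fall back to the worker-wide default built in the constructor. A minimal restatement of that decision, with a helper name and signature of my own choosing rather than anything from the patch:

import java.util.Map;

import org.apache.kafka.connect.storage.Converter;

final class ConverterFallbackSketch {
    // connectorLevel is null when the connector config leaves key.converter/
    // value.converter unset; overrides is what originalsWithPrefix("key.converter.")
    // or ("value.converter.") returns for the connector config.
    static Converter resolve(Converter connectorLevel, Map<String, ?> overrides,
                             boolean isKey, Converter workerDefault) {
        if (connectorLevel == null)
            return workerDefault;              // worker default was configured at startup
        connectorLevel.configure(overrides, isKey);
        return connectorLevel;
    }
}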

6
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@@ -772,9 +772,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private void startTask(ConnectorTaskId taskId) {
log.info("Starting task {}", taskId);
TargetState initialState = configState.targetState(taskId.connector());
Map<String, String> configs = configState.taskConfig(taskId);
TaskConfig taskConfig = new TaskConfig(configs);
worker.startTask(taskId, taskConfig, this, initialState);
TaskConfig taskConfig = new TaskConfig(configState.taskConfig(taskId));
ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
worker.startTask(taskId, taskConfig, connConfig, this, initialState);
}
// Helper for starting a connector with the given name, which will extract & parse the config, generate connector

2
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java

@@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig {
* <code>offset.storage.file.filename</code>
*/
public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "file to store offset data in";
private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "File to store offset data in";
static {
CONFIG = baseConfigDef()

15
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java

@@ -206,14 +206,16 @@ public class StandaloneHerder extends AbstractHerder {
if (!configState.contains(taskId.connector()))
cb.onCompletion(new NotFoundException("Connector " + taskId.connector() + " not found", null), null);
Map<String, String> taskConfig = configState.taskConfig(taskId);
if (taskConfig == null)
Map<String, String> taskConfigProps = configState.taskConfig(taskId);
if (taskConfigProps == null)
cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
TaskConfig taskConfig = new TaskConfig(taskConfigProps);
ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
TargetState targetState = configState.targetState(taskId.connector());
try {
worker.stopAndAwaitTask(taskId);
worker.startTask(taskId, new TaskConfig(taskConfig), this, targetState);
worker.startTask(taskId, taskConfig, connConfig, this, targetState);
cb.onCompletion(null, null);
} catch (Exception e) {
log.error("Failed to restart task {}", taskId, e);
@@ -270,11 +272,14 @@ public class StandaloneHerder extends AbstractHerder {
}
private void createConnectorTasks(String connName, TargetState initialState) {
Map<String, String> connConfigs = configState.connectorConfig(connName);
ConnectorConfig connConfig = new ConnectorConfig(connConfigs);
for (ConnectorTaskId taskId : configState.tasks(connName)) {
Map<String, String> taskConfigMap = configState.taskConfig(taskId);
TaskConfig config = new TaskConfig(taskConfigMap);
TaskConfig taskConfig = new TaskConfig(taskConfigMap);
try {
worker.startTask(taskId, config, this, initialState);
worker.startTask(taskId, taskConfig, connConfig, this, initialState);
} catch (Throwable e) {
log.error("Failed to add task {}: ", taskId, e);
// Swallow this so we can continue updating the rest of the tasks

113
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java

@@ -23,7 +23,10 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
@@ -35,6 +38,7 @@ import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.MockTime;
import org.apache.kafka.connect.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -352,8 +356,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.eq(task),
EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
EasyMock.anyObject(OffsetStorageWriter.class),
@@ -380,7 +384,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
assertEquals(Collections.emptySet(), worker.taskIds());
@@ -420,8 +424,8 @@ public class WorkerTest extends ThreadedTest {
EasyMock.eq(task),
EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(JsonConverter.class),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
EasyMock.anyObject(OffsetStorageWriter.class),
@@ -449,12 +453,79 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
worker.stop();
PowerMock.verifyAll();
}
@Test
public void testConverterOverrides() throws Exception {
expectStartStorage();
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
EasyMock.expect(task.version()).andReturn("1.0");
Capture<TestConverter> keyConverter = EasyMock.newCapture();
Capture<TestConverter> valueConverter = EasyMock.newCapture();
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(TASK_ID),
EasyMock.eq(task),
EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.eq(TargetState.STARTED),
EasyMock.capture(keyConverter),
EasyMock.capture(valueConverter),
EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class),
EasyMock.anyObject(OffsetStorageWriter.class),
EasyMock.anyObject(WorkerConfig.class),
EasyMock.anyObject(Time.class))
.andReturn(workerTask);
Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.initialize(new TaskConfig(origProps));
EasyMock.expectLastCall();
workerTask.run();
EasyMock.expectLastCall();
// Remove
workerTask.stop();
EasyMock.expectLastCall();
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
expectStopStorage();
PowerMock.replayAll();
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
Map<String, String> connProps = anyConnectorConfigMap();
connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
connProps.put("key.converter.extra.config", "foo");
connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
connProps.put("value.converter.extra.config", "bar");
worker.startTask(TASK_ID, new TaskConfig(origProps), new ConnectorConfig(connProps), taskStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
assertEquals(Collections.emptySet(), worker.taskIds());
// Nothing should be left, so this should effectively be a nop
worker.stop();
// Validate extra configs got passed through to overridden converters
assertEquals("foo", keyConverter.getValue().configs.get("extra.config"));
assertEquals("bar", valueConverter.getValue().configs.get("extra.config"));
PowerMock.verifyAll();
}
private void expectStartStorage() {
offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
EasyMock.expectLastCall();
@@ -467,6 +538,17 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
}
private Map<String, String> anyConnectorConfigMap() {
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
return props;
}
private ConnectorConfig anyConnectorConfig() {
return new ConnectorConfig(anyConnectorConfigMap());
}
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class WorkerTestConnector extends Connector {
@@ -527,4 +609,23 @@ public class WorkerTest extends ThreadedTest {
public void stop() {
}
}
public static class TestConverter implements Converter {
public Map<String, ?> configs;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.configs = configs;
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
}
}

20
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

@@ -175,7 +175,7 @@ public class DistributedHerderTest {
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -198,7 +198,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -234,7 +234,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -530,7 +530,7 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
// now handle the task restart
@@ -545,7 +545,7 @@ public class DistributedHerderTest {
worker.stopAndAwaitTask(TASK0);
PowerMock.expectLastCall();
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
PowerMock.replayAll();
@@ -841,7 +841,7 @@ public class DistributedHerderTest {
// join
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
expectPostRebalanceCatchup(SNAPSHOT);
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -877,7 +877,7 @@ public class DistributedHerderTest {
// join
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
expectPostRebalanceCatchup(SNAPSHOT);
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -918,7 +918,7 @@ public class DistributedHerderTest {
// join
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -970,7 +970,7 @@ public class DistributedHerderTest {
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
Arrays.asList(TASK0));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -1008,7 +1008,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
member.poll(EasyMock.anyInt());

117
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java

@@ -73,6 +73,10 @@ public class StandaloneHerderTest {
private static final int DEFAULT_MAX_TASKS = 1;
private static final String WORKER_ID = "localhost:8083";
private enum SourceSink {
SOURCE, SINK
};
private StandaloneHerder herder;
private Connector connector;
@@ -88,11 +92,11 @@ public class StandaloneHerderTest {
@Test
public void testCreateSourceConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
PowerMock.verifyAll();
}
@@ -101,7 +105,7 @@ public class StandaloneHerderTest {
public void testCreateConnectorAlreadyExists() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
// First addition should succeed
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
// Second should fail
createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
@@ -109,8 +113,8 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
PowerMock.verifyAll();
}
@@ -118,11 +122,11 @@ public class StandaloneHerderTest {
@Test
public void testCreateSinkConnector() throws Exception {
connector = PowerMock.createMock(BogusSinkConnector.class);
expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, BogusSinkTask.class, true);
expectAdd(SourceSink.SINK);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), false, createCallback);
PowerMock.verifyAll();
}
@@ -130,7 +134,7 @@ public class StandaloneHerderTest {
@Test
public void testDestroyConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -139,7 +143,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
futureCb.get(1000L, TimeUnit.MILLISECONDS);
@@ -159,18 +163,18 @@ public class StandaloneHerderTest {
@Test
public void testRestartConnector() throws Exception {
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -181,7 +185,7 @@ public class StandaloneHerderTest {
@Test
public void testRestartConnectorFailureOnStop() throws Exception {
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
RuntimeException e = new RuntimeException();
worker.stopConnector(CONNECTOR_NAME);
@@ -191,7 +195,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -207,19 +211,19 @@ public class StandaloneHerderTest {
@Test
public void testRestartConnectorFailureOnStart() throws Exception {
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
RuntimeException e = new RuntimeException();
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall().andThrow(e);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(CONNECTOR_NAME, cb);
@@ -236,18 +240,19 @@ public class StandaloneHerderTest {
@Test
public void testRestartTask() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
EasyMock.expectLastCall();
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -259,7 +264,7 @@ public class StandaloneHerderTest {
@Test
public void testRestartTaskFailureOnStop() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
RuntimeException e = new RuntimeException();
worker.stopAndAwaitTask(taskId);
@@ -269,7 +274,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -285,19 +290,20 @@ public class StandaloneHerderTest {
@Test
public void testRestartTaskFailureOnStart() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
RuntimeException e = new RuntimeException();
Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
EasyMock.expectLastCall().andThrow(e);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartTask(taskId, cb);
@@ -314,7 +320,7 @@ public class StandaloneHerderTest {
@Test
public void testCreateAndStop() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
expectStop();
@@ -325,7 +331,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
herder.stop();
PowerMock.verifyAll();
@@ -333,7 +339,7 @@ public class StandaloneHerderTest {
@Test
public void testAccessors() throws Exception {
Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -353,7 +359,7 @@ public class StandaloneHerderTest {
// Create connector
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
// Validate accessors with 1 connector
listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
@@ -364,7 +370,7 @@ public class StandaloneHerderTest {
connectorConfigCb.onCompletion(null, connConfig);
EasyMock.expectLastCall();
TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(BogusSourceTask.class, false));
TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE));
taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
EasyMock.expectLastCall();
@@ -388,7 +394,7 @@ public class StandaloneHerderTest {
@Test
public void testPutConnectorConfig() throws Exception {
Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
Map<String, String> newConnConfig = new HashMap<>(connConfig);
newConnConfig.put("foo", "bar");
@@ -397,7 +403,7 @@ public class StandaloneHerderTest {
// Create
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
expectAdd(SourceSink.SOURCE);
// Should get first config
connectorConfigCb.onCompletion(null, connConfig);
EasyMock.expectLastCall();
@@ -411,7 +417,7 @@ public class StandaloneHerderTest {
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
.andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
.andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
worker.isSinkConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(false);
ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
@@ -446,17 +452,14 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
private void expectAdd(String name,
Class<? extends Connector> connClass,
Class<? extends Task> taskClass,
boolean sink) throws Exception {
private void expectAdd(SourceSink sourceSink) throws Exception {
Map<String, String> connectorProps = connectorConfig(name, connClass, sink);
Map<String, String> connectorProps = connectorConfig(sourceSink);
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
EasyMock.expect(worker.isRunning(name)).andReturn(true);
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
@@ -464,16 +467,18 @@ public class StandaloneHerderTest {
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(sourceSink));
Map<String, String> generatedTaskProps = taskConfig(sourceSink);
TaskConfig taskConfig = new TaskConfig(generatedTaskProps);
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null))
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
.andReturn(Collections.singletonList(generatedTaskProps));
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, connConfig, herder, TargetState.STARTED);
EasyMock.expectLastCall();
worker.isSinkConnector(CONNECTOR_NAME);
PowerMock.expectLastCall().andReturn(sink);
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
}
private void expectStop() {
@@ -490,22 +495,24 @@ public class StandaloneHerderTest {
expectStop();
}
private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass, boolean sink) {
HashMap<String, String> connectorProps = new HashMap<>();
connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
if (sink) {
connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
}
return connectorProps;
private static Map<String, String> connectorConfig(SourceSink sourceSink) {
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
Class<? extends Connector> connectorClass = sourceSink == SourceSink.SINK ? BogusSinkConnector.class : BogusSourceConnector.class;
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
if (sourceSink == SourceSink.SINK)
props.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
return props;
}
private static Map<String, String> taskConfig(Class<? extends Task> taskClass, boolean sink) {
private static Map<String, String> taskConfig(SourceSink sourceSink) {
HashMap<String, String> generatedTaskProps = new HashMap<>();
// Connectors can add any settings, so these are arbitrary
generatedTaskProps.put("foo", "bar");
Class<? extends Task> taskClass = sourceSink == SourceSink.SINK ? BogusSinkTask.class : BogusSourceTask.class;
generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName());
if (sink)
if (sourceSink == SourceSink.SINK)
generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
return generatedTaskProps;
}

16
docs/connect.html

@@ -43,7 +43,17 @@ In standalone mode all work is performed in a single process. This configuration
&gt; bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
</pre>
The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment.
The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs:
<ul>
<li><code>bootstrap.servers</code> - List of Kafka servers used to bootstrap connections to Kafka</li>
<li><code>key.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
<li><code>value.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
</ul>
The important configuration options specific to standalone mode are:
<ul>
<li><code>offset.storage.file.filename</code> - File to store offset data in</li>
</ul>
The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads).
@@ -55,7 +65,7 @@ Distributed mode handles automatic balancing of work, allows you to scale up (or
The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, the topics will be auto created with a default number of partitions and replication factor, which may not be best suited for its usage.
In particular, the following configuration parameters are critical to set before starting your cluster:
In particular, the following configuration parameters, in addition to the common settings mentioned above, are critical to set before starting your cluster:
<ul>
<li><code>group.id</code> (default <code>connect-cluster</code>) - unique name for the cluster, used in forming the Connect cluster group; note that this <b>must not conflict</b> with consumer group IDs</li>
<li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.</li>
@@ -76,6 +86,8 @@ Most configurations are connector dependent, so they can't be outlined here. How
<li><code>name</code> - Unique name for the connector. Attempting to register again with the same name will fail.</li>
<li><code>connector.class</code> - The Java class for the connector</li>
<li><code>tasks.max</code> - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.</li>
<li><code>key.converter</code> - (optional) Override the default key converter set by the worker.</li>
<li><code>value.converter</code> - (optional) Override the default value converter set by the worker.</li>
</ul>
The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.
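One behaviour the doc text leaves implicit, though the Worker change earlier in this diff depends on it: connector properties prefixed with key.converter. or value.converter. have the prefix stripped and are passed to the overriding converter's configure() call. A small sketch of that, with hypothetical property values, reusing the ConnectorConfig class from this patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.runtime.ConnectorConfig;

public class ConverterPrefixDemo {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(ConnectorConfig.NAME_CONFIG, "local-file-sink");
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
                  "org.apache.kafka.connect.file.FileStreamSinkConnector");
        props.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
                  "org.apache.kafka.connect.json.JsonConverter");
        props.put("value.converter.schemas.enable", "false");

        ConnectorConfig connConfig = new ConnectorConfig(props);
        // Prints {schemas.enable=false}: the map Worker.startTask() hands to the
        // overriding value converter's configure() call.
        System.out.println(connConfig.originalsWithPrefix("value.converter."));
    }
}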

4
tests/kafkatest/tests/connect/connect_rest_test.py

@@ -29,8 +29,8 @@ class ConnectRestApiTest(KafkaTest):
FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'}
FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'}
FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topic', 'file'}
FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topics', 'file'}
INPUT_FILE = "/mnt/connect.input"
INPUT_FILE2 = "/mnt/connect.input2"

13
tests/kafkatest/tests/connect/connect_test.py

@@ -63,10 +63,17 @@ class ConnectStandaloneFileTest(Test):
@parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
"""
Validates basic end-to-end functionality of Connect standalone using the file source and sink connectors. Includes
parameterizations to test different converters (which also test per-connector converter overrides), schema/schemaless
modes, and security support.
"""
assert converter != None, "converter type must be set"
# Template parameters
self.key_converter = converter
self.value_converter = converter
# Template parameters. Note that we don't set key/value.converter. These default to JsonConverter and we validate
# converter overrides via the connector configuration.
if converter != "org.apache.kafka.connect.json.JsonConverter":
self.override_key_converter = converter
self.override_value_converter = converter
self.schemas = schemas
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,

10
tests/kafkatest/tests/connect/templates/connect-file-sink.properties

@@ -17,4 +17,12 @@ name=local-file-sink
connector.class={{ FILE_SINK_CONNECTOR }}
tasks.max=1
file={{ OUTPUT_FILE }}
topics={{ TOPIC }}
topics={{ TOPIC }}
# For testing per-connector converters
{% if override_key_converter is defined %}
key.converter={{ override_key_converter }}
{% endif %}
{% if override_value_converter is defined %}
value.converter={{ override_value_converter }}
{% endif %}

10
tests/kafkatest/tests/connect/templates/connect-file-source.properties

@@ -17,4 +17,12 @@ name=local-file-source
connector.class={{ FILE_SOURCE_CONNECTOR }}
tasks.max=1
file={{ INPUT_FILE }}
topic={{ TOPIC }}
topic={{ TOPIC }}
# For testing per-connector converters
{% if override_key_converter is defined %}
key.converter={{ override_key_converter }}
{% endif %}
{% if override_value_converter is defined %}
value.converter={{ override_value_converter }}
{% endif %}
