Browse Source

KAFKA-7253; The returned connector type is always null when creating connector (#5470)

The null map returned from the current snapshot causes the null type in response. The connector class name can be taken from the config of request instead since we require the config should contain the connector class name.

Reviewers: Jason Gustafson <jason@confluent.io>
pull/6055/head
Chia-Ping Tsai 6 years ago committed by Jason Gustafson
parent
commit
9601315420
  1. 4
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  2. 5
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java

4
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@ -555,9 +555,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -555,9 +555,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
Map<String, String> map = configState.connectorConfig(connName);
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
map == null ? null : connectorTypeForClass(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
// validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
}

5
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java

@ -71,12 +71,13 @@ public class ConnectorInfo { @@ -71,12 +71,13 @@ public class ConnectorInfo {
ConnectorInfo that = (ConnectorInfo) o;
return Objects.equals(name, that.name) &&
Objects.equals(config, that.config) &&
Objects.equals(tasks, that.tasks);
Objects.equals(tasks, that.tasks) &&
Objects.equals(type, that.type);
}
@Override
public int hashCode() {
return Objects.hash(name, config, tasks);
return Objects.hash(name, config, tasks, type);
}
}

Loading…
Cancel
Save