diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 25420fd259b..d25bbbca35e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -555,9 +555,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // Note that we use the updated connector config despite the fact that we don't have an updated // snapshot yet. The existing task info should still be accurate. - Map map = configState.connectorConfig(connName); ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName), - map == null ? null : connectorTypeForClass(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); + // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG + connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); callback.onCompletion(null, new Created<>(!exists, info)); return null; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java index 9a10d748877..f36ee74e980 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java @@ -71,12 +71,13 @@ public class ConnectorInfo { ConnectorInfo that = (ConnectorInfo) o; return Objects.equals(name, that.name) && Objects.equals(config, that.config) && - Objects.equals(tasks, that.tasks); + Objects.equals(tasks, that.tasks) && + Objects.equals(type, that.type); } @Override public int hashCode() { - return Objects.hash(name, config, tasks); + return Objects.hash(name, config, tasks, type); } }