From 4a0645863336bcdbcc71acc0d573af3791b2165d Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Fri, 10 Jun 2022 11:35:22 +0200 Subject: [PATCH] KAFKA-13780: Generate OpenAPI file for Connect REST API (#12067) New gradle task `connect:runtime:genConnectOpenAPIDocs` that generates `connect_rest.yaml` under `docs/generated`. This task is executed when `siteDocsTar` runs. --- build.gradle | 25 ++++++++++- checkstyle/import-control.xml | 1 + .../resources/ConnectorPluginsResource.java | 21 ++++++--- .../rest/resources/ConnectorsResource.java | 43 ++++++++++++++----- .../rest/resources/LoggingResource.java | 4 ++ .../runtime/rest/resources/RootResource.java | 2 + docs/connect.html | 2 + gradle/dependencies.gradle | 4 ++ gradle/openapi.template | 24 +++++++++++ 9 files changed, 107 insertions(+), 19 deletions(-) create mode 100644 gradle/openapi.template diff --git a/build.gradle b/build.gradle index 6983a971a79..064e397c152 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ plugins { id 'org.gradle.test-retry' version '1.3.1' apply false id 'org.scoverage' version '7.0.0' apply false id 'com.github.johnrengelman.shadow' version '7.1.2' apply false + id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" } spotless { @@ -1033,7 +1034,7 @@ project(':core') { ':connect:runtime:genConnectPredicateDocs', ':connect:runtime:genSinkConnectorConfigDocs', ':connect:runtime:genSourceConnectorConfigDocs', ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs', - ':connect:runtime:genConnectMetricsDocs'], type: Tar) { + ':connect:runtime:genConnectMetricsDocs', ':connect:runtime:genConnectOpenAPIDocs'], type: Tar) { archiveClassifier = 'site-docs' compression = Compression.GZIP from project.file("$rootDir/docs") @@ -2518,6 +2519,8 @@ project(':connect:runtime') { implementation libs.jettyClient implementation libs.reflections implementation libs.mavenArtifact + implementation libs.swaggerJaxrs2 + implementation libs.swaggerAnnotations testImplementation project(':clients').sourceSets.test.output testImplementation project(':core') @@ -2598,6 +2601,26 @@ project(':connect:runtime') { standardOutput = new File(generatedDocsDir, "connect_metrics.html").newOutputStream() } + task setVersionInOpenAPISpec(type: Copy) { + from "$rootDir/gradle/openapi.template" + into "$buildDir/resources/docs" + rename ('openapi.template', 'openapi.yaml') + expand(kafkaVersion: "$rootProject.version") + } + + task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) { + classpath = sourceSets.main.runtimeClasspath + buildClasspath = classpath + outputFileName = 'connect_rest' + outputFormat = 'YAML' + prettyPrint = 'TRUE' + sortOutput = 'TRUE' + openApiFile = file("$buildDir/resources/docs/openapi.yaml") + resourcePackages = ['org.apache.kafka.connect.runtime.rest.resources'] + if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } + outputDir = file(generatedDocsDir) + } + } project(':connect:file') { diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7b5f20aea44..211d23ff60a 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -564,6 +564,7 @@ + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 2beda9fb8a1..269d4471a56 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.PredicatedTransformation; @@ -102,17 +104,18 @@ public class ConnectorPluginsResource { } @PUT - @Path("/{connectorType}/config/validate") + @Path("/{pluginName}/config/validate") + @Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName") public ConfigInfos validateConfigs( - final @PathParam("connectorType") String connType, + final @PathParam("pluginName") String pluginName, final Map connectorConfig ) throws Throwable { String includedConnType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); if (includedConnType != null - && !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(connType))) { + && !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(pluginName))) { throw new BadRequestException( "Included connector type " + includedConnType + " does not match request type " - + connType + + pluginName ); } @@ -133,7 +136,10 @@ public class ConnectorPluginsResource { @GET @Path("/") - public List listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) { + @Operation(summary = "List all connector plugins installed") + public List listConnectorPlugins( + @DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly + ) { synchronized (this) { if (connectorsOnly) { return Collections.unmodifiableList(connectorPlugins.stream() @@ -146,8 +152,9 @@ public class ConnectorPluginsResource { } @GET - @Path("/{name}/config") - public List getConnectorConfigDef(final @PathParam("name") String pluginName) { + @Path("/{pluginName}/config") + @Operation(summary = "Get the configuration definition for the specified pluginName") + public List getConnectorConfigDef(final @PathParam("pluginName") String pluginName) { synchronized (this) { return herder.connectorPluginConfig(pluginName); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index dbf246f00ef..fac582de612 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -22,6 +22,8 @@ import javax.ws.rs.DefaultValue; import javax.ws.rs.core.HttpHeaders; import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -113,6 +115,7 @@ public class ConnectorsResource { @GET @Path("/") + @Operation(summary = "List all active connectors") public Response listConnectors( final @Context UriInfo uriInfo, final @Context HttpHeaders headers @@ -150,7 +153,8 @@ public class ConnectorsResource { @POST @Path("/") - public Response createConnector(final @QueryParam("forward") Boolean forward, + @Operation(summary = "Create a new connector") + public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, final CreateConnectorRequest createRequest) throws Throwable { // Trim leading and trailing whitespaces from the connector name, replace null with empty string @@ -172,9 +176,10 @@ public class ConnectorsResource { @GET @Path("/{connector}") + @Operation(summary = "Get the details for the specified connector") public ConnectorInfo getConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); herder.connectorInfo(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward); @@ -182,9 +187,10 @@ public class ConnectorsResource { @GET @Path("/{connector}/config") + @Operation(summary = "Get the configuration for the specified connector") public Map getConnectorConfig(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.connectorConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward); @@ -192,10 +198,11 @@ public class ConnectorsResource { @GET @Path("/{connector}/tasks-config") + @Operation(summary = "Get the configuration of all tasks for the specified connector") public Map> getTasksConfig( final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback>> cb = new FutureCallback<>(); herder.tasksConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward); @@ -203,12 +210,14 @@ public class ConnectorsResource { @GET @Path("/{connector}/status") + @Operation(summary = "Get the status for the specified connector") public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) { return herder.connectorStatus(connector); } @GET @Path("/{connector}/topics") + @Operation(summary = "Get the list of topics actively used by the specified connector") public Response getConnectorActiveTopics(final @PathParam("connector") String connector) { if (isTopicTrackingDisabled) { throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), @@ -220,6 +229,7 @@ public class ConnectorsResource { @PUT @Path("/{connector}/topics/reset") + @Operation(summary = "Reset the list of topics actively used by the specified connector") public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) { if (isTopicTrackingDisabled) { throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), @@ -235,9 +245,10 @@ public class ConnectorsResource { @PUT @Path("/{connector}/config") + @Operation(summary = "Create or reconfigure the specified connector") public Response putConnectorConfig(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward, + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final Map connectorConfig) throws Throwable { FutureCallback> cb = new FutureCallback<>(); checkAndPutConnectorConfigName(connector, connectorConfig); @@ -257,11 +268,12 @@ public class ConnectorsResource { @POST @Path("/{connector}/restart") + @Operation(summary = "Restart the specified connector") public Response restartConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @DefaultValue("false") @QueryParam("includeTasks") Boolean includeTasks, - final @DefaultValue("false") @QueryParam("onlyFailed") Boolean onlyFailed, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @DefaultValue("false") @QueryParam("includeTasks") @Parameter(description = "Whether to also restart tasks") Boolean includeTasks, + final @DefaultValue("false") @QueryParam("onlyFailed") @Parameter(description = "Whether to only restart failed tasks/connectors")Boolean onlyFailed, + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks); String forwardingPath = "/connectors/" + connector + "/restart"; if (restartRequest.forceRestartConnectorOnly()) { @@ -285,6 +297,8 @@ public class ConnectorsResource { @PUT @Path("/{connector}/pause") + @Operation(summary = "Pause the specified connector", + description = "This operation is idempotent and has no effects if the connector is already paused") public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) { herder.pauseConnector(connector); return Response.accepted().build(); @@ -292,6 +306,8 @@ public class ConnectorsResource { @PUT @Path("/{connector}/resume") + @Operation(summary = "Resume the specified connector", + description = "This operation is idempotent and has no effects if the connector is already running") public Response resumeConnector(@PathParam("connector") String connector) { herder.resumeConnector(connector); return Response.accepted().build(); @@ -299,9 +315,10 @@ public class ConnectorsResource { @GET @Path("/{connector}/tasks") + @Operation(summary = "List all tasks for the specified connector") public List getTaskConfigs(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.taskConfigs(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference>() { @@ -310,6 +327,7 @@ public class ConnectorsResource { @POST @Path("/{connector}/tasks") + @Operation(hidden = true, summary = "This operation is only for inter-worker communications") public void putTaskConfigs(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @QueryParam("forward") Boolean forward, @@ -322,6 +340,7 @@ public class ConnectorsResource { @GET @Path("/{connector}/tasks/{task}/status") + @Operation(summary = "Get the state of the specified task for the specified connector") public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @PathParam("task") Integer task) { @@ -330,10 +349,11 @@ public class ConnectorsResource { @POST @Path("/{connector}/tasks/{task}/restart") + @Operation(summary = "Restart the specified task for the specified connector") public void restartTask(final @PathParam("connector") String connector, final @PathParam("task") Integer task, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); ConnectorTaskId taskId = new ConnectorTaskId(connector, task); herder.restartTask(taskId, cb); @@ -342,9 +362,10 @@ public class ConnectorsResource { @DELETE @Path("/{connector}") + @Operation(summary = "Delete the specified connector") public void destroyConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.deleteConnectorConfig(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index ce9ce14e974..cab9e4a5761 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.log4j.Level; @@ -59,6 +60,7 @@ public class LoggingResource { */ @GET @Path("/") + @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") public Response listLoggers() { Map> loggers = new TreeMap<>(); Enumeration enumeration = currentLoggers(); @@ -83,6 +85,7 @@ public class LoggingResource { */ @GET @Path("/{logger}") + @Operation(summary = "Get the log level for the specified logger") public Response getLogger(final @PathParam("logger") String namedLogger) { Objects.requireNonNull(namedLogger, "require non-null name"); @@ -120,6 +123,7 @@ public class LoggingResource { */ @PUT @Path("/{logger}") + @Operation(summary = "Set the level for the specified logger") public Response setLevel(final @PathParam("logger") String namedLogger, final Map levelMap) { String desiredLevelStr = levelMap.get("level"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 9666bf15954..be0c2811d5b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; @@ -36,6 +37,7 @@ public class RootResource { @GET @Path("/") + @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to") public ServerInfo serverInfo() { return new ServerInfo(herder.kafkaClusterId()); } diff --git a/docs/connect.html b/docs/connect.html index be6a2ac4613..d13d25d3139 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -327,6 +327,8 @@ listeners=http://localhost:8080,https://localhost:8443
  • GET /- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to. +

    For the complete specification of the REST API, see the OpenAPI documentation

    +

    Error Reporting in Connect

    Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic.

    diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index acad88aed53..935e5d8d205 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -117,6 +117,8 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.8.4", spotbugs: "4.2.2", + swaggerAnnotations: "2.2.0", + swaggerJaxrs2: "2.2.0", zinc: "1.3.5", zookeeper: "3.6.3", zstd: "1.5.2-1" @@ -200,6 +202,8 @@ libs += [ slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j", slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j", snappy: "org.xerial.snappy:snappy-java:$versions.snappy", + swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$versions.swaggerAnnotations", + swaggerJaxrs2: "io.swagger.core.v3:swagger-jaxrs2:$versions.swaggerJaxrs2", zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", jfreechart: "jfreechart:jfreechart:$versions.jfreechart", mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", diff --git a/gradle/openapi.template b/gradle/openapi.template new file mode 100644 index 00000000000..d15c40c0070 --- /dev/null +++ b/gradle/openapi.template @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +openapi: 3.0.0 +info: + version: $kafkaVersion + title: Kafka Connect REST API + description: "This is the documentation of the [Apache Kafka](https://kafka.apache.org) Connect REST API." + contact: + email: dev@kafka.apache.org + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html