@ -22,6 +22,8 @@ import javax.ws.rs.DefaultValue;
import javax.ws.rs.core.HttpHeaders ;
import javax.ws.rs.core.HttpHeaders ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import io.swagger.v3.oas.annotations.Operation ;
import io.swagger.v3.oas.annotations.Parameter ;
import org.apache.kafka.connect.errors.NotFoundException ;
import org.apache.kafka.connect.errors.NotFoundException ;
import org.apache.kafka.connect.runtime.ConnectorConfig ;
import org.apache.kafka.connect.runtime.ConnectorConfig ;
import org.apache.kafka.connect.runtime.Herder ;
import org.apache.kafka.connect.runtime.Herder ;
@ -113,6 +115,7 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/" )
@Path ( "/" )
@Operation ( summary = "List all active connectors" )
public Response listConnectors (
public Response listConnectors (
final @Context UriInfo uriInfo ,
final @Context UriInfo uriInfo ,
final @Context HttpHeaders headers
final @Context HttpHeaders headers
@ -150,7 +153,8 @@ public class ConnectorsResource {
@POST
@POST
@Path ( "/" )
@Path ( "/" )
public Response createConnector ( final @QueryParam ( "forward" ) Boolean forward ,
@Operation ( summary = "Create a new connector" )
public Response createConnector ( final @Parameter ( hidden = true ) @QueryParam ( "forward" ) Boolean forward ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final CreateConnectorRequest createRequest ) throws Throwable {
final CreateConnectorRequest createRequest ) throws Throwable {
// Trim leading and trailing whitespaces from the connector name, replace null with empty string
// Trim leading and trailing whitespaces from the connector name, replace null with empty string
@ -172,9 +176,10 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}" )
@Path ( "/{connector}" )
@Operation ( summary = "Get the details for the specified connector" )
public ConnectorInfo getConnector ( final @PathParam ( "connector" ) String connector ,
public ConnectorInfo getConnector ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < ConnectorInfo > cb = new FutureCallback < > ( ) ;
FutureCallback < ConnectorInfo > cb = new FutureCallback < > ( ) ;
herder . connectorInfo ( connector , cb ) ;
herder . connectorInfo ( connector , cb ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector , "GET" , headers , null , forward ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector , "GET" , headers , null , forward ) ;
@ -182,9 +187,10 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}/config" )
@Path ( "/{connector}/config" )
@Operation ( summary = "Get the configuration for the specified connector" )
public Map < String , String > getConnectorConfig ( final @PathParam ( "connector" ) String connector ,
public Map < String , String > getConnectorConfig ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < Map < String , String > > cb = new FutureCallback < > ( ) ;
FutureCallback < Map < String , String > > cb = new FutureCallback < > ( ) ;
herder . connectorConfig ( connector , cb ) ;
herder . connectorConfig ( connector , cb ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/config" , "GET" , headers , null , forward ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/config" , "GET" , headers , null , forward ) ;
@ -192,10 +198,11 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}/tasks-config" )
@Path ( "/{connector}/tasks-config" )
@Operation ( summary = "Get the configuration of all tasks for the specified connector" )
public Map < ConnectorTaskId , Map < String , String > > getTasksConfig (
public Map < ConnectorTaskId , Map < String , String > > getTasksConfig (
final @PathParam ( "connector" ) String connector ,
final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < Map < ConnectorTaskId , Map < String , String > > > cb = new FutureCallback < > ( ) ;
FutureCallback < Map < ConnectorTaskId , Map < String , String > > > cb = new FutureCallback < > ( ) ;
herder . tasksConfig ( connector , cb ) ;
herder . tasksConfig ( connector , cb ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/tasks-config" , "GET" , headers , null , forward ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/tasks-config" , "GET" , headers , null , forward ) ;
@ -203,12 +210,14 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}/status" )
@Path ( "/{connector}/status" )
@Operation ( summary = "Get the status for the specified connector" )
public ConnectorStateInfo getConnectorStatus ( final @PathParam ( "connector" ) String connector ) {
public ConnectorStateInfo getConnectorStatus ( final @PathParam ( "connector" ) String connector ) {
return herder . connectorStatus ( connector ) ;
return herder . connectorStatus ( connector ) ;
}
}
@GET
@GET
@Path ( "/{connector}/topics" )
@Path ( "/{connector}/topics" )
@Operation ( summary = "Get the list of topics actively used by the specified connector" )
public Response getConnectorActiveTopics ( final @PathParam ( "connector" ) String connector ) {
public Response getConnectorActiveTopics ( final @PathParam ( "connector" ) String connector ) {
if ( isTopicTrackingDisabled ) {
if ( isTopicTrackingDisabled ) {
throw new ConnectRestException ( Response . Status . FORBIDDEN . getStatusCode ( ) ,
throw new ConnectRestException ( Response . Status . FORBIDDEN . getStatusCode ( ) ,
@ -220,6 +229,7 @@ public class ConnectorsResource {
@PUT
@PUT
@Path ( "/{connector}/topics/reset" )
@Path ( "/{connector}/topics/reset" )
@Operation ( summary = "Reset the list of topics actively used by the specified connector" )
public Response resetConnectorActiveTopics ( final @PathParam ( "connector" ) String connector , final @Context HttpHeaders headers ) {
public Response resetConnectorActiveTopics ( final @PathParam ( "connector" ) String connector , final @Context HttpHeaders headers ) {
if ( isTopicTrackingDisabled ) {
if ( isTopicTrackingDisabled ) {
throw new ConnectRestException ( Response . Status . FORBIDDEN . getStatusCode ( ) ,
throw new ConnectRestException ( Response . Status . FORBIDDEN . getStatusCode ( ) ,
@ -235,9 +245,10 @@ public class ConnectorsResource {
@PUT
@PUT
@Path ( "/{connector}/config" )
@Path ( "/{connector}/config" )
@Operation ( summary = "Create or reconfigure the specified connector" )
public Response putConnectorConfig ( final @PathParam ( "connector" ) String connector ,
public Response putConnectorConfig ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ,
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ,
final Map < String , String > connectorConfig ) throws Throwable {
final Map < String , String > connectorConfig ) throws Throwable {
FutureCallback < Herder . Created < ConnectorInfo > > cb = new FutureCallback < > ( ) ;
FutureCallback < Herder . Created < ConnectorInfo > > cb = new FutureCallback < > ( ) ;
checkAndPutConnectorConfigName ( connector , connectorConfig ) ;
checkAndPutConnectorConfigName ( connector , connectorConfig ) ;
@ -257,11 +268,12 @@ public class ConnectorsResource {
@POST
@POST
@Path ( "/{connector}/restart" )
@Path ( "/{connector}/restart" )
@Operation ( summary = "Restart the specified connector" )
public Response restartConnector ( final @PathParam ( "connector" ) String connector ,
public Response restartConnector ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @DefaultValue ( "false" ) @QueryParam ( "includeTasks" ) Boolean includeTasks ,
final @DefaultValue ( "false" ) @QueryParam ( "includeTasks" ) @Parameter ( description = "Whether to also restart tasks" ) Boolean includeTasks ,
final @DefaultValue ( "false" ) @QueryParam ( "onlyFailed" ) Boolean onlyFailed ,
final @DefaultValue ( "false" ) @QueryParam ( "onlyFailed" ) @Parameter ( description = "Whether to only restart failed tasks/connectors" ) Boolean onlyFailed ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
RestartRequest restartRequest = new RestartRequest ( connector , onlyFailed , includeTasks ) ;
RestartRequest restartRequest = new RestartRequest ( connector , onlyFailed , includeTasks ) ;
String forwardingPath = "/connectors/" + connector + "/restart" ;
String forwardingPath = "/connectors/" + connector + "/restart" ;
if ( restartRequest . forceRestartConnectorOnly ( ) ) {
if ( restartRequest . forceRestartConnectorOnly ( ) ) {
@ -285,6 +297,8 @@ public class ConnectorsResource {
@PUT
@PUT
@Path ( "/{connector}/pause" )
@Path ( "/{connector}/pause" )
@Operation ( summary = "Pause the specified connector" ,
description = "This operation is idempotent and has no effects if the connector is already paused" )
public Response pauseConnector ( @PathParam ( "connector" ) String connector , final @Context HttpHeaders headers ) {
public Response pauseConnector ( @PathParam ( "connector" ) String connector , final @Context HttpHeaders headers ) {
herder . pauseConnector ( connector ) ;
herder . pauseConnector ( connector ) ;
return Response . accepted ( ) . build ( ) ;
return Response . accepted ( ) . build ( ) ;
@ -292,6 +306,8 @@ public class ConnectorsResource {
@PUT
@PUT
@Path ( "/{connector}/resume" )
@Path ( "/{connector}/resume" )
@Operation ( summary = "Resume the specified connector" ,
description = "This operation is idempotent and has no effects if the connector is already running" )
public Response resumeConnector ( @PathParam ( "connector" ) String connector ) {
public Response resumeConnector ( @PathParam ( "connector" ) String connector ) {
herder . resumeConnector ( connector ) ;
herder . resumeConnector ( connector ) ;
return Response . accepted ( ) . build ( ) ;
return Response . accepted ( ) . build ( ) ;
@ -299,9 +315,10 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}/tasks" )
@Path ( "/{connector}/tasks" )
@Operation ( summary = "List all tasks for the specified connector" )
public List < TaskInfo > getTaskConfigs ( final @PathParam ( "connector" ) String connector ,
public List < TaskInfo > getTaskConfigs ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < List < TaskInfo > > cb = new FutureCallback < > ( ) ;
FutureCallback < List < TaskInfo > > cb = new FutureCallback < > ( ) ;
herder . taskConfigs ( connector , cb ) ;
herder . taskConfigs ( connector , cb ) ;
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/tasks" , "GET" , headers , null , new TypeReference < List < TaskInfo > > ( ) {
return completeOrForwardRequest ( cb , "/connectors/" + connector + "/tasks" , "GET" , headers , null , new TypeReference < List < TaskInfo > > ( ) {
@ -310,6 +327,7 @@ public class ConnectorsResource {
@POST
@POST
@Path ( "/{connector}/tasks" )
@Path ( "/{connector}/tasks" )
@Operation ( hidden = true , summary = "This operation is only for inter-worker communications" )
public void putTaskConfigs ( final @PathParam ( "connector" ) String connector ,
public void putTaskConfigs ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ,
final @QueryParam ( "forward" ) Boolean forward ,
@ -322,6 +340,7 @@ public class ConnectorsResource {
@GET
@GET
@Path ( "/{connector}/tasks/{task}/status" )
@Path ( "/{connector}/tasks/{task}/status" )
@Operation ( summary = "Get the state of the specified task for the specified connector" )
public ConnectorStateInfo . TaskState getTaskStatus ( final @PathParam ( "connector" ) String connector ,
public ConnectorStateInfo . TaskState getTaskStatus ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @PathParam ( "task" ) Integer task ) {
final @PathParam ( "task" ) Integer task ) {
@ -330,10 +349,11 @@ public class ConnectorsResource {
@POST
@POST
@Path ( "/{connector}/tasks/{task}/restart" )
@Path ( "/{connector}/tasks/{task}/restart" )
@Operation ( summary = "Restart the specified task for the specified connector" )
public void restartTask ( final @PathParam ( "connector" ) String connector ,
public void restartTask ( final @PathParam ( "connector" ) String connector ,
final @PathParam ( "task" ) Integer task ,
final @PathParam ( "task" ) Integer task ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < Void > cb = new FutureCallback < > ( ) ;
FutureCallback < Void > cb = new FutureCallback < > ( ) ;
ConnectorTaskId taskId = new ConnectorTaskId ( connector , task ) ;
ConnectorTaskId taskId = new ConnectorTaskId ( connector , task ) ;
herder . restartTask ( taskId , cb ) ;
herder . restartTask ( taskId , cb ) ;
@ -342,9 +362,10 @@ public class ConnectorsResource {
@DELETE
@DELETE
@Path ( "/{connector}" )
@Path ( "/{connector}" )
@Operation ( summary = "Delete the specified connector" )
public void destroyConnector ( final @PathParam ( "connector" ) String connector ,
public void destroyConnector ( final @PathParam ( "connector" ) String connector ,
final @Context HttpHeaders headers ,
final @Context HttpHeaders headers ,
final @QueryParam ( "forward" ) Boolean forward ) throws Throwable {
final @Parameter ( hidden = true ) @ QueryParam ( "forward" ) Boolean forward ) throws Throwable {
FutureCallback < Herder . Created < ConnectorInfo > > cb = new FutureCallback < > ( ) ;
FutureCallback < Herder . Created < ConnectorInfo > > cb = new FutureCallback < > ( ) ;
herder . deleteConnectorConfig ( connector , cb ) ;
herder . deleteConnectorConfig ( connector , cb ) ;
completeOrForwardRequest ( cb , "/connectors/" + connector , "DELETE" , headers , null , forward ) ;
completeOrForwardRequest ( cb , "/connectors/" + connector , "DELETE" , headers , null , forward ) ;