Browse Source
Author: Ewen Cheslack-Postava <me@ewencp.org> Reviewers: Gwen Shapira, James Cheng Closes #378 from ewencp/kafka-2369-copycat-rest-apipull/378/merge
Ewen Cheslack-Postava
9 years ago
committed by
Gwen Shapira
51 changed files with 2799 additions and 375 deletions
@ -0,0 +1,35 @@
@@ -0,0 +1,35 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.errors; |
||||
|
||||
/** |
||||
* Indicates the operation tried to create an entity that already exists. |
||||
*/ |
||||
public class AlreadyExistsException extends CopycatException { |
||||
public AlreadyExistsException(String s) { |
||||
super(s); |
||||
} |
||||
|
||||
public AlreadyExistsException(String s, Throwable throwable) { |
||||
super(s, throwable); |
||||
} |
||||
|
||||
public AlreadyExistsException(Throwable throwable) { |
||||
super(throwable); |
||||
} |
||||
} |
@ -0,0 +1,35 @@
@@ -0,0 +1,35 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.errors; |
||||
|
||||
/** |
||||
* Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker. |
||||
*/ |
||||
public class NotFoundException extends CopycatException { |
||||
public NotFoundException(String s) { |
||||
super(s); |
||||
} |
||||
|
||||
public NotFoundException(String s, Throwable throwable) { |
||||
super(s, throwable); |
||||
} |
||||
|
||||
public NotFoundException(Throwable throwable) { |
||||
super(throwable); |
||||
} |
||||
} |
@ -0,0 +1,258 @@
@@ -0,0 +1,258 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; |
||||
import org.apache.kafka.copycat.errors.CopycatException; |
||||
import org.apache.kafka.copycat.runtime.Herder; |
||||
import org.apache.kafka.copycat.runtime.WorkerConfig; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage; |
||||
import org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper; |
||||
import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException; |
||||
import org.apache.kafka.copycat.runtime.rest.resources.ConnectorsResource; |
||||
import org.apache.kafka.copycat.runtime.rest.resources.RootResource; |
||||
import org.eclipse.jetty.server.Connector; |
||||
import org.eclipse.jetty.server.Handler; |
||||
import org.eclipse.jetty.server.Server; |
||||
import org.eclipse.jetty.server.ServerConnector; |
||||
import org.eclipse.jetty.server.Slf4jRequestLog; |
||||
import org.eclipse.jetty.server.handler.DefaultHandler; |
||||
import org.eclipse.jetty.server.handler.HandlerCollection; |
||||
import org.eclipse.jetty.server.handler.RequestLogHandler; |
||||
import org.eclipse.jetty.server.handler.StatisticsHandler; |
||||
import org.eclipse.jetty.servlet.ServletContextHandler; |
||||
import org.eclipse.jetty.servlet.ServletHolder; |
||||
import org.glassfish.jersey.server.ResourceConfig; |
||||
import org.glassfish.jersey.servlet.ServletContainer; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import javax.ws.rs.core.Response; |
||||
import javax.ws.rs.core.UriBuilder; |
||||
import java.io.IOException; |
||||
import java.io.InputStream; |
||||
import java.io.OutputStream; |
||||
import java.net.HttpURLConnection; |
||||
import java.net.URL; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* Embedded server for the REST API that provides the control plane for Copycat workers. |
||||
*/ |
||||
public class RestServer { |
||||
private static final Logger log = LoggerFactory.getLogger(RestServer.class); |
||||
|
||||
private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000; |
||||
|
||||
private static final ObjectMapper JSON_SERDE = new ObjectMapper(); |
||||
|
||||
private final WorkerConfig config; |
||||
private Herder herder; |
||||
private Server jettyServer; |
||||
|
||||
/** |
||||
* Create a REST server for this herder using the specified configs. |
||||
*/ |
||||
public RestServer(WorkerConfig config) { |
||||
this.config = config; |
||||
|
||||
// To make the advertised port available immediately, we need to do some configuration here
|
||||
String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG); |
||||
Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG); |
||||
|
||||
jettyServer = new Server(); |
||||
|
||||
ServerConnector connector = new ServerConnector(jettyServer); |
||||
if (hostname != null && !hostname.isEmpty()) |
||||
connector.setHost(hostname); |
||||
connector.setPort(port); |
||||
jettyServer.setConnectors(new Connector[]{connector}); |
||||
} |
||||
|
||||
public void start(Herder herder) { |
||||
log.info("Starting REST server"); |
||||
|
||||
this.herder = herder; |
||||
|
||||
ResourceConfig resourceConfig = new ResourceConfig(); |
||||
resourceConfig.register(new JacksonJsonProvider()); |
||||
|
||||
resourceConfig.register(RootResource.class); |
||||
resourceConfig.register(new ConnectorsResource(herder)); |
||||
|
||||
resourceConfig.register(CopycatExceptionMapper.class); |
||||
|
||||
ServletContainer servletContainer = new ServletContainer(resourceConfig); |
||||
ServletHolder servletHolder = new ServletHolder(servletContainer); |
||||
|
||||
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); |
||||
context.setContextPath("/"); |
||||
context.addServlet(servletHolder, "/*"); |
||||
|
||||
RequestLogHandler requestLogHandler = new RequestLogHandler(); |
||||
Slf4jRequestLog requestLog = new Slf4jRequestLog(); |
||||
requestLog.setLoggerName(RestServer.class.getCanonicalName()); |
||||
requestLog.setLogLatency(true); |
||||
requestLogHandler.setRequestLog(requestLog); |
||||
|
||||
HandlerCollection handlers = new HandlerCollection(); |
||||
handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); |
||||
|
||||
/* Needed for graceful shutdown as per `setStopTimeout` documentation */ |
||||
StatisticsHandler statsHandler = new StatisticsHandler(); |
||||
statsHandler.setHandler(handlers); |
||||
jettyServer.setHandler(statsHandler); |
||||
jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); |
||||
jettyServer.setStopAtShutdown(true); |
||||
|
||||
try { |
||||
jettyServer.start(); |
||||
} catch (Exception e) { |
||||
throw new CopycatException("Unable to start REST server", e); |
||||
} |
||||
|
||||
log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); |
||||
} |
||||
|
||||
public void stop() { |
||||
try { |
||||
jettyServer.stop(); |
||||
jettyServer.join(); |
||||
} catch (Exception e) { |
||||
throw new CopycatException("Unable to stop REST server", e); |
||||
} finally { |
||||
jettyServer.destroy(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty |
||||
* server, unless overrides for advertised hostname and/or port are provided via configs. |
||||
*/ |
||||
public String advertisedUrl() { |
||||
UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI()); |
||||
String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG); |
||||
if (advertisedHostname != null && !advertisedHostname.isEmpty()) |
||||
builder.host(advertisedHostname); |
||||
Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG); |
||||
if (advertisedPort != null) |
||||
builder.port(advertisedPort); |
||||
else |
||||
builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG)); |
||||
return builder.build().toString(); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* @param url HTTP connection will be established with this url. |
||||
* @param method HTTP method ("GET", "POST", "PUT", etc.) |
||||
* @param requestBodyData Object to serialize as JSON and send in the request body. |
||||
* @param responseFormat Expected format of the response to the HTTP request. |
||||
* @param <T> The type of the deserialized response to the HTTP request. |
||||
* @return The deserialized response to the HTTP request, or null if no data is expected. |
||||
*/ |
||||
public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData, |
||||
TypeReference<T> responseFormat) { |
||||
HttpURLConnection connection = null; |
||||
try { |
||||
String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData); |
||||
log.debug("Sending {} with input {} to {}", method, serializedBody, url); |
||||
|
||||
connection = (HttpURLConnection) new URL(url).openConnection(); |
||||
connection.setRequestMethod(method); |
||||
|
||||
connection.setRequestProperty("User-Agent", "kafka-copycat"); |
||||
connection.setRequestProperty("Accept", "application/json"); |
||||
|
||||
// connection.getResponseCode() implicitly calls getInputStream, so always set to true.
|
||||
// On the other hand, leaving this out breaks nothing.
|
||||
connection.setDoInput(true); |
||||
|
||||
connection.setUseCaches(false); |
||||
|
||||
if (requestBodyData != null) { |
||||
connection.setRequestProperty("Content-Type", "application/json"); |
||||
connection.setDoOutput(true); |
||||
|
||||
OutputStream os = connection.getOutputStream(); |
||||
os.write(serializedBody.getBytes()); |
||||
os.flush(); |
||||
os.close(); |
||||
} |
||||
|
||||
int responseCode = connection.getResponseCode(); |
||||
if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) { |
||||
return new HttpResponse<>(responseCode, connection.getHeaderFields(), null); |
||||
} else if (responseCode >= 400) { |
||||
InputStream es = connection.getErrorStream(); |
||||
ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class); |
||||
es.close(); |
||||
throw new CopycatRestException(responseCode, errorMessage.errorCode(), errorMessage.message()); |
||||
} else if (responseCode >= 200 && responseCode < 300) { |
||||
InputStream is = connection.getInputStream(); |
||||
T result = JSON_SERDE.readValue(is, responseFormat); |
||||
is.close(); |
||||
return new HttpResponse<>(responseCode, connection.getHeaderFields(), result); |
||||
} else { |
||||
throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, |
||||
Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), |
||||
"Unexpected status code when handling forwarded request: " + responseCode); |
||||
} |
||||
} catch (IOException e) { |
||||
log.error("IO error forwarding REST request: ", e); |
||||
throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e); |
||||
} finally { |
||||
if (connection != null) |
||||
connection.disconnect(); |
||||
} |
||||
} |
||||
|
||||
public static class HttpResponse<T> { |
||||
private int status; |
||||
private Map<String, List<String>> headers; |
||||
private T body; |
||||
|
||||
public HttpResponse(int status, Map<String, List<String>> headers, T body) { |
||||
this.status = status; |
||||
this.headers = headers; |
||||
this.body = body; |
||||
} |
||||
|
||||
public int status() { |
||||
return status; |
||||
} |
||||
|
||||
public Map<String, List<String>> headers() { |
||||
return headers; |
||||
} |
||||
|
||||
public T body() { |
||||
return body; |
||||
} |
||||
} |
||||
|
||||
public static String urlJoin(String base, String path) { |
||||
if (base.endsWith("/") && path.startsWith("/")) |
||||
return base + path.substring(1); |
||||
else |
||||
return base + path; |
||||
} |
||||
} |
@ -0,0 +1,81 @@
@@ -0,0 +1,81 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.entities; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator; |
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
import org.apache.kafka.copycat.util.ConnectorTaskId; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
|
||||
public class ConnectorInfo { |
||||
|
||||
private final String name; |
||||
private final Map<String, String> config; |
||||
private final List<ConnectorTaskId> tasks; |
||||
|
||||
@JsonCreator |
||||
public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config, |
||||
@JsonProperty("tasks") List<ConnectorTaskId> tasks) { |
||||
this.name = name; |
||||
this.config = config; |
||||
this.tasks = tasks; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public String name() { |
||||
return name; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public Map<String, String> config() { |
||||
return config; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public List<ConnectorTaskId> tasks() { |
||||
return tasks; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
ConnectorInfo that = (ConnectorInfo) o; |
||||
return Objects.equals(name, that.name) && |
||||
Objects.equals(config, that.config) && |
||||
Objects.equals(tasks, that.tasks); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(name, config, tasks); |
||||
} |
||||
|
||||
|
||||
private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.copycat.util.ConnectorTaskId> tasks) { |
||||
List<ConnectorTaskId> jsonTasks = new ArrayList<>(); |
||||
for (ConnectorTaskId task : tasks) |
||||
jsonTasks.add(task); |
||||
return jsonTasks; |
||||
} |
||||
} |
@ -0,0 +1,59 @@
@@ -0,0 +1,59 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.entities; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator; |
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
|
||||
public class CreateConnectorRequest { |
||||
private final String name; |
||||
private final Map<String, String> config; |
||||
|
||||
@JsonCreator |
||||
public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) { |
||||
this.name = name; |
||||
this.config = config; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public String name() { |
||||
return name; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public Map<String, String> config() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
CreateConnectorRequest that = (CreateConnectorRequest) o; |
||||
return Objects.equals(name, that.name) && |
||||
Objects.equals(config, that.config); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(name, config); |
||||
} |
||||
} |
@ -0,0 +1,63 @@
@@ -0,0 +1,63 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.entities; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator; |
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
||||
import java.util.Objects; |
||||
|
||||
/** |
||||
* Standard error format for all REST API failures. These are generated automatically by |
||||
* {@link org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper} in response to uncaught |
||||
* {@link org.apache.kafka.copycat.errors.CopycatException}s. |
||||
*/ |
||||
public class ErrorMessage { |
||||
private final int errorCode; |
||||
private final String message; |
||||
|
||||
@JsonCreator |
||||
public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) { |
||||
this.errorCode = errorCode; |
||||
this.message = message; |
||||
} |
||||
|
||||
@JsonProperty("error_code") |
||||
public int errorCode() { |
||||
return errorCode; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public String message() { |
||||
return message; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
ErrorMessage that = (ErrorMessage) o; |
||||
return Objects.equals(errorCode, that.errorCode) && |
||||
Objects.equals(message, that.message); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(errorCode, message); |
||||
} |
||||
} |
@ -0,0 +1,41 @@
@@ -0,0 +1,41 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.entities; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
import org.apache.kafka.common.utils.AppInfoParser; |
||||
|
||||
public class ServerInfo { |
||||
private String version; |
||||
private String commit; |
||||
|
||||
public ServerInfo() { |
||||
version = AppInfoParser.getVersion(); |
||||
commit = AppInfoParser.getCommitId(); |
||||
} |
||||
|
||||
@JsonProperty |
||||
public String version() { |
||||
return version; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public String commit() { |
||||
return commit; |
||||
} |
||||
} |
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.entities; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
import org.apache.kafka.copycat.util.ConnectorTaskId; |
||||
|
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
|
||||
public class TaskInfo { |
||||
private final ConnectorTaskId id; |
||||
private final Map<String, String> config; |
||||
|
||||
public TaskInfo(ConnectorTaskId id, Map<String, String> config) { |
||||
this.id = id; |
||||
this.config = config; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public ConnectorTaskId id() { |
||||
return id; |
||||
} |
||||
|
||||
@JsonProperty |
||||
public Map<String, String> config() { |
||||
return config; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) return true; |
||||
if (o == null || getClass() != o.getClass()) return false; |
||||
TaskInfo taskInfo = (TaskInfo) o; |
||||
return Objects.equals(id, taskInfo.id) && |
||||
Objects.equals(config, taskInfo.config); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(id, config); |
||||
} |
||||
} |
@ -0,0 +1,60 @@
@@ -0,0 +1,60 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.errors; |
||||
|
||||
import org.apache.kafka.copycat.errors.AlreadyExistsException; |
||||
import org.apache.kafka.copycat.errors.CopycatException; |
||||
import org.apache.kafka.copycat.errors.NotFoundException; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import javax.ws.rs.core.Response; |
||||
import javax.ws.rs.ext.ExceptionMapper; |
||||
|
||||
public class CopycatExceptionMapper implements ExceptionMapper<CopycatException> { |
||||
private static final Logger log = LoggerFactory.getLogger(CopycatExceptionMapper.class); |
||||
|
||||
@Override |
||||
public Response toResponse(CopycatException exception) { |
||||
log.debug("Uncaught exception in REST call: ", exception); |
||||
|
||||
if (exception instanceof CopycatRestException) { |
||||
CopycatRestException restException = (CopycatRestException) exception; |
||||
return Response.status(restException.statusCode()) |
||||
.entity(new ErrorMessage(restException.errorCode(), restException.getMessage())) |
||||
.build(); |
||||
} |
||||
|
||||
if (exception instanceof NotFoundException) { |
||||
return Response.status(Response.Status.NOT_FOUND) |
||||
.entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage())) |
||||
.build(); |
||||
} |
||||
|
||||
if (exception instanceof AlreadyExistsException) { |
||||
return Response.status(Response.Status.CONFLICT) |
||||
.entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage())) |
||||
.build(); |
||||
} |
||||
|
||||
return Response.status(Response.Status.INTERNAL_SERVER_ERROR) |
||||
.entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage())) |
||||
.build(); |
||||
} |
||||
} |
@ -0,0 +1,70 @@
@@ -0,0 +1,70 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.errors; |
||||
|
||||
import org.apache.kafka.copycat.errors.CopycatException; |
||||
|
||||
import javax.ws.rs.core.Response; |
||||
|
||||
public class CopycatRestException extends CopycatException { |
||||
private final int statusCode; |
||||
private final int errorCode; |
||||
|
||||
public CopycatRestException(int statusCode, int errorCode, String message, Throwable t) { |
||||
super(message, t); |
||||
this.statusCode = statusCode; |
||||
this.errorCode = errorCode; |
||||
} |
||||
|
||||
public CopycatRestException(Response.Status status, int errorCode, String message, Throwable t) { |
||||
this(status.getStatusCode(), errorCode, message, t); |
||||
} |
||||
|
||||
public CopycatRestException(int statusCode, int errorCode, String message) { |
||||
this(statusCode, errorCode, message, null); |
||||
} |
||||
|
||||
public CopycatRestException(Response.Status status, int errorCode, String message) { |
||||
this(status, errorCode, message, null); |
||||
} |
||||
|
||||
public CopycatRestException(int statusCode, String message, Throwable t) { |
||||
this(statusCode, statusCode, message, t); |
||||
} |
||||
|
||||
public CopycatRestException(Response.Status status, String message, Throwable t) { |
||||
this(status, status.getStatusCode(), message, t); |
||||
} |
||||
|
||||
public CopycatRestException(int statusCode, String message) { |
||||
this(statusCode, statusCode, message, null); |
||||
} |
||||
|
||||
public CopycatRestException(Response.Status status, String message) { |
||||
this(status.getStatusCode(), status.getStatusCode(), message, null); |
||||
} |
||||
|
||||
|
||||
public int statusCode() { |
||||
return statusCode; |
||||
} |
||||
|
||||
public int errorCode() { |
||||
return errorCode; |
||||
} |
||||
} |
@ -0,0 +1,201 @@
@@ -0,0 +1,201 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.resources; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig; |
||||
import org.apache.kafka.copycat.runtime.Herder; |
||||
import org.apache.kafka.copycat.runtime.distributed.NotLeaderException; |
||||
import org.apache.kafka.copycat.runtime.rest.RestServer; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; |
||||
import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException; |
||||
import org.apache.kafka.copycat.util.FutureCallback; |
||||
|
||||
import javax.servlet.ServletContext; |
||||
import javax.ws.rs.Consumes; |
||||
import javax.ws.rs.DELETE; |
||||
import javax.ws.rs.GET; |
||||
import javax.ws.rs.POST; |
||||
import javax.ws.rs.PUT; |
||||
import javax.ws.rs.Path; |
||||
import javax.ws.rs.PathParam; |
||||
import javax.ws.rs.Produces; |
||||
import javax.ws.rs.core.MediaType; |
||||
import javax.ws.rs.core.Response; |
||||
import java.net.URI; |
||||
import java.util.Collection; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
@Path("/connectors") |
||||
@Produces(MediaType.APPLICATION_JSON) |
||||
@Consumes(MediaType.APPLICATION_JSON) |
||||
public class ConnectorsResource { |
||||
// TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
|
||||
// session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
|
||||
// we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
|
||||
// but currently a worker simply leaving the group can take this long as well.
|
||||
private static final long REQUEST_TIMEOUT_MS = 90 * 1000; |
||||
|
||||
private final Herder herder; |
||||
@javax.ws.rs.core.Context |
||||
private ServletContext context; |
||||
|
||||
public ConnectorsResource(Herder herder) { |
||||
this.herder = herder; |
||||
} |
||||
|
||||
@GET |
||||
@Path("/") |
||||
public Collection<String> listConnectors() throws Throwable { |
||||
FutureCallback<Collection<String>> cb = new FutureCallback<>(); |
||||
herder.connectors(cb); |
||||
return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() { |
||||
}); |
||||
} |
||||
|
||||
@POST |
||||
@Path("/") |
||||
public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable { |
||||
String name = createRequest.name(); |
||||
Map<String, String> configs = createRequest.config(); |
||||
if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) |
||||
configs.put(ConnectorConfig.NAME_CONFIG, name); |
||||
|
||||
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
||||
herder.putConnectorConfig(name, configs, false, cb); |
||||
Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, |
||||
new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator()); |
||||
return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build(); |
||||
} |
||||
|
||||
@GET |
||||
@Path("/{connector}") |
||||
public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable { |
||||
FutureCallback<ConnectorInfo> cb = new FutureCallback<>(); |
||||
herder.connectorInfo(connector, cb); |
||||
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() { |
||||
}); |
||||
} |
||||
|
||||
@GET |
||||
@Path("/{connector}/config") |
||||
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable { |
||||
FutureCallback<Map<String, String>> cb = new FutureCallback<>(); |
||||
herder.connectorConfig(connector, cb); |
||||
return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() { |
||||
}); |
||||
} |
||||
|
||||
@PUT |
||||
@Path("/{connector}/config") |
||||
public Response putConnectorConfig(final @PathParam("connector") String connector, |
||||
final Map<String, String> connectorConfig) throws Throwable { |
||||
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
||||
herder.putConnectorConfig(connector, connectorConfig, true, cb); |
||||
Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", |
||||
"PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator()); |
||||
Response.ResponseBuilder response; |
||||
if (createdInfo.created()) |
||||
response = Response.created(URI.create("/connectors/" + connector)); |
||||
else |
||||
response = Response.ok(); |
||||
return response.entity(createdInfo.result()).build(); |
||||
} |
||||
|
||||
@GET |
||||
@Path("/{connector}/tasks") |
||||
public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector) throws Throwable { |
||||
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>(); |
||||
herder.taskConfigs(connector, cb); |
||||
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() { |
||||
}); |
||||
} |
||||
|
||||
@POST |
||||
@Path("/{connector}/tasks") |
||||
public void putTaskConfigs(final @PathParam("connector") String connector, |
||||
final List<Map<String, String>> taskConfigs) throws Throwable { |
||||
FutureCallback<Void> cb = new FutureCallback<>(); |
||||
herder.putTaskConfigs(connector, taskConfigs, cb); |
||||
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs); |
||||
} |
||||
|
||||
@DELETE |
||||
@Path("/{connector}") |
||||
public void destroyConnector(final @PathParam("connector") String connector) throws Throwable { |
||||
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); |
||||
herder.putConnectorConfig(connector, null, true, cb); |
||||
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null); |
||||
} |
||||
|
||||
// Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
|
||||
// request to the leader.
|
||||
private <T, U> T completeOrForwardRequest( |
||||
FutureCallback<T> cb, String path, String method, Object body, TypeReference<U> resultType, |
||||
Translator<T, U> translator) throws Throwable { |
||||
try { |
||||
return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); |
||||
} catch (ExecutionException e) { |
||||
if (e.getCause() instanceof NotLeaderException) { |
||||
NotLeaderException notLeaderError = (NotLeaderException) e.getCause(); |
||||
return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType)); |
||||
} |
||||
|
||||
throw e.getCause(); |
||||
} catch (TimeoutException e) { |
||||
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
|
||||
// error is the best option
|
||||
throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out"); |
||||
} catch (InterruptedException e) { |
||||
throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted"); |
||||
} |
||||
} |
||||
|
||||
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body, TypeReference<T> resultType) throws Throwable { |
||||
return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>()); |
||||
} |
||||
|
||||
private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body) throws Throwable { |
||||
return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>()); |
||||
} |
||||
|
||||
private interface Translator<T, U> { |
||||
T translate(RestServer.HttpResponse<U> response); |
||||
} |
||||
|
||||
private class IdentityTranslator<T> implements Translator<T, T> { |
||||
@Override |
||||
public T translate(RestServer.HttpResponse<T> response) { |
||||
return response.body(); |
||||
} |
||||
} |
||||
|
||||
private class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> { |
||||
@Override |
||||
public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) { |
||||
boolean created = response.status() == 201; |
||||
return new Herder.Created<>(created, response.body()); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.resources; |
||||
|
||||
import org.apache.kafka.copycat.runtime.rest.entities.ServerInfo; |
||||
|
||||
import javax.ws.rs.GET; |
||||
import javax.ws.rs.Path; |
||||
import javax.ws.rs.Produces; |
||||
import javax.ws.rs.core.MediaType; |
||||
|
||||
@Path("/") |
||||
@Produces(MediaType.APPLICATION_JSON) |
||||
public class RootResource { |
||||
|
||||
@GET |
||||
@Path("/") |
||||
public ServerInfo serverInfo() { |
||||
return new ServerInfo(); |
||||
} |
||||
} |
@ -0,0 +1,35 @@
@@ -0,0 +1,35 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.standalone; |
||||
|
||||
import org.apache.kafka.common.config.ConfigDef; |
||||
import org.apache.kafka.copycat.runtime.WorkerConfig; |
||||
|
||||
import java.util.Properties; |
||||
|
||||
public class StandaloneConfig extends WorkerConfig { |
||||
private static final ConfigDef CONFIG; |
||||
|
||||
static { |
||||
CONFIG = baseConfigDef(); |
||||
} |
||||
|
||||
public StandaloneConfig(Properties props) { |
||||
super(CONFIG, props); |
||||
} |
||||
} |
@ -0,0 +1,364 @@
@@ -0,0 +1,364 @@
|
||||
/** |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
**/ |
||||
|
||||
package org.apache.kafka.copycat.runtime.rest.resources; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import org.apache.kafka.copycat.errors.AlreadyExistsException; |
||||
import org.apache.kafka.copycat.errors.CopycatException; |
||||
import org.apache.kafka.copycat.errors.NotFoundException; |
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig; |
||||
import org.apache.kafka.copycat.runtime.Herder; |
||||
import org.apache.kafka.copycat.runtime.distributed.NotLeaderException; |
||||
import org.apache.kafka.copycat.runtime.rest.RestServer; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest; |
||||
import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; |
||||
import org.apache.kafka.copycat.util.Callback; |
||||
import org.apache.kafka.copycat.util.ConnectorTaskId; |
||||
import org.easymock.Capture; |
||||
import org.easymock.EasyMock; |
||||
import org.easymock.IAnswer; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.powermock.api.easymock.PowerMock; |
||||
import org.powermock.api.easymock.annotation.Mock; |
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore; |
||||
import org.powermock.core.classloader.annotations.PrepareForTest; |
||||
import org.powermock.modules.junit4.PowerMockRunner; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.HashSet; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
|
||||
@RunWith(PowerMockRunner.class) |
||||
@PrepareForTest(RestServer.class) |
||||
@PowerMockIgnore("javax.management.*") |
||||
public class ConnectorsResourceTest { |
||||
// Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle
|
||||
// URL construction properly, avoiding //, which will mess up routing in the REST server
|
||||
private static final String LEADER_URL = "http://leader:8083/"; |
||||
private static final String CONNECTOR_NAME = "test"; |
||||
private static final String CONNECTOR2_NAME = "test2"; |
||||
private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>(); |
||||
static { |
||||
CONNECTOR_CONFIG.put("name", CONNECTOR_NAME); |
||||
CONNECTOR_CONFIG.put("sample_config", "test_config"); |
||||
} |
||||
private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = Arrays.asList( |
||||
new ConnectorTaskId(CONNECTOR_NAME, 0), |
||||
new ConnectorTaskId(CONNECTOR_NAME, 1) |
||||
); |
||||
private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); |
||||
static { |
||||
TASK_CONFIGS.add(Collections.singletonMap("config", "value")); |
||||
TASK_CONFIGS.add(Collections.singletonMap("config", "other_value")); |
||||
} |
||||
private static final List<TaskInfo> TASK_INFOS = new ArrayList<>(); |
||||
static { |
||||
TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), TASK_CONFIGS.get(0))); |
||||
TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1))); |
||||
} |
||||
|
||||
|
||||
@Mock |
||||
private Herder herder; |
||||
private ConnectorsResource connectorsResource; |
||||
|
||||
@Before |
||||
public void setUp() throws NoSuchMethodException { |
||||
PowerMock.mockStatic(RestServer.class, |
||||
RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class)); |
||||
connectorsResource = new ConnectorsResource(herder); |
||||
} |
||||
|
||||
@Test |
||||
public void testListConnectors() throws Throwable { |
||||
final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); |
||||
herder.connectors(EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
Collection<String> connectors = connectorsResource.listConnectors(); |
||||
// Ordering isn't guaranteed, compare sets
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testListConnectorsNotLeader() throws Throwable { |
||||
final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); |
||||
herder.connectors(EasyMock.capture(cb)); |
||||
expectAndCallbackNotLeaderException(cb); |
||||
// Should forward request
|
||||
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("GET"), |
||||
EasyMock.isNull(), EasyMock.anyObject(TypeReference.class))) |
||||
.andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME))); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
Collection<String> connectors = connectorsResource.listConnectors(); |
||||
// Ordering isn't guaranteed, compare sets
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors)); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test(expected = CopycatException.class) |
||||
public void testListConnectorsNotSynced() throws Throwable { |
||||
final Capture<Callback<Collection<String>>> cb = Capture.newInstance(); |
||||
herder.connectors(EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new CopycatException("not synced")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
// throws
|
||||
connectorsResource.listConnectors(); |
||||
} |
||||
|
||||
@Test |
||||
public void testCreateConnector() throws Throwable { |
||||
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); |
||||
|
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.createConnector(body); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testCreateConnectorNotLeader() throws Throwable { |
||||
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); |
||||
|
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); |
||||
expectAndCallbackNotLeaderException(cb); |
||||
// Should forward request
|
||||
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject())) |
||||
.andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.createConnector(body); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test(expected = AlreadyExistsException.class) |
||||
public void testCreateConnectorExists() throws Throwable { |
||||
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME)); |
||||
|
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new AlreadyExistsException("already exists")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.createConnector(body); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testDeleteConnector() throws Throwable { |
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, null); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.destroyConnector(CONNECTOR_NAME); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testDeleteConnectorNotLeader() throws Throwable { |
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); |
||||
expectAndCallbackNotLeaderException(cb); |
||||
// Should forward request
|
||||
EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME, "DELETE", null, null)) |
||||
.andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null)); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.destroyConnector(CONNECTOR_NAME); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
// Not found exceptions should pass through to caller so they can be processed for 404s
|
||||
@Test(expected = NotFoundException.class) |
||||
public void testDeleteConnectorNotFound() throws Throwable { |
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new NotFoundException("not found")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.destroyConnector(CONNECTOR_NAME); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetConnector() throws Throwable { |
||||
final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance(); |
||||
herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME); |
||||
assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES), connInfo); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetConnectorConfig() throws Throwable { |
||||
final Capture<Callback<Map<String, String>>> cb = Capture.newInstance(); |
||||
herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, CONNECTOR_CONFIG); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME); |
||||
assertEquals(CONNECTOR_CONFIG, connConfig); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test(expected = NotFoundException.class) |
||||
public void testGetConnectorConfigConnectorNotFound() throws Throwable { |
||||
final Capture<Callback<Map<String, String>>> cb = Capture.newInstance(); |
||||
herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new NotFoundException("not found")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.getConnectorConfig(CONNECTOR_NAME); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testPutConnectorConfig() throws Throwable { |
||||
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance(); |
||||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, new Herder.Created<>(false, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES))); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.putConnectorConfig(CONNECTOR_NAME, CONNECTOR_CONFIG); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetConnectorTaskConfigs() throws Throwable { |
||||
final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance(); |
||||
herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, TASK_INFOS); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME); |
||||
assertEquals(TASK_INFOS, taskInfos); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test(expected = NotFoundException.class) |
||||
public void testGetConnectorTaskConfigsConnectorNotFound() throws Throwable { |
||||
final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance(); |
||||
herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new NotFoundException("connector not found")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.getTaskConfigs(CONNECTOR_NAME); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test |
||||
public void testPutConnectorTaskConfigs() throws Throwable { |
||||
final Capture<Callback<Void>> cb = Capture.newInstance(); |
||||
herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb)); |
||||
expectAndCallbackResult(cb, null); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
@Test(expected = NotFoundException.class) |
||||
public void testPutConnectorTaskConfigsConnectorNotFound() throws Throwable { |
||||
final Capture<Callback<Void>> cb = Capture.newInstance(); |
||||
herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb)); |
||||
expectAndCallbackException(cb, new NotFoundException("not found")); |
||||
|
||||
PowerMock.replayAll(); |
||||
|
||||
connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS); |
||||
|
||||
PowerMock.verifyAll(); |
||||
} |
||||
|
||||
private <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, final T value) { |
||||
PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() { |
||||
@Override |
||||
public Void answer() throws Throwable { |
||||
cb.getValue().onCompletion(null, value); |
||||
return null; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private <T> void expectAndCallbackException(final Capture<Callback<T>> cb, final Throwable t) { |
||||
PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() { |
||||
@Override |
||||
public Void answer() throws Throwable { |
||||
cb.getValue().onCompletion(t, null); |
||||
return null; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private <T> void expectAndCallbackNotLeaderException(final Capture<Callback<T>> cb) { |
||||
expectAndCallbackException(cb, new NotLeaderException("not leader test", LEADER_URL)); |
||||
} |
||||
} |
@ -0,0 +1,163 @@
@@ -0,0 +1,163 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more |
||||
# contributor license agreements. See the NOTICE file distributed with |
||||
# this work for additional information regarding copyright ownership. |
||||
# The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
# (the "License"); you may not use this file except in compliance with |
||||
# the License. You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
from kafkatest.tests.kafka_test import KafkaTest |
||||
from kafkatest.services.copycat import CopycatDistributedService, CopycatRestError |
||||
from ducktape.utils.util import wait_until |
||||
import hashlib, subprocess, json, itertools |
||||
|
||||
class CopycatRestApiTest(KafkaTest): |
||||
""" |
||||
Test of Copycat's REST API endpoints. |
||||
""" |
||||
|
||||
INPUT_FILE = "/mnt/copycat.input" |
||||
INPUT_FILE2 = "/mnt/copycat.input2" |
||||
OUTPUT_FILE = "/mnt/copycat.output" |
||||
|
||||
TOPIC = "test" |
||||
OFFSETS_TOPIC = "copycat-offsets" |
||||
CONFIG_TOPIC = "copycat-configs" |
||||
|
||||
# Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same |
||||
# across all nodes. |
||||
INPUT_LIST = ["foo", "bar", "baz"] |
||||
INPUTS = "\n".join(INPUT_LIST) + "\n" |
||||
LONGER_INPUT_LIST = ["foo", "bar", "baz", "razz", "ma", "tazz"] |
||||
LONER_INPUTS = "\n".join(LONGER_INPUT_LIST) + "\n" |
||||
|
||||
SCHEMA = { "type": "string", "optional": False } |
||||
|
||||
def __init__(self, test_context): |
||||
super(CopycatRestApiTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ |
||||
'test' : { 'partitions': 1, 'replication-factor': 1 } |
||||
}) |
||||
|
||||
self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE]) |
||||
|
||||
def test_rest_api(self): |
||||
# Template parameters |
||||
self.key_converter = "org.apache.kafka.copycat.json.JsonConverter" |
||||
self.value_converter = "org.apache.kafka.copycat.json.JsonConverter" |
||||
self.schemas = True |
||||
|
||||
self.cc.set_configs(lambda node: self.render("copycat-distributed.properties", node=node)) |
||||
|
||||
self.cc.start() |
||||
|
||||
assert self.cc.list_connectors() == [] |
||||
|
||||
self.logger.info("Creating connectors") |
||||
source_connector_props = self.render("copycat-file-source.properties") |
||||
sink_connector_props = self.render("copycat-file-sink.properties") |
||||
for connector_props in [source_connector_props, sink_connector_props]: |
||||
connector_config = self._config_dict_from_props(connector_props) |
||||
self.cc.create_connector(connector_config) |
||||
|
||||
# We should see the connectors appear |
||||
wait_until(lambda: set(self.cc.list_connectors()) == set(["local-file-source", "local-file-sink"]), |
||||
timeout_sec=10, err_msg="Connectors that were just created did not appear in connector listing") |
||||
|
||||
# We'll only do very simple validation that the connectors and tasks really ran. |
||||
for node in self.cc.nodes: |
||||
node.account.ssh("echo -e -n " + repr(self.INPUTS) + " >> " + self.INPUT_FILE) |
||||
wait_until(lambda: self.validate_output(self.INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") |
||||
|
||||
|
||||
# Trying to create the same connector again should cause an error |
||||
try: |
||||
self.cc.create_connector(self._config_dict_from_props(source_connector_props)) |
||||
assert False, "creating the same connector should have caused a conflict" |
||||
except CopycatRestError: |
||||
pass # expected |
||||
|
||||
# Validate that we can get info about connectors |
||||
expected_source_info = { |
||||
'name': 'local-file-source', |
||||
'config': self._config_dict_from_props(source_connector_props), |
||||
'tasks': [{ 'connector': 'local-file-source', 'task': 0 }] |
||||
} |
||||
source_info = self.cc.get_connector("local-file-source") |
||||
assert expected_source_info == source_info, "Incorrect info:" + json.dumps(source_info) |
||||
source_config = self.cc.get_connector_config("local-file-source") |
||||
assert expected_source_info['config'] == source_config, "Incorrect config: " + json.dumps(source_config) |
||||
expected_sink_info = { |
||||
'name': 'local-file-sink', |
||||
'config': self._config_dict_from_props(sink_connector_props), |
||||
'tasks': [{ 'connector': 'local-file-sink', 'task': 0 }] |
||||
} |
||||
sink_info = self.cc.get_connector("local-file-sink") |
||||
assert expected_sink_info == sink_info, "Incorrect info:" + json.dumps(sink_info) |
||||
sink_config = self.cc.get_connector_config("local-file-sink") |
||||
assert expected_sink_info['config'] == sink_config, "Incorrect config: " + json.dumps(sink_config) |
||||
|
||||
|
||||
# Validate that we can get info about tasks. This info should definitely be available now without waiting since |
||||
# we've already seen data appear in files. |
||||
# TODO: It would be nice to validate a complete listing, but that doesn't make sense for the file connectors |
||||
expected_source_task_info = [{ |
||||
'id': { 'connector': 'local-file-source', 'task': 0 }, |
||||
'config': { |
||||
'task.class': 'org.apache.kafka.copycat.file.FileStreamSourceTask', |
||||
'file': self.INPUT_FILE, |
||||
'topic': self.TOPIC |
||||
} |
||||
}] |
||||
source_task_info = self.cc.get_connector_tasks("local-file-source") |
||||
assert expected_source_task_info == source_task_info, "Incorrect info:" + json.dumps(source_task_info) |
||||
expected_sink_task_info = [{ |
||||
'id': { 'connector': 'local-file-sink', 'task': 0 }, |
||||
'config': { |
||||
'task.class': 'org.apache.kafka.copycat.file.FileStreamSinkTask', |
||||
'file': self.OUTPUT_FILE, |
||||
'topics': self.TOPIC |
||||
} |
||||
}] |
||||
sink_task_info = self.cc.get_connector_tasks("local-file-sink") |
||||
assert expected_sink_task_info == sink_task_info, "Incorrect info:" + json.dumps(sink_task_info) |
||||
|
||||
file_source_config = self._config_dict_from_props(source_connector_props) |
||||
file_source_config['file'] = self.INPUT_FILE2 |
||||
self.cc.set_connector_config("local-file-source", file_source_config) |
||||
|
||||
# We should also be able to verify that the modified configs caused the tasks to move to the new file and pick up |
||||
# more data. |
||||
for node in self.cc.nodes: |
||||
node.account.ssh("echo -e -n " + repr(self.LONER_INPUTS) + " >> " + self.INPUT_FILE2) |
||||
wait_until(lambda: self.validate_output(self.LONGER_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") |
||||
|
||||
self.cc.delete_connector("local-file-source") |
||||
self.cc.delete_connector("local-file-sink") |
||||
wait_until(lambda: len(self.cc.list_connectors()) == 0, timeout_sec=10, err_msg="Deleted connectors did not disappear from REST listing") |
||||
|
||||
def validate_output(self, input): |
||||
input_set = set(input) |
||||
# Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled. |
||||
output_set = set(itertools.chain(*[ |
||||
[line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes |
||||
])) |
||||
return input_set == output_set |
||||
|
||||
|
||||
def file_contents(self, node, file): |
||||
try: |
||||
# Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of |
||||
# immediately |
||||
return list(node.account.ssh_capture("cat " + file)) |
||||
except subprocess.CalledProcessError: |
||||
return [] |
||||
|
||||
def _config_dict_from_props(self, connector_props): |
||||
return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')]) |
Loading…
Reference in new issue