Browse Source

KAFKA-14346: Remove hard-to-mock RestClient calls (#12828)

Reviewers: Chris Egerton <chrise@aiven.io>
pull/12295/head
Greg Harris 2 years ago committed by GitHub
parent
commit
fca5bfe13c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      build.gradle
  2. 2
      checkstyle/import-control.xml
  3. 3
      connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
  4. 7
      connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
  5. 3
      connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
  6. 52
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
  7. 35
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
  8. 9
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
  9. 13
      connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
  10. 256
      connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
  11. 13
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
  12. 24
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
  13. 26
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
  14. 109
      connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java

1
build.gradle

@ -2627,6 +2627,7 @@ project(':connect:runtime') { @@ -2627,6 +2627,7 @@ project(':connect:runtime') {
testImplementation libs.httpclient
testRuntimeOnly libs.slf4jlog4j
testRuntimeOnly libs.bcpkix
}
javadoc {

2
checkstyle/import-control.xml

@ -630,6 +630,8 @@ @@ -630,6 +630,8 @@
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="javax.ws.rs" />
<allow pkg="org.apache.http"/>
<allow pkg="org.eclipse.jetty.util"/>
</subpackage>
<subpackage name="json">

3
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java

@ -255,9 +255,10 @@ public class MirrorMaker { @@ -255,9 +255,10 @@ public class MirrorMaker {
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
// tracking the various shared admin objects in this class.
// Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
Herder herder = new DistributedHerder(distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
herders.put(sourceAndTarget, herder);
}

7
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java

@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.WorkerInfo; @@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
@ -99,7 +100,9 @@ public class ConnectDistributed { @@ -99,7 +100,9 @@ public class ConnectDistributed {
String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
RestClient restClient = new RestClient(config);
RestServer rest = new RestServer(config, restClient);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
@ -138,7 +141,7 @@ public class ConnectDistributed { @@ -138,7 +141,7 @@ public class ConnectDistributed {
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
advertisedUrl.toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);

3
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java

@ -82,7 +82,8 @@ public class ConnectStandalone { @@ -82,7 +82,8 @@ public class ConnectStandalone {
String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
// Do not initialize a RestClient because the ConnectorsResource will not use it in standalone mode.
RestServer rest = new RestServer(config, null);
rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();

52
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java

@ -166,6 +166,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -166,6 +166,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final String requestSignatureAlgorithm;
private final List<String> keySignatureVerificationAlgorithms;
private final KeyGenerator keyGenerator;
private final RestClient restClient;
// Visible for testing
ExecutorService forwardRequestExecutor;
@ -240,9 +241,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -240,9 +241,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl,
RestClient restClient,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
AutoCloseable... uponShutdown) {
this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore,
null, restUrl, restClient, worker.metrics(),
time, connectorClientConfigOverridePolicy, uponShutdown);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@ -256,6 +259,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -256,6 +259,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ConfigBackingStore configBackingStore,
WorkerGroupMember member,
String restUrl,
RestClient restClient,
ConnectMetrics metrics,
Time time,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
@ -272,6 +276,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -272,6 +276,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.keyRotationIntervalMs = config.getInt(DistributedConfig.INTER_WORKER_KEY_TTL_MS_CONFIG);
this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
this.keyGenerator = config.getInternalRequestKeyGenerator();
this.restClient = restClient;
this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.uponShutdown = Arrays.asList(uponShutdown);
@ -1157,16 +1162,30 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -1157,16 +1162,30 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (error == null) {
callback.onCompletion(null, null);
} else if (error instanceof NotLeaderException) {
String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
forwardRequestExecutor.execute(() -> {
try {
RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
}
});
if (restClient != null) {
String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
forwardRequestExecutor.execute(() -> {
try {
restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
}
});
} else {
callback.onCompletion(
new ConnectException(
// TODO: Update this message if KIP-710 is accepted and merged
// (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
"This worker is not able to communicate with the leader of the cluster, "
+ "which is required for exactly-once source tasks. If running MirrorMaker 2 "
+ "in dedicated mode, consider either disabling exactly-once support, or deploying "
+ "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster."
),
null
);
}
} else {
error = ConnectUtils.maybeWrap(error, "Failed to perform zombie fencing");
callback.onCompletion(error, null);
@ -1906,6 +1925,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -1906,6 +1925,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (isLeader()) {
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
cb.onCompletion(null, null);
} else if (restClient == null) {
// TODO: Update this message if KIP-710 is accepted and merged
// (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
+ "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
+ "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
+ "distributed Kafka Connect cluster.",
leaderUrl()
);
} else {
// We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
// addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
@ -1925,7 +1953,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @@ -1925,7 +1953,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
.build()
.toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);

35
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java

@ -43,9 +43,23 @@ import java.util.Map; @@ -43,9 +43,23 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
* Client for outbound REST requests to other members of a Connect cluster
* This class is thread-safe.
*/
public class RestClient {
private static final Logger log = LoggerFactory.getLogger(RestClient.class);
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
private WorkerConfig config;
public RestClient(WorkerConfig config) {
this.config = config;
}
// VisibleForTesting
HttpClient httpClient() {
return new HttpClient(SSLUtils.createClientSideSslContextFactory(config));
}
/**
* Sends HTTP request to remote REST server
@ -58,9 +72,9 @@ public class RestClient { @@ -58,9 +72,9 @@ public class RestClient {
* @param <T> The type of the deserialized response to the HTTP request.
* @return The deserialized response to the HTTP request, or null if no data is expected.
*/
public static <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat, WorkerConfig config) {
return httpRequest(url, method, headers, requestBodyData, responseFormat, config, null, null);
public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat) {
return httpRequest(url, method, headers, requestBodyData, responseFormat, null, null);
}
/**
@ -78,17 +92,10 @@ public class RestClient { @@ -78,17 +92,10 @@ public class RestClient {
* may be null if the request doesn't need to be signed
* @return The deserialized response to the HTTP request, or null if no data is expected.
*/
public static <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat, WorkerConfig config,
public <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat,
SecretKey sessionKey, String requestSignatureAlgorithm) {
HttpClient client;
if (url.startsWith("https://")) {
client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config));
} else {
client = new HttpClient();
}
HttpClient client = httpClient();
client.setFollowRedirects(false);
try {
@ -109,7 +116,7 @@ public class RestClient { @@ -109,7 +116,7 @@ public class RestClient {
}
}
static <T> HttpResponse<T> httpRequest(HttpClient client, String url, String method,
private <T> HttpResponse<T> httpRequest(HttpClient client, String url, String method,
HttpHeaders headers, Object requestBodyData,
TypeReference<T> responseFormat, SecretKey sessionKey,
String requestSignatureAlgorithm) {

9
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java

@ -85,6 +85,8 @@ public class RestServer { @@ -85,6 +85,8 @@ public class RestServer {
private static final String PROTOCOL_HTTPS = "https";
private final WorkerConfig config;
private final RestClient restClient;
private final ContextHandlerCollection handlers;
private final Server jettyServer;
@ -94,8 +96,9 @@ public class RestServer { @@ -94,8 +96,9 @@ public class RestServer {
/**
* Create a REST server for this herder using the specified configs.
*/
public RestServer(WorkerConfig config) {
public RestServer(WorkerConfig config, RestClient restClient) {
this.config = config;
this.restClient = restClient;
List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
@ -218,7 +221,7 @@ public class RestServer { @@ -218,7 +221,7 @@ public class RestServer {
this.resources = new ArrayList<>();
resources.add(new RootResource(herder));
resources.add(new ConnectorsResource(herder, config));
resources.add(new ConnectorsResource(herder, config, restClient));
resources.add(new ConnectorPluginsResource(herder));
resources.forEach(resourceConfig::register);
@ -354,6 +357,8 @@ public class RestServer { @@ -354,6 +357,8 @@ public class RestServer {
builder.port(advertisedPort);
else if (serverConnector != null && serverConnector.getPort() > 0)
builder.port(serverConnector.getPort());
else if (serverConnector != null && serverConnector.getLocalPort() > 0)
builder.port(serverConnector.getLocalPort());
log.info("Advertised URI: {}", builder.build());

13
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java

@ -82,16 +82,16 @@ public class ConnectorsResource implements ConnectResource { @@ -82,16 +82,16 @@ public class ConnectorsResource implements ConnectResource {
new TypeReference<List<Map<String, String>>>() { };
private final Herder herder;
private final WorkerConfig config;
private final RestClient restClient;
private long requestTimeoutMs;
@javax.ws.rs.core.Context
private ServletContext context;
private final boolean isTopicTrackingDisabled;
private final boolean isTopicTrackingResetDisabled;
public ConnectorsResource(Herder herder, WorkerConfig config) {
public ConnectorsResource(Herder herder, WorkerConfig config, RestClient restClient) {
this.herder = herder;
this.config = config;
this.restClient = restClient;
isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
@ -404,7 +404,10 @@ public class ConnectorsResource implements ConnectResource { @@ -404,7 +404,10 @@ public class ConnectorsResource implements ConnectResource {
Throwable cause = e.getCause();
if (cause instanceof RequestTargetException) {
if (forward == null || forward) {
if (restClient == null) {
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
"Cannot complete request as non-leader with request forwarding disabled");
} else if (forward == null || forward) {
// the only time we allow recursive forwarding is when no forward flag has
// been set, which should only be seen by the first worker to handle a user request.
// this gives two total hops to resolve the request before giving up.
@ -425,7 +428,7 @@ public class ConnectorsResource implements ConnectResource { @@ -425,7 +428,7 @@ public class ConnectorsResource implements ConnectResource {
}
String forwardUrl = uriBuilder.build().toString();
log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
return translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body, resultType, config));
return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType));
} else {
// we should find the right target for the query within two hops, so if
// we don't, it probably means that a rebalance has taken place.

256
connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java

@ -0,0 +1,256 @@ @@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.integration;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
@SuppressWarnings("unchecked")
@Category(IntegrationTest.class)
public class RestForwardingIntegrationTest {
private Map<String, Object> sslConfig;
@Mock
private Plugins plugins;
private RestServer followerServer;
@Mock
private Herder followerHerder;
private RestServer leaderServer;
@Mock
private Herder leaderHerder;
private SslContextFactory factory;
private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses;
@Before
public void setUp() throws IOException, GeneralSecurityException {
sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
responses = new ArrayList<>();
}
@After
public void tearDown() throws IOException {
for (CloseableHttpResponse response: responses) {
response.close();
}
AtomicReference<Throwable> firstException = new AtomicReference<>();
Utils.closeAllQuietly(
firstException,
"clientsAndServers",
httpClient,
followerServer != null ? followerServer::stop : null,
leaderServer != null ? leaderServer::stop : null,
factory != null ? factory::stop : null
);
if (firstException.get() != null) {
throw new RuntimeException("Unable to cleanly close resources", firstException.get());
}
}
@Test
public void testRestForwardNoSsl() throws Exception {
// A cluster with no SSL configured whatsoever, using HTTP
testRestForwardToLeader(false, false, false);
}
@Test
public void testRestForwardNoSslDualListener() throws Exception {
// A cluster configured with HTTPS listeners, but still advertising HTTP
testRestForwardToLeader(true, false, false);
}
@Test
public void testRestForwardLeaderSsl() throws Exception {
// A heterogeneous cluster where the leader rolls to advertise HTTPS before the follower
testRestForwardToLeader(true, false, true);
}
@Test
public void testRestForwardFollowerSsl() throws Exception {
// A heterogeneous cluster where the follower rolls to advertise HTTPS before the leader
testRestForwardToLeader(true, true, false);
}
@Test
public void testRestForwardSslDualListener() throws Exception {
// A cluster that has just rolled to advertise HTTPS on both workers
testRestForwardToLeader(true, true, true);
}
@Test
public void testRestForwardSsl() throws Exception {
// A cluster that has finished rolling to SSL and disabled the HTTP listener
testRestForwardToLeader(false, true, true);
}
public void testRestForwardToLeader(boolean dualListener, boolean followerSsl, boolean leaderSsl) throws Exception {
DistributedConfig followerConfig = new DistributedConfig(baseWorkerProps(dualListener, followerSsl));
DistributedConfig leaderConfig = new DistributedConfig(baseWorkerProps(dualListener, leaderSsl));
// Follower worker setup
RestClient followerClient = new RestClient(followerConfig);
followerServer = new RestServer(followerConfig, followerClient);
followerServer.initializeServer();
when(followerHerder.plugins()).thenReturn(plugins);
followerServer.initializeResources(followerHerder);
// Leader worker setup
RestClient leaderClient = new RestClient(leaderConfig);
leaderServer = new RestServer(leaderConfig, leaderClient);
leaderServer.initializeServer();
when(leaderHerder.plugins()).thenReturn(plugins);
leaderServer.initializeResources(leaderHerder);
// External client setup
factory = SSLUtils.createClientSideSslContextFactory(followerConfig);
factory.start();
SSLContext ssl = factory.getSslContext();
httpClient = HttpClients.custom()
.setSSLContext(ssl)
.build();
// Follower will forward to the leader
URI leaderUrl = leaderServer.advertisedUrl();
RequestTargetException forwardException = new NotLeaderException("Not leader", leaderUrl.toString());
ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> followerCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> {
followerCallbackCaptor.getValue().onCompletion(forwardException, null);
return null;
}).when(followerHerder)
.putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture());
// Leader will reply
ConnectorInfo connectorInfo = new ConnectorInfo("blah", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE);
Herder.Created<ConnectorInfo> leaderAnswer = new Herder.Created<>(true, connectorInfo);
ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> leaderCallbackCaptor = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> {
leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
return null;
}).when(leaderHerder)
.putConnectorConfig(any(), any(), anyBoolean(), leaderCallbackCaptor.capture());
// Client makes request to the follower
URI followerUrl = followerServer.advertisedUrl();
HttpPost request = new HttpPost("/connectors");
String jsonBody = "{" +
"\"name\": \"blah\"," +
"\"config\": {}" +
"}";
StringEntity entity = new StringEntity(jsonBody, StandardCharsets.UTF_8.name());
entity.setContentType("application/json");
request.setEntity(entity);
HttpResponse httpResponse = executeRequest(followerUrl, request);
// And sees the success from the leader
assertEquals(201, httpResponse.getStatusLine().getStatusCode());
}
private Map<String, String> baseWorkerProps(boolean dualListener, boolean advertiseSSL) {
Map<String, String> workerProps = new HashMap<>();
workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
if (dualListener || advertiseSSL) {
for (String k : sslConfig.keySet()) {
if (sslConfig.get(k) instanceof Password) {
workerProps.put(k, ((Password) sslConfig.get(k)).value());
} else if (sslConfig.get(k) instanceof List) {
workerProps.put(k, String.join(",", (List<String>) sslConfig.get(k)));
} else {
workerProps.put(k, sslConfig.get(k).toString());
}
}
}
if (dualListener) {
workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
// This server is brought up with both a plaintext and an SSL listener; we use this property
// to dictate which URL it advertises to other servers when a request must be forwarded to it
// and which URL we issue requests against during testing
workerProps.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
} else {
workerProps.put(WorkerConfig.LISTENERS_CONFIG, advertiseSSL ? "https://localhost:0" : "http://localhost:0");
}
return workerProps;
}
private HttpResponse executeRequest(URI serverUrl, HttpRequest request) throws IOException {
HttpHost httpHost = new HttpHost(serverUrl.getHost(), serverUrl.getPort(), serverUrl.getScheme());
CloseableHttpResponse response = httpClient.execute(httpHost, request);
responses.add(response);
return response;
}
}

13
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java

@ -122,7 +122,7 @@ import static org.junit.Assert.assertTrue; @@ -122,7 +122,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest({DistributedHerder.class, RestClient.class})
@PrepareForTest({DistributedHerder.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
@ -212,6 +212,7 @@ public class DistributedHerderTest { @@ -212,6 +212,7 @@ public class DistributedHerderTest {
@Mock private WorkerConfigTransformer transformer;
@Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
@Mock private Plugins plugins;
@Mock private RestClient restClient;
private CountDownLatch shutdownCalled = new CountDownLatch(1);
private ConfigBackingStore.UpdateListener configUpdateListener;
@ -240,7 +241,7 @@ public class DistributedHerderTest { @@ -240,7 +241,7 @@ public class DistributedHerderTest {
herder = PowerMock.createPartialMock(DistributedHerder.class,
new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", "recordRestarting"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
new AutoCloseable[]{uponShutdown});
configUpdateListener = herder.new ConfigUpdateListener();
@ -2969,11 +2970,9 @@ public class DistributedHerderTest { @@ -2969,11 +2970,9 @@ public class DistributedHerderTest {
member.wakeup();
EasyMock.expectLastCall();
PowerMock.mockStatic(RestClient.class);
org.easymock.IExpectationSetters<RestClient.HttpResponse<Object>> expectRequest = EasyMock.expect(
RestClient.httpRequest(
anyObject(), EasyMock.eq("PUT"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.isNull(), anyObject(), anyObject(), anyObject()
restClient.httpRequest(
anyObject(), EasyMock.eq("PUT"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.isNull(), anyObject(), anyObject()
));
if (succeed) {
expectRequest.andReturn(null);
@ -3835,7 +3834,7 @@ public class DistributedHerderTest { @@ -3835,7 +3834,7 @@ public class DistributedHerderTest {
return PowerMock.createPartialMock(DistributedHerder.class,
new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
new DistributedConfig(config), worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
new AutoCloseable[0]);
}

24
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java

@ -50,7 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @@ -50,7 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(Enclosed.class)
@ -74,18 +76,20 @@ public class RestClientTest { @@ -74,18 +76,20 @@ public class RestClientTest {
}
private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) {
return RestClient.httpRequest(
httpClient,
"https://localhost:1234/api/endpoint",
"GET",
null,
new TestDTO("requestBodyData"),
TEST_TYPE,
MOCK_SECRET_KEY,
requestSignatureAlgorithm);
RestClient client = spy(new RestClient(null));
doReturn(httpClient).when(client).httpClient();
return client.httpRequest(
"https://localhost:1234/api/endpoint",
"GET",
null,
new TestDTO("requestBodyData"),
TEST_TYPE,
MOCK_SECRET_KEY,
requestSignatureAlgorithm
);
}
private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) {
private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) throws Exception {
String validRequestSignatureAlgorithm = "HmacSHA1";
return httpRequest(httpClient, validRequestSignatureAlgorithm);
}

26
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java

@ -125,7 +125,7 @@ public class RestServerTest { @@ -125,7 +125,7 @@ public class RestServerTest {
configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
DistributedConfig config = new DistributedConfig(configMap);
server = new RestServer(config);
server = new RestServer(config, null);
Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
server.stop();
@ -135,7 +135,7 @@ public class RestServerTest { @@ -135,7 +135,7 @@ public class RestServerTest {
configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
config = new DistributedConfig(configMap);
server = new RestServer(config);
server = new RestServer(config, null);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop();
@ -144,7 +144,7 @@ public class RestServerTest { @@ -144,7 +144,7 @@ public class RestServerTest {
configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
config = new DistributedConfig(configMap);
server = new RestServer(config);
server = new RestServer(config, null);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop();
@ -156,7 +156,7 @@ public class RestServerTest { @@ -156,7 +156,7 @@ public class RestServerTest {
configMap.put(WorkerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
config = new DistributedConfig(configMap);
server = new RestServer(config);
server = new RestServer(config, null);
Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
server.stop();
@ -165,7 +165,7 @@ public class RestServerTest { @@ -165,7 +165,7 @@ public class RestServerTest {
configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
config = new DistributedConfig(configMap);
server = new RestServer(config);
server = new RestServer(config, null);
Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString());
server.stop();
}
@ -179,7 +179,7 @@ public class RestServerTest { @@ -179,7 +179,7 @@ public class RestServerTest {
doReturn(plugins).when(herder).plugins();
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
@ -207,7 +207,7 @@ public class RestServerTest { @@ -207,7 +207,7 @@ public class RestServerTest {
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
URI serverUrl = server.advertisedUrl();
@ -251,7 +251,7 @@ public class RestServerTest { @@ -251,7 +251,7 @@ public class RestServerTest {
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
@ -272,7 +272,7 @@ public class RestServerTest { @@ -272,7 +272,7 @@ public class RestServerTest {
// create some loggers in the process
LoggerFactory.getLogger("a.b.c.s.W");
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
@ -307,7 +307,7 @@ public class RestServerTest { @@ -307,7 +307,7 @@ public class RestServerTest {
LoggerFactory.getLogger("a.b.c.p.Y");
LoggerFactory.getLogger("a.b.c.p.Z");
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
@ -331,7 +331,7 @@ public class RestServerTest { @@ -331,7 +331,7 @@ public class RestServerTest {
doReturn(plugins).when(herder).plugins();
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
@ -351,7 +351,7 @@ public class RestServerTest { @@ -351,7 +351,7 @@ public class RestServerTest {
doReturn(plugins).when(herder).plugins();
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
@ -398,7 +398,7 @@ public class RestServerTest { @@ -398,7 +398,7 @@ public class RestServerTest {
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new RestServer(workerConfig);
server = new RestServer(workerConfig, null);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");

109
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java

@ -48,7 +48,6 @@ import org.junit.Test; @@ -48,7 +48,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Stubber;
@ -82,7 +81,6 @@ import static org.mockito.Mockito.doThrow; @@ -82,7 +81,6 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -156,15 +154,14 @@ public class ConnectorsResourceTest { @@ -156,15 +154,14 @@ public class ConnectorsResourceTest {
private UriInfo forward;
@Mock
private WorkerConfig workerConfig;
private MockedStatic<RestClient> restClientStatic;
@Mock
private RestClient restClient;
@Before
public void setUp() throws NoSuchMethodException {
restClientStatic = mockStatic(RestClient.class);
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
forward = mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
queryParams.putSingle("forward", "true");
@ -173,7 +170,6 @@ public class ConnectorsResourceTest { @@ -173,7 +170,6 @@ public class ConnectorsResourceTest {
@After
public void teardown() {
restClientStatic.close();
verifyNoMoreInteractions(herder);
}
@ -297,11 +293,9 @@ public class ConnectorsResourceTest { @@ -297,11 +293,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb).when(herder)
.putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)),
() -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body)
);
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
.thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
@ -312,11 +306,9 @@ public class ConnectorsResourceTest { @@ -312,11 +306,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb)
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> connectorsResource.createConnector(FORWARD, httpHeaders, body)
);
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
connectorsResource.createConnector(FORWARD, httpHeaders, body);
}
@Test
@ -391,14 +383,9 @@ public class ConnectorsResourceTest { @@ -391,14 +383,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb).when(herder)
.deleteConnectorConfig(eq(CONNECTOR_NAME), cb.capture());
// Should forward request
verifyRestRequestWithCall(
() -> RestClient.httpRequest(LEADER_URL + "connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, workerConfig),
new RestClient.HttpResponse<>(204, new HashMap<>(), null),
() -> {
connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
return null;
}
);
when(restClient.httpRequest(LEADER_URL + "connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null))
.thenReturn(new RestClient.HttpResponse<>(204, new HashMap<>(), null));
connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
}
// Not found exceptions should pass through to caller so they can be processed for 404s
@ -657,12 +644,9 @@ public class ConnectorsResourceTest { @@ -657,12 +644,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb).when(herder)
.restartConnectorAndTasks(eq(restartRequest), cb.capture());
Response response = verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true&includeTasks=" + restartRequest.includeTasks() + "&onlyFailed=" + restartRequest.onlyFailed()),
eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, restartRequest.includeTasks(), restartRequest.onlyFailed(), null)
);
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true&includeTasks=" + restartRequest.includeTasks() + "&onlyFailed=" + restartRequest.onlyFailed()), eq("POST"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, restartRequest.includeTasks(), restartRequest.onlyFailed(), null);
assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
}
@ -765,11 +749,9 @@ public class ConnectorsResourceTest { @@ -765,11 +749,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb).when(herder)
.restartConnector(eq(CONNECTOR_NAME), cb.capture());
Response response = verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, null)
);
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/restart?forward=true"), eq("POST"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, null);
assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus());
}
@ -779,11 +761,9 @@ public class ConnectorsResourceTest { @@ -779,11 +761,9 @@ public class ConnectorsResourceTest {
String ownerUrl = "http://owner:8083";
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl))
.when(herder).restartConnector(eq(CONNECTOR_NAME), cb.capture());
Response response = verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, true)
);
when(restClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"), eq("POST"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
Response response = connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, false, false, true);
assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus());
}
@ -805,14 +785,9 @@ public class ConnectorsResourceTest { @@ -805,14 +785,9 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb).when(herder)
.restartTask(eq(taskId), cb.capture());
verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> {
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
return null;
}
);
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"), eq("POST"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
}
@Test
@ -824,21 +799,16 @@ public class ConnectorsResourceTest { @@ -824,21 +799,16 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl))
.when(herder).restartTask(eq(taskId), cb.capture());
verifyRestRequestWithCall(
() -> RestClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), eq("POST"), isNull(), isNull(), any(), any(WorkerConfig.class)),
new RestClient.HttpResponse<>(202, new HashMap<>(), null),
() -> {
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
return null;
}
);
when(restClient.httpRequest(eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"), eq("POST"), isNull(), isNull(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
}
@Test
public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@ -850,7 +820,7 @@ public class ConnectorsResourceTest { @@ -850,7 +820,7 @@ public class ConnectorsResourceTest {
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@ -862,7 +832,7 @@ public class ConnectorsResourceTest { @@ -862,7 +832,7 @@ public class ConnectorsResourceTest {
when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@ -875,7 +845,7 @@ public class ConnectorsResourceTest { @@ -875,7 +845,7 @@ public class ConnectorsResourceTest {
when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
when(herder.connectorActiveTopics(CONNECTOR_NAME))
.thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@ -888,7 +858,7 @@ public class ConnectorsResourceTest { @@ -888,7 +858,7 @@ public class ConnectorsResourceTest {
@Test
public void testResetConnectorActiveTopics() {
HttpHeaders headers = mock(HttpHeaders.class);
connectorsResource = new ConnectorsResource(herder, workerConfig);
connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);
@ -934,19 +904,4 @@ public class ConnectorsResourceTest { @@ -934,19 +904,4 @@ public class ConnectorsResourceTest {
T run() throws Throwable;
}
/**
* Used to verify that a provided restCall actually gets executed with a given RestClient verification
* @param verification The RestClient method being mocked
* @param mockReturn What the mocked method returns
* @param restCall The call against the RestClient to execute
* @return The return value of restCall
*/
private <T> T verifyRestRequestWithCall(MockedStatic.Verification verification,
RestClient.HttpResponse<Object> mockReturn,
RunnableWithThrowable<T> restCall) throws Throwable {
restClientStatic.when(verification).thenReturn(mockReturn);
final T value = restCall.run();
restClientStatic.verify(verification);
return value;
}
}

Loading…
Cancel
Save