Browse Source
Reviewers: Daniel Urban <durban@cloudera.com>, Greg Harris <greg.harris@aiven.io>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Mickael Maison <mickael.maison@gmail.com>pull/13226/head
Chris Egerton
2 years ago
committed by
GitHub
34 changed files with 2014 additions and 852 deletions
@ -0,0 +1,59 @@
@@ -0,0 +1,59 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.mirror.rest; |
||||
|
||||
import org.apache.kafka.connect.mirror.SourceAndTarget; |
||||
import org.apache.kafka.connect.mirror.rest.resources.InternalMirrorResource; |
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.rest.RestClient; |
||||
import org.apache.kafka.connect.runtime.rest.RestServer; |
||||
import org.apache.kafka.connect.runtime.rest.RestServerConfig; |
||||
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.Map; |
||||
|
||||
public class MirrorRestServer extends RestServer { |
||||
|
||||
private final RestClient restClient; |
||||
private Map<SourceAndTarget, Herder> herders; |
||||
|
||||
public MirrorRestServer(Map<?, ?> props, RestClient restClient) { |
||||
super(RestServerConfig.forInternal(props)); |
||||
this.restClient = restClient; |
||||
} |
||||
|
||||
public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) { |
||||
this.herders = herders; |
||||
super.initializeResources(); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<ConnectResource> regularResources() { |
||||
return Arrays.asList( |
||||
new InternalMirrorResource(herders, restClient) |
||||
); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<ConnectResource> adminResources() { |
||||
return Collections.emptyList(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,65 @@
@@ -0,0 +1,65 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.mirror.rest.resources; |
||||
|
||||
import org.apache.kafka.connect.mirror.SourceAndTarget; |
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.rest.RestClient; |
||||
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import javax.ws.rs.NotFoundException; |
||||
import javax.ws.rs.Path; |
||||
import javax.ws.rs.core.Context; |
||||
import javax.ws.rs.core.UriInfo; |
||||
import java.util.Map; |
||||
|
||||
@Path("/{source}/{target}/connectors") |
||||
public class InternalMirrorResource extends InternalClusterResource { |
||||
|
||||
@Context |
||||
private UriInfo uriInfo; |
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class); |
||||
|
||||
private final Map<SourceAndTarget, Herder> herders; |
||||
|
||||
public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) { |
||||
super(restClient); |
||||
this.herders = herders; |
||||
} |
||||
|
||||
@Override |
||||
protected Herder herderForRequest() { |
||||
String source = pathParam("source"); |
||||
String target = pathParam("target"); |
||||
Herder result = herders.get(new SourceAndTarget(source, target)); |
||||
if (result == null) { |
||||
throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'"); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
private String pathParam(String name) { |
||||
String result = uriInfo.getPathParameters().getFirst(name); |
||||
if (result == null) |
||||
throw new NotFoundException("Could not parse " + name + " cluster from request path"); |
||||
return result; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,290 @@
@@ -0,0 +1,290 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.mirror.integration; |
||||
|
||||
import org.apache.kafka.clients.admin.Admin; |
||||
import org.apache.kafka.clients.admin.NewTopic; |
||||
import org.apache.kafka.clients.consumer.Consumer; |
||||
import org.apache.kafka.clients.consumer.ConsumerRecords; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.connect.mirror.MirrorMaker; |
||||
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; |
||||
import org.junit.jupiter.api.AfterEach; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Tag; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.time.Duration; |
||||
import java.util.Collections; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.Properties; |
||||
import java.util.Set; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; |
||||
import static org.apache.kafka.test.TestUtils.waitForCondition; |
||||
|
||||
@Tag("integration") |
||||
public class DedicatedMirrorIntegrationTest { |
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class); |
||||
|
||||
private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000; |
||||
private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000; |
||||
|
||||
private Map<String, EmbeddedKafkaCluster> kafkaClusters; |
||||
private Map<String, MirrorMaker> mirrorMakers; |
||||
|
||||
@BeforeEach |
||||
public void setup() { |
||||
kafkaClusters = new HashMap<>(); |
||||
mirrorMakers = new HashMap<>(); |
||||
} |
||||
|
||||
@AfterEach |
||||
public void teardown() throws Throwable { |
||||
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>(); |
||||
mirrorMakers.forEach((name, mirrorMaker) -> |
||||
Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure) |
||||
); |
||||
kafkaClusters.forEach((name, kafkaCluster) -> |
||||
Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure) |
||||
); |
||||
if (shutdownFailure.get() != null) { |
||||
throw shutdownFailure.get(); |
||||
} |
||||
} |
||||
|
||||
private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) { |
||||
if (kafkaClusters.containsKey(name)) |
||||
throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name"); |
||||
|
||||
EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties); |
||||
kafkaClusters.put(name, result); |
||||
|
||||
result.start(); |
||||
|
||||
return result; |
||||
} |
||||
|
||||
private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) { |
||||
if (mirrorMakers.containsKey(name)) |
||||
throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name"); |
||||
|
||||
MirrorMaker result = new MirrorMaker(mmProps); |
||||
mirrorMakers.put(name, result); |
||||
|
||||
result.start(); |
||||
|
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* Tests a single-node cluster without the REST server enabled. |
||||
*/ |
||||
@Test |
||||
public void testSingleNodeCluster() throws Exception { |
||||
Properties brokerProps = new Properties(); |
||||
EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps); |
||||
EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps); |
||||
|
||||
clusterA.start(); |
||||
clusterB.start(); |
||||
|
||||
try (Admin adminA = clusterA.createAdminClient(); |
||||
Admin adminB = clusterB.createAdminClient()) { |
||||
|
||||
// Cluster aliases
|
||||
final String a = "A"; |
||||
final String b = "B"; |
||||
final String ab = a + "->" + b; |
||||
final String ba = b + "->" + a; |
||||
final String testTopicPrefix = "test-topic-"; |
||||
|
||||
Map<String, String> mmProps = new HashMap<String, String>() {{ |
||||
put("dedicated.mode.enable.internal.rest", "false"); |
||||
put("listeners", "http://localhost:0"); |
||||
// Refresh topics very frequently to quickly pick up on topics that are created
|
||||
// after the MM2 nodes are brought up during testing
|
||||
put("refresh.topics.interval.seconds", "1"); |
||||
put("clusters", String.join(", ", a, b)); |
||||
put(a + ".bootstrap.servers", clusterA.bootstrapServers()); |
||||
put(b + ".bootstrap.servers", clusterB.bootstrapServers()); |
||||
put(ab + ".enabled", "true"); |
||||
put(ab + ".topics", "^" + testTopicPrefix + ".*"); |
||||
put(ba + ".enabled", "false"); |
||||
put(ba + ".emit.heartbeats.enabled", "false"); |
||||
put("replication.factor", "1"); |
||||
put("checkpoints.topic.replication.factor", "1"); |
||||
put("heartbeats.topic.replication.factor", "1"); |
||||
put("offset-syncs.topic.replication.factor", "1"); |
||||
put("offset.storage.replication.factor", "1"); |
||||
put("status.storage.replication.factor", "1"); |
||||
put("config.storage.replication.factor", "1"); |
||||
}}; |
||||
|
||||
// Bring up a single-node cluster
|
||||
startMirrorMaker("single node", mmProps); |
||||
|
||||
final int numMessages = 10; |
||||
String topic = testTopicPrefix + "1"; |
||||
|
||||
// Create the topic on cluster A
|
||||
createTopic(adminA, topic); |
||||
// and wait for MirrorMaker to create it on cluster B
|
||||
awaitTopicCreation(b, adminB, a + "." + topic); |
||||
|
||||
// Write data to the topic on cluster A
|
||||
writeToTopic(clusterA, topic, numMessages); |
||||
// and wait for MirrorMaker to copy it to cluster B
|
||||
awaitTopicContent(clusterB, b, a + "." + topic, numMessages); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime |
||||
* and reconfigure its connectors and their tasks to replicate those topics correctly. |
||||
* See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters">KIP-710</a> |
||||
* for more detail on the necessity for this test case. |
||||
*/ |
||||
@Test |
||||
public void testMultiNodeCluster() throws Exception { |
||||
Properties brokerProps = new Properties(); |
||||
brokerProps.put("transaction.state.log.replication.factor", "1"); |
||||
brokerProps.put("transaction.state.log.min.isr", "1"); |
||||
EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps); |
||||
EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps); |
||||
|
||||
clusterA.start(); |
||||
clusterB.start(); |
||||
|
||||
try (Admin adminA = clusterA.createAdminClient(); |
||||
Admin adminB = clusterB.createAdminClient()) { |
||||
|
||||
// Cluster aliases
|
||||
final String a = "A"; |
||||
// Use a convoluted cluster name to ensure URL encoding/decoding works
|
||||
final String b = "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618"; |
||||
final String ab = a + "->" + b; |
||||
final String ba = b + "->" + a; |
||||
final String testTopicPrefix = "test-topic-"; |
||||
|
||||
Map<String, String> mmProps = new HashMap<String, String>() {{ |
||||
put("dedicated.mode.enable.internal.rest", "true"); |
||||
put("listeners", "http://localhost:0"); |
||||
// Refresh topics very frequently to quickly pick up on topics that are created
|
||||
// after the MM2 nodes are brought up during testing
|
||||
put("refresh.topics.interval.seconds", "1"); |
||||
put("clusters", String.join(", ", a, b)); |
||||
put(a + ".bootstrap.servers", clusterA.bootstrapServers()); |
||||
put(b + ".bootstrap.servers", clusterB.bootstrapServers()); |
||||
// Enable exactly-once support to both validate that MirrorMaker can run with
|
||||
// that feature turned on, and to force cross-worker communication before
|
||||
// task startup
|
||||
put(a + ".exactly.once.source.support", "enabled"); |
||||
put(ab + ".enabled", "true"); |
||||
put(ab + ".topics", "^" + testTopicPrefix + ".*"); |
||||
// The name of the offset syncs topic will contain the name of the cluster in
|
||||
// the replication flow that it is _not_ hosted on; create the offset syncs topic
|
||||
// on the target cluster so that its name will contain the source cluster's name
|
||||
// (since the target cluster's name contains characters that are not valid for
|
||||
// use in a topic name)
|
||||
put(ab + ".offset-syncs.topic.location", "target"); |
||||
// Disable b -> a (and heartbeats from it) so that no topics are created that use
|
||||
// the target cluster's name
|
||||
put(ba + ".enabled", "false"); |
||||
put(ba + ".emit.heartbeats.enabled", "false"); |
||||
put("replication.factor", "1"); |
||||
put("checkpoints.topic.replication.factor", "1"); |
||||
put("heartbeats.topic.replication.factor", "1"); |
||||
put("offset-syncs.topic.replication.factor", "1"); |
||||
put("offset.storage.replication.factor", "1"); |
||||
put("status.storage.replication.factor", "1"); |
||||
put("config.storage.replication.factor", "1"); |
||||
}}; |
||||
|
||||
// Bring up a three-node cluster
|
||||
final int numNodes = 3; |
||||
for (int i = 0; i < numNodes; i++) { |
||||
startMirrorMaker("node " + i, mmProps); |
||||
} |
||||
|
||||
// Create one topic per Kafka cluster per MirrorMaker node
|
||||
final int topicsPerCluster = numNodes; |
||||
final int messagesPerTopic = 10; |
||||
for (int i = 0; i < topicsPerCluster; i++) { |
||||
String topic = testTopicPrefix + i; |
||||
|
||||
// Create the topic on cluster A
|
||||
createTopic(adminA, topic); |
||||
// and wait for MirrorMaker to create it on cluster B
|
||||
awaitTopicCreation(b, adminB, a + "." + topic); |
||||
|
||||
// Write data to the topic on cluster A
|
||||
writeToTopic(clusterA, topic, messagesPerTopic); |
||||
// and wait for MirrorMaker to copy it to cluster B
|
||||
awaitTopicContent(clusterB, b, a + "." + topic, messagesPerTopic); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void createTopic(Admin admin, String name) throws Exception { |
||||
admin.createTopics(Collections.singleton(new NewTopic(name, 1, (short) 1))).all().get(); |
||||
} |
||||
|
||||
private void awaitTopicCreation(String clusterName, Admin admin, String topic) throws Exception { |
||||
waitForCondition( |
||||
() -> { |
||||
try { |
||||
Set<String> allTopics = admin.listTopics().names().get(); |
||||
return allTopics.contains(topic); |
||||
} catch (Exception e) { |
||||
log.debug("Failed to check for existence of topic {} on cluster {}", topic, clusterName, e); |
||||
return false; |
||||
} |
||||
}, |
||||
TOPIC_CREATION_TIMEOUT_MS, |
||||
"topic " + topic + " was not created on cluster " + clusterName + " in time" |
||||
); |
||||
} |
||||
|
||||
private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMessages) { |
||||
for (int i = 0; i <= numMessages; i++) { |
||||
cluster.produce(topic, Integer.toString(i)); |
||||
} |
||||
} |
||||
|
||||
private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName, String topic, int numMessages) throws Exception { |
||||
try (Consumer<?, ?> consumer = cluster.createConsumer(Collections.singletonMap(AUTO_OFFSET_RESET_CONFIG, "earliest"))) { |
||||
consumer.subscribe(Collections.singleton(topic)); |
||||
AtomicInteger messagesRead = new AtomicInteger(0); |
||||
waitForCondition( |
||||
() -> { |
||||
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(1)); |
||||
return messagesRead.addAndGet(records.count()) >= numMessages; |
||||
}, |
||||
TOPIC_REPLICATION_TIMEOUT_MS, |
||||
() -> "could not read " + numMessages + " from topic " + topic + " on cluster " + clusterName + " in time; only read " + messagesRead.get() |
||||
); |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,69 @@
@@ -0,0 +1,69 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest; |
||||
|
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; |
||||
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; |
||||
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; |
||||
import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource; |
||||
import org.apache.kafka.connect.runtime.rest.resources.LoggingResource; |
||||
import org.apache.kafka.connect.runtime.rest.resources.RootResource; |
||||
import org.glassfish.jersey.server.ResourceConfig; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Map; |
||||
|
||||
public class ConnectRestServer extends RestServer { |
||||
|
||||
private final RestClient restClient; |
||||
private Herder herder; |
||||
|
||||
public ConnectRestServer(Integer rebalanceTimeoutMs, RestClient restClient, Map<?, ?> props) { |
||||
super(RestServerConfig.forPublic(rebalanceTimeoutMs, props)); |
||||
this.restClient = restClient; |
||||
} |
||||
|
||||
public void initializeResources(Herder herder) { |
||||
this.herder = herder; |
||||
super.initializeResources(); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<ConnectResource> regularResources() { |
||||
return Arrays.asList( |
||||
new RootResource(herder), |
||||
new ConnectorsResource(herder, config, restClient), |
||||
new InternalConnectResource(herder, restClient), |
||||
new ConnectorPluginsResource(herder) |
||||
); |
||||
} |
||||
|
||||
@Override |
||||
protected Collection<ConnectResource> adminResources() { |
||||
return Arrays.asList( |
||||
new LoggingResource() |
||||
); |
||||
} |
||||
|
||||
@Override |
||||
protected void configureRegularResources(ResourceConfig resourceConfig) { |
||||
registerRestExtensions(herder, resourceConfig); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,142 @@
@@ -0,0 +1,142 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; |
||||
import org.apache.kafka.connect.runtime.distributed.RequestTargetException; |
||||
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; |
||||
import org.apache.kafka.connect.util.FutureCallback; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import javax.ws.rs.core.HttpHeaders; |
||||
import javax.ws.rs.core.Response; |
||||
import javax.ws.rs.core.UriBuilder; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
public class HerderRequestHandler { |
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(HerderRequestHandler.class); |
||||
|
||||
private final RestClient restClient; |
||||
|
||||
private long requestTimeoutMs; |
||||
|
||||
public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) { |
||||
this.restClient = restClient; |
||||
this.requestTimeoutMs = requestTimeoutMs; |
||||
} |
||||
|
||||
public void requestTimeoutMs(long requestTimeoutMs) { |
||||
if (requestTimeoutMs < 1) { |
||||
throw new IllegalArgumentException("REST request timeout must be positive"); |
||||
} |
||||
this.requestTimeoutMs = requestTimeoutMs; |
||||
} |
||||
|
||||
/** |
||||
* Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the |
||||
* request to the leader. |
||||
*/ |
||||
public <T, U> T completeOrForwardRequest(FutureCallback<T> cb, |
||||
String path, |
||||
String method, |
||||
HttpHeaders headers, |
||||
Map<String, String> queryParameters, |
||||
Object body, |
||||
TypeReference<U> resultType, |
||||
Translator<T, U> translator, |
||||
Boolean forward) throws Throwable { |
||||
try { |
||||
return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS); |
||||
} catch (ExecutionException e) { |
||||
Throwable cause = e.getCause(); |
||||
|
||||
if (cause instanceof RequestTargetException) { |
||||
if (forward == null || forward) { |
||||
// the only time we allow recursive forwarding is when no forward flag has
|
||||
// been set, which should only be seen by the first worker to handle a user request.
|
||||
// this gives two total hops to resolve the request before giving up.
|
||||
boolean recursiveForward = forward == null; |
||||
RequestTargetException targetException = (RequestTargetException) cause; |
||||
String forwardedUrl = targetException.forwardUrl(); |
||||
if (forwardedUrl == null) { |
||||
// the target didn't know of the leader at this moment.
|
||||
throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), |
||||
"Cannot complete request momentarily due to no known leader URL, " |
||||
+ "likely because a rebalance was underway."); |
||||
} |
||||
UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl) |
||||
.path(path) |
||||
.queryParam("forward", recursiveForward); |
||||
if (queryParameters != null) { |
||||
queryParameters.forEach(uriBuilder::queryParam); |
||||
} |
||||
String forwardUrl = uriBuilder.build().toString(); |
||||
log.debug("Forwarding request {} {} {}", forwardUrl, method, body); |
||||
return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType)); |
||||
} else { |
||||
// we should find the right target for the query within two hops, so if
|
||||
// we don't, it probably means that a rebalance has taken place.
|
||||
throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), |
||||
"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"); |
||||
} |
||||
} else if (cause instanceof RebalanceNeededException) { |
||||
throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), |
||||
"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"); |
||||
} |
||||
|
||||
throw cause; |
||||
} catch (TimeoutException e) { |
||||
// This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
|
||||
// error is the best option
|
||||
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out"); |
||||
} catch (InterruptedException e) { |
||||
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted"); |
||||
} |
||||
} |
||||
|
||||
public <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, |
||||
TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable { |
||||
return completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward); |
||||
} |
||||
|
||||
public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, |
||||
TypeReference<T> resultType, Boolean forward) throws Throwable { |
||||
return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward); |
||||
} |
||||
|
||||
public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, |
||||
Object body, Boolean forward) throws Throwable { |
||||
return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward); |
||||
} |
||||
|
||||
public interface Translator<T, U> { |
||||
T translate(RestClient.HttpResponse<U> response); |
||||
} |
||||
|
||||
public static class IdentityTranslator<T> implements Translator<T, T> { |
||||
@Override |
||||
public T translate(RestClient.HttpResponse<T> response) { |
||||
return response.body(); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,463 @@
@@ -0,0 +1,463 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest; |
||||
|
||||
import org.apache.kafka.common.config.AbstractConfig; |
||||
import org.apache.kafka.common.config.ConfigDef; |
||||
import org.apache.kafka.common.config.ConfigException; |
||||
import org.apache.kafka.common.config.SslClientAuth; |
||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; |
||||
import org.apache.kafka.common.utils.Utils; |
||||
import org.apache.kafka.connect.runtime.WorkerConfig; |
||||
import org.eclipse.jetty.util.StringUtil; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in; |
||||
|
||||
/** |
||||
* Defines the configuration surface for a {@link RestServer} instance, with support for both |
||||
* {@link #forInternal(Map) internal-only} and {@link #forPublic(Integer, Map) user-facing} |
||||
* servers. An internal-only server will expose only the endpoints and listeners necessary for |
||||
* intra-cluster communication; these include the task-write and zombie-fencing endpoints. A |
||||
* user-facing server will expose these endpoints and, in addition, all endpoints that are part of |
||||
* the public REST API for Kafka Connect; these include the connector creation, connector |
||||
* status, configuration validation, and logging endpoints. In addition, a user-facing server will |
||||
* instantiate any user-configured |
||||
* {@link RestServerConfig#REST_EXTENSION_CLASSES_CONFIG REST extensions}. |
||||
*/ |
||||
public abstract class RestServerConfig extends AbstractConfig { |
||||
|
||||
public static final String LISTENERS_CONFIG = "listeners"; |
||||
private static final String LISTENERS_DOC |
||||
= "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" + |
||||
" Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + |
||||
" Leave hostname empty to bind to default interface.\n" + |
||||
" Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"; |
||||
// Visible for testing
|
||||
static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083"); |
||||
|
||||
public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name"; |
||||
private static final String REST_ADVERTISED_HOST_NAME_DOC |
||||
= "If this is set, this is the hostname that will be given out to other workers to connect to."; |
||||
|
||||
public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port"; |
||||
private static final String REST_ADVERTISED_PORT_DOC |
||||
= "If this is set, this is the port that will be given out to other workers to connect to."; |
||||
|
||||
public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener"; |
||||
private static final String REST_ADVERTISED_LISTENER_DOC |
||||
= "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use."; |
||||
|
||||
public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin"; |
||||
private static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC = |
||||
"Value to set the Access-Control-Allow-Origin header to for REST API requests." + |
||||
"To enable cross origin access, set this to the domain of the application that should be permitted" + |
||||
" to access the API, or '*' to allow access from any domain. The default value only allows access" + |
||||
" from the domain of the REST API."; |
||||
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = ""; |
||||
|
||||
public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods"; |
||||
private static final String ACCESS_CONTROL_ALLOW_METHODS_DOC = |
||||
"Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. " |
||||
+ "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD."; |
||||
private static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = ""; |
||||
|
||||
public static final String ADMIN_LISTENERS_CONFIG = "admin.listeners"; |
||||
private static final String ADMIN_LISTENERS_DOC = "List of comma-separated URIs the Admin REST API will listen on." + |
||||
" The supported protocols are HTTP and HTTPS." + |
||||
" An empty or blank string will disable this feature." + |
||||
" The default behavior is to use the regular listener (specified by the 'listeners' property)."; |
||||
public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https."; |
||||
|
||||
public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes"; |
||||
private static final String REST_EXTENSION_CLASSES_DOC = |
||||
"Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called " |
||||
+ "in the order specified. Implementing the interface " |
||||
+ "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. " |
||||
+ "Typically used to add custom capability like logging, security, etc. "; |
||||
|
||||
// Visible for testing
|
||||
static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config"; |
||||
// Visible for testing
|
||||
static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers"; |
||||
// Visible for testing
|
||||
static final String RESPONSE_HTTP_HEADERS_DEFAULT = ""; |
||||
private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList( |
||||
Arrays.asList("set", "add", "setDate", "addDate") |
||||
); |
||||
|
||||
|
||||
/** |
||||
* @return the listeners to use for this server, or empty if no admin endpoints should be exposed, |
||||
* or null if the admin endpoints should be exposed on the {@link #listeners() regular listeners} for |
||||
* this server |
||||
*/ |
||||
public abstract List<String> adminListeners(); |
||||
|
||||
/** |
||||
* @return a list of {@link #REST_EXTENSION_CLASSES_CONFIG REST extension} classes |
||||
* to instantiate and use with the server |
||||
*/ |
||||
public abstract List<String> restExtensions(); |
||||
|
||||
/** |
||||
* @return whether {@link WorkerConfig#TOPIC_TRACKING_ENABLE_CONFIG topic tracking} |
||||
* is enabled on this worker |
||||
*/ |
||||
public abstract boolean topicTrackingEnabled(); |
||||
|
||||
/** |
||||
* @return whether {@link WorkerConfig#TOPIC_TRACKING_ALLOW_RESET_CONFIG topic tracking resets} |
||||
* are enabled on this worker |
||||
*/ |
||||
public abstract boolean topicTrackingResetEnabled(); |
||||
|
||||
/** |
||||
* Add the properties related to a user-facing server to the given {@link ConfigDef}. |
||||
* </p> |
||||
* This automatically adds the properties for intra-cluster communication; it is not necessary to |
||||
* invoke both {@link #addInternalConfig(ConfigDef)} and this method on the same {@link ConfigDef}. |
||||
* @param configDef the {@link ConfigDef} to add the properties to; may not be null |
||||
*/ |
||||
public static void addPublicConfig(ConfigDef configDef) { |
||||
addInternalConfig(configDef); |
||||
configDef |
||||
.define( |
||||
REST_EXTENSION_CLASSES_CONFIG, |
||||
ConfigDef.Type.LIST, |
||||
"", |
||||
ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC |
||||
).define(ADMIN_LISTENERS_CONFIG, |
||||
ConfigDef.Type.LIST, |
||||
null, |
||||
new AdminListenersValidator(), |
||||
ConfigDef.Importance.LOW, |
||||
ADMIN_LISTENERS_DOC); |
||||
} |
||||
|
||||
/** |
||||
* Add the properties related to an internal-only server to the given {@link ConfigDef}. |
||||
* @param configDef the {@link ConfigDef} to add the properties to; may not be null |
||||
*/ |
||||
public static void addInternalConfig(ConfigDef configDef) { |
||||
configDef |
||||
.define( |
||||
LISTENERS_CONFIG, |
||||
ConfigDef.Type.LIST, |
||||
LISTENERS_DEFAULT, |
||||
new ListenersValidator(), |
||||
ConfigDef.Importance.LOW, |
||||
LISTENERS_DOC |
||||
).define( |
||||
REST_ADVERTISED_HOST_NAME_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
null, |
||||
ConfigDef.Importance.LOW, |
||||
REST_ADVERTISED_HOST_NAME_DOC |
||||
).define( |
||||
REST_ADVERTISED_PORT_CONFIG, |
||||
ConfigDef.Type.INT, |
||||
null, |
||||
ConfigDef.Importance.LOW, |
||||
REST_ADVERTISED_PORT_DOC |
||||
).define( |
||||
REST_ADVERTISED_LISTENER_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
null, |
||||
ConfigDef.Importance.LOW, |
||||
REST_ADVERTISED_LISTENER_DOC |
||||
).define( |
||||
ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, |
||||
ConfigDef.Importance.LOW, |
||||
ACCESS_CONTROL_ALLOW_ORIGIN_DOC |
||||
).define( |
||||
ACCESS_CONTROL_ALLOW_METHODS_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, |
||||
ConfigDef.Importance.LOW, |
||||
ACCESS_CONTROL_ALLOW_METHODS_DOC |
||||
).define( |
||||
RESPONSE_HTTP_HEADERS_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
RESPONSE_HTTP_HEADERS_DEFAULT, |
||||
new ResponseHttpHeadersValidator(), |
||||
ConfigDef.Importance.LOW, |
||||
RESPONSE_HTTP_HEADERS_DOC |
||||
).define( |
||||
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, |
||||
ConfigDef.Type.STRING, |
||||
SslClientAuth.NONE.toString(), |
||||
in(Utils.enumOptions(SslClientAuth.class)), |
||||
ConfigDef.Importance.LOW, |
||||
BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC); |
||||
} |
||||
|
||||
public static RestServerConfig forPublic(Integer rebalanceTimeoutMs, Map<?, ?> props) { |
||||
return new PublicConfig(rebalanceTimeoutMs, props); |
||||
} |
||||
|
||||
public static RestServerConfig forInternal(Map<?, ?> props) { |
||||
return new InternalConfig(props); |
||||
} |
||||
|
||||
public List<String> listeners() { |
||||
return getList(LISTENERS_CONFIG); |
||||
} |
||||
|
||||
public String rawListeners() { |
||||
return (String) originals().get(LISTENERS_CONFIG); |
||||
} |
||||
|
||||
public String allowedOrigins() { |
||||
return getString(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG); |
||||
} |
||||
|
||||
public String allowedMethods() { |
||||
return getString(ACCESS_CONTROL_ALLOW_METHODS_CONFIG); |
||||
} |
||||
|
||||
public String responseHeaders() { |
||||
return getString(RESPONSE_HTTP_HEADERS_CONFIG); |
||||
} |
||||
|
||||
public String advertisedListener() { |
||||
return getString(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG); |
||||
} |
||||
|
||||
public String advertisedHostName() { |
||||
return getString(REST_ADVERTISED_HOST_NAME_CONFIG); |
||||
} |
||||
|
||||
public Integer advertisedPort() { |
||||
return getInt(REST_ADVERTISED_PORT_CONFIG); |
||||
} |
||||
|
||||
public Integer rebalanceTimeoutMs() { |
||||
return null; |
||||
} |
||||
|
||||
protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) { |
||||
super(configDef, props); |
||||
} |
||||
|
||||
// Visible for testing
|
||||
static void validateHttpResponseHeaderConfig(String config) { |
||||
try { |
||||
// validate format
|
||||
String[] configTokens = config.trim().split("\\s+", 2); |
||||
if (configTokens.length != 2) { |
||||
throw new ConfigException(String.format("Invalid format of header config '%s'. " |
||||
+ "Expected: '[action] [header name]:[header value]'", config)); |
||||
} |
||||
|
||||
// validate action
|
||||
String method = configTokens[0].trim(); |
||||
validateHeaderConfigAction(method); |
||||
|
||||
// validate header name and header value pair
|
||||
String header = configTokens[1]; |
||||
String[] headerTokens = header.trim().split(":"); |
||||
if (headerTokens.length != 2) { |
||||
throw new ConfigException( |
||||
String.format("Invalid format of header name and header value pair '%s'. " |
||||
+ "Expected: '[header name]:[header value]'", header)); |
||||
} |
||||
|
||||
// validate header name
|
||||
String headerName = headerTokens[0].trim(); |
||||
if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) { |
||||
throw new ConfigException(String.format("Invalid header name '%s'. " |
||||
+ "The '[header name]' cannot contain whitespace", headerName)); |
||||
} |
||||
} catch (ArrayIndexOutOfBoundsException e) { |
||||
throw new ConfigException(String.format("Invalid header config '%s'.", config), e); |
||||
} |
||||
} |
||||
|
||||
// Visible for testing
|
||||
static void validateHeaderConfigAction(String action) { |
||||
if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) { |
||||
throw new ConfigException(String.format("Invalid header config action: '%s'. " |
||||
+ "Expected one of %s", action, HEADER_ACTIONS)); |
||||
} |
||||
} |
||||
|
||||
private static class ListenersValidator implements ConfigDef.Validator { |
||||
@Override |
||||
public void ensureValid(String name, Object value) { |
||||
if (!(value instanceof List)) { |
||||
throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443)."); |
||||
} |
||||
|
||||
List<?> items = (List<?>) value; |
||||
if (items.isEmpty()) { |
||||
throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443."); |
||||
} |
||||
|
||||
for (Object item : items) { |
||||
if (!(item instanceof String)) { |
||||
throw new ConfigException("Invalid type for listeners (expected String)."); |
||||
} |
||||
if (Utils.isBlank((String) item)) { |
||||
throw new ConfigException("Empty URL found when parsing listeners list."); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443."; |
||||
} |
||||
} |
||||
|
||||
private static class AdminListenersValidator implements ConfigDef.Validator { |
||||
@Override |
||||
public void ensureValid(String name, Object value) { |
||||
if (value == null) { |
||||
return; |
||||
} |
||||
|
||||
if (!(value instanceof List)) { |
||||
throw new ConfigException("Invalid value type for admin.listeners (expected list)."); |
||||
} |
||||
|
||||
List<?> items = (List<?>) value; |
||||
if (items.isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
for (Object item : items) { |
||||
if (!(item instanceof String)) { |
||||
throw new ConfigException("Invalid type for admin.listeners (expected String)."); |
||||
} |
||||
if (Utils.isBlank((String) item)) { |
||||
throw new ConfigException("Empty URL found when parsing admin.listeners list."); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443."; |
||||
} |
||||
} |
||||
|
||||
private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { |
||||
@Override |
||||
public void ensureValid(String name, Object value) { |
||||
String strValue = (String) value; |
||||
if (Utils.isBlank(strValue)) { |
||||
return; |
||||
} |
||||
|
||||
String[] configs = StringUtil.csvSplit(strValue); // handles and removed surrounding quotes
|
||||
Arrays.stream(configs).forEach(RestServerConfig::validateHttpResponseHeaderConfig); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "Comma-separated header rules, where each header rule is of the form " |
||||
+ "'[action] [header name]:[header value]' and optionally surrounded by double quotes " |
||||
+ "if any part of a header rule contains a comma"; |
||||
} |
||||
} |
||||
|
||||
private static class InternalConfig extends RestServerConfig { |
||||
|
||||
private static ConfigDef config() { |
||||
ConfigDef result = new ConfigDef().withClientSslSupport(); |
||||
addInternalConfig(result); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public List<String> adminListeners() { |
||||
// Disable admin resources (such as the logging resource)
|
||||
return Collections.emptyList(); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> restExtensions() { |
||||
// Disable the use of REST extensions
|
||||
return null; |
||||
} |
||||
|
||||
@Override |
||||
public boolean topicTrackingEnabled() { |
||||
// Topic tracking is unnecessary if we don't expose a public REST API
|
||||
return false; |
||||
} |
||||
|
||||
@Override |
||||
public boolean topicTrackingResetEnabled() { |
||||
// Topic tracking is unnecessary if we don't expose a public REST API
|
||||
return false; |
||||
} |
||||
|
||||
public InternalConfig(Map<?, ?> props) { |
||||
super(config(), props); |
||||
} |
||||
} |
||||
|
||||
private static class PublicConfig extends RestServerConfig { |
||||
|
||||
private final Integer rebalanceTimeoutMs; |
||||
private static ConfigDef config() { |
||||
ConfigDef result = new ConfigDef().withClientSslSupport(); |
||||
addPublicConfig(result); |
||||
WorkerConfig.addTopicTrackingConfig(result); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public List<String> adminListeners() { |
||||
return getList(ADMIN_LISTENERS_CONFIG); |
||||
} |
||||
|
||||
@Override |
||||
public List<String> restExtensions() { |
||||
return getList(REST_EXTENSION_CLASSES_CONFIG); |
||||
} |
||||
|
||||
@Override |
||||
public Integer rebalanceTimeoutMs() { |
||||
return rebalanceTimeoutMs; |
||||
} |
||||
|
||||
@Override |
||||
public boolean topicTrackingEnabled() { |
||||
return getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG); |
||||
} |
||||
|
||||
@Override |
||||
public boolean topicTrackingResetEnabled() { |
||||
return getBoolean(WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG); |
||||
} |
||||
|
||||
public PublicConfig(Integer rebalanceTimeoutMs, Map<?, ?> props) { |
||||
super(config(), props); |
||||
this.rebalanceTimeoutMs = rebalanceTimeoutMs; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,115 @@
@@ -0,0 +1,115 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest.resources; |
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference; |
||||
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
import io.swagger.v3.oas.annotations.Operation; |
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.distributed.Crypto; |
||||
import org.apache.kafka.connect.runtime.rest.HerderRequestHandler; |
||||
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; |
||||
import org.apache.kafka.connect.runtime.rest.RestClient; |
||||
import org.apache.kafka.connect.util.FutureCallback; |
||||
|
||||
import javax.ws.rs.POST; |
||||
import javax.ws.rs.PUT; |
||||
import javax.ws.rs.Path; |
||||
import javax.ws.rs.PathParam; |
||||
import javax.ws.rs.Produces; |
||||
import javax.ws.rs.QueryParam; |
||||
import javax.ws.rs.core.Context; |
||||
import javax.ws.rs.core.HttpHeaders; |
||||
import javax.ws.rs.core.MediaType; |
||||
import javax.ws.rs.core.UriInfo; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* Contains endpoints necessary for intra-cluster communication--that is, requests that |
||||
* workers will issue to each other that originate from within the cluster, as opposed to |
||||
* requests that originate from a user and are forwarded from one worker to another. |
||||
*/ |
||||
@Produces(MediaType.APPLICATION_JSON) |
||||
public abstract class InternalClusterResource implements ConnectResource { |
||||
|
||||
private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE = |
||||
new TypeReference<List<Map<String, String>>>() { }; |
||||
|
||||
private final HerderRequestHandler requestHandler; |
||||
|
||||
// Visible for testing
|
||||
@Context |
||||
UriInfo uriInfo; |
||||
|
||||
protected InternalClusterResource(RestClient restClient) { |
||||
this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS); |
||||
} |
||||
|
||||
@Override |
||||
public void requestTimeout(long requestTimeoutMs) { |
||||
requestHandler.requestTimeoutMs(requestTimeoutMs); |
||||
} |
||||
|
||||
/** |
||||
* @return a {@link Herder} instance that can be used to satisfy the current request; may not be null |
||||
* @throws javax.ws.rs.NotFoundException if no such herder can be provided |
||||
*/ |
||||
protected abstract Herder herderForRequest(); |
||||
|
||||
@POST |
||||
@Path("/{connector}/tasks") |
||||
@Operation(hidden = true, summary = "This operation is only for inter-worker communications") |
||||
public void putTaskConfigs( |
||||
final @PathParam("connector") String connector, |
||||
final @Context HttpHeaders headers, |
||||
final @QueryParam("forward") Boolean forward, |
||||
final byte[] requestBody) throws Throwable { |
||||
List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE); |
||||
FutureCallback<Void> cb = new FutureCallback<>(); |
||||
herderForRequest().putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers)); |
||||
requestHandler.completeOrForwardRequest( |
||||
cb, |
||||
uriInfo.getPath(), |
||||
"POST", |
||||
headers, |
||||
taskConfigs, |
||||
forward |
||||
); |
||||
} |
||||
|
||||
@PUT |
||||
@Path("/{connector}/fence") |
||||
@Operation(hidden = true, summary = "This operation is only for inter-worker communications") |
||||
public void fenceZombies( |
||||
final @PathParam("connector") String connector, |
||||
final @Context HttpHeaders headers, |
||||
final @QueryParam("forward") Boolean forward, |
||||
final byte[] requestBody) throws Throwable { |
||||
FutureCallback<Void> cb = new FutureCallback<>(); |
||||
herderForRequest().fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers)); |
||||
requestHandler.completeOrForwardRequest( |
||||
cb, |
||||
uriInfo.getPath(), |
||||
"PUT", |
||||
headers, |
||||
requestBody, |
||||
forward |
||||
); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,39 @@
@@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest.resources; |
||||
|
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.rest.RestClient; |
||||
|
||||
import javax.ws.rs.Path; |
||||
|
||||
@Path("/connectors") |
||||
public class InternalConnectResource extends InternalClusterResource { |
||||
|
||||
private final Herder herder; |
||||
|
||||
public InternalConnectResource(Herder herder, RestClient restClient) { |
||||
super(restClient); |
||||
this.herder = herder; |
||||
} |
||||
|
||||
@Override |
||||
protected Herder herderForRequest() { |
||||
return herder; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,167 @@
@@ -0,0 +1,167 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest; |
||||
|
||||
import org.apache.kafka.common.config.ConfigException; |
||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; |
||||
import org.junit.Test; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_DEFAULT; |
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertNull; |
||||
import static org.junit.Assert.assertThrows; |
||||
import static org.junit.Assert.assertTrue; |
||||
|
||||
public class RestServerConfigTest { |
||||
|
||||
private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList( |
||||
"add \t Cache-Control: no-cache, no-store, must-revalidate", |
||||
"add \r X-XSS-Protection: 1; mode=block", |
||||
"\n add Strict-Transport-Security: max-age=31536000; includeSubDomains", |
||||
"AdD Strict-Transport-Security: \r max-age=31536000; includeSubDomains", |
||||
"AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains", |
||||
"add X-Content-Type-Options: \r nosniff", |
||||
"Set \t X-Frame-Options: \t Deny\n ", |
||||
"seT \t X-Cache-Info: \t not cacheable\n ", |
||||
"seTDate \t Expires: \r 31540000000", |
||||
"adDdate \n Last-Modified: \t 0" |
||||
); |
||||
|
||||
private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList( |
||||
"set \t", |
||||
"badaction \t X-Frame-Options:DENY", |
||||
"set add X-XSS-Protection:1", |
||||
"addX-XSS-Protection", |
||||
"X-XSS-Protection:", |
||||
"add set X-XSS-Protection: 1", |
||||
"add X-XSS-Protection:1 X-XSS-Protection:1 ", |
||||
"add X-XSS-Protection", |
||||
"set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " |
||||
); |
||||
|
||||
@Test |
||||
public void testListenersConfigAllowedValues() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
|
||||
// no value set for "listeners"
|
||||
RestServerConfig config = RestServerConfig.forPublic(null, props); |
||||
assertEquals(LISTENERS_DEFAULT, config.listeners()); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999"); |
||||
config = RestServerConfig.forPublic(null, props); |
||||
assertEquals(Arrays.asList("http://a.b:9999"), config.listeners()); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812"); |
||||
config = RestServerConfig.forPublic(null, props); |
||||
assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners()); |
||||
|
||||
config = RestServerConfig.forPublic(null, props); |
||||
} |
||||
|
||||
@Test |
||||
public void testListenersConfigNotAllowedValues() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
assertEquals(LISTENERS_DEFAULT, RestServerConfig.forPublic(null, props).listeners()); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, ""); |
||||
ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(" listeners")); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, ",,,"); |
||||
ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(" listeners")); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999,"); |
||||
ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(" listeners")); |
||||
|
||||
props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999"); |
||||
ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(" listeners")); |
||||
} |
||||
|
||||
@Test |
||||
public void testAdminListenersConfigAllowedValues() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
|
||||
// no value set for "admin.listeners"
|
||||
RestServerConfig config = RestServerConfig.forPublic(null, props); |
||||
assertNull("Default value should be null.", config.adminListeners()); |
||||
|
||||
props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, ""); |
||||
config = RestServerConfig.forPublic(null, props); |
||||
assertTrue(config.adminListeners().isEmpty()); |
||||
|
||||
props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812"); |
||||
config = RestServerConfig.forPublic(null, props); |
||||
assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.adminListeners()); |
||||
|
||||
RestServerConfig.forPublic(null, props); |
||||
} |
||||
|
||||
@Test |
||||
public void testAdminListenersNotAllowingEmptyStrings() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
|
||||
props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,"); |
||||
ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(" admin.listeners")); |
||||
} |
||||
|
||||
@Test |
||||
public void testAdminListenersNotAllowingBlankStrings() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999"); |
||||
assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
} |
||||
|
||||
@Test |
||||
public void testInvalidHeaderConfigs() { |
||||
for (String config : INVALID_HEADER_CONFIGS) { |
||||
assertInvalidHeaderConfig(config); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void testValidHeaderConfigs() { |
||||
for (String config : VALID_HEADER_CONFIGS) { |
||||
assertValidHeaderConfig(config); |
||||
} |
||||
} |
||||
|
||||
private void assertInvalidHeaderConfig(String config) { |
||||
assertThrows(ConfigException.class, () -> RestServerConfig.validateHttpResponseHeaderConfig(config)); |
||||
} |
||||
|
||||
private void assertValidHeaderConfig(String config) { |
||||
RestServerConfig.validateHttpResponseHeaderConfig(config); |
||||
} |
||||
|
||||
@Test |
||||
public void testInvalidSslClientAuthConfig() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
|
||||
props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "abc"); |
||||
ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props)); |
||||
assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG)); |
||||
} |
||||
} |
@ -0,0 +1,224 @@
@@ -0,0 +1,224 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.kafka.connect.runtime.rest.resources; |
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper; |
||||
import org.apache.kafka.connect.errors.NotFoundException; |
||||
import org.apache.kafka.connect.runtime.Herder; |
||||
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; |
||||
import org.apache.kafka.connect.runtime.rest.RestClient; |
||||
import org.apache.kafka.connect.util.Callback; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.ArgumentCaptor; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
import org.mockito.stubbing.Stubber; |
||||
|
||||
import javax.crypto.Mac; |
||||
import javax.ws.rs.core.HttpHeaders; |
||||
import javax.ws.rs.core.UriInfo; |
||||
import java.io.IOException; |
||||
import java.util.ArrayList; |
||||
import java.util.Base64; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import static org.junit.Assert.assertEquals; |
||||
import static org.junit.Assert.assertThrows; |
||||
import static org.mockito.Mockito.any; |
||||
import static org.mockito.Mockito.doAnswer; |
||||
import static org.mockito.Mockito.eq; |
||||
import static org.mockito.Mockito.isNull; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
@RunWith(MockitoJUnitRunner.StrictStubs.class) |
||||
public class InternalConnectResourceTest { |
||||
|
||||
private static final Boolean FORWARD = true; |
||||
private static final String CONNECTOR_NAME = "test"; |
||||
private static final HttpHeaders NULL_HEADERS = null; |
||||
private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); |
||||
static { |
||||
TASK_CONFIGS.add(Collections.singletonMap("config", "value")); |
||||
TASK_CONFIGS.add(Collections.singletonMap("config", "other_value")); |
||||
} |
||||
private static final String FENCE_PATH = "/connectors/" + CONNECTOR_NAME + "/fence"; |
||||
private static final String TASK_CONFIGS_PATH = "/connectors/" + CONNECTOR_NAME + "/tasks"; |
||||
|
||||
@Mock |
||||
private UriInfo uriInfo; |
||||
@Mock |
||||
private Herder herder; |
||||
@Mock |
||||
private RestClient restClient; |
||||
|
||||
private InternalConnectResource internalResource; |
||||
|
||||
@Before |
||||
public void setup() { |
||||
internalResource = new InternalConnectResource(herder, restClient); |
||||
internalResource.uriInfo = uriInfo; |
||||
} |
||||
|
||||
@Test |
||||
public void testPutConnectorTaskConfigsNoInternalRequestSignature() throws Throwable { |
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
expectAndCallbackResult(cb, null).when(herder).putTaskConfigs( |
||||
eq(CONNECTOR_NAME), |
||||
eq(TASK_CONFIGS), |
||||
cb.capture(), |
||||
any() |
||||
); |
||||
expectRequestPath(TASK_CONFIGS_PATH); |
||||
|
||||
internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS)); |
||||
} |
||||
|
||||
@Test |
||||
public void testPutConnectorTaskConfigsWithInternalRequestSignature() throws Throwable { |
||||
final String signatureAlgorithm = "HmacSHA256"; |
||||
final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4="; |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class); |
||||
expectAndCallbackResult(cb, null).when(herder).putTaskConfigs( |
||||
eq(CONNECTOR_NAME), |
||||
eq(TASK_CONFIGS), |
||||
cb.capture(), |
||||
signatureCapture.capture() |
||||
); |
||||
|
||||
HttpHeaders headers = mock(HttpHeaders.class); |
||||
when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER)) |
||||
.thenReturn(signatureAlgorithm); |
||||
when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER)) |
||||
.thenReturn(encodedSignature); |
||||
expectRequestPath(TASK_CONFIGS_PATH); |
||||
|
||||
internalResource.putTaskConfigs(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(TASK_CONFIGS)); |
||||
|
||||
InternalRequestSignature expectedSignature = new InternalRequestSignature( |
||||
serializeAsBytes(TASK_CONFIGS), |
||||
Mac.getInstance(signatureAlgorithm), |
||||
Base64.getDecoder().decode(encodedSignature) |
||||
); |
||||
assertEquals( |
||||
expectedSignature, |
||||
signatureCapture.getValue() |
||||
); |
||||
} |
||||
|
||||
@Test |
||||
public void testPutConnectorTaskConfigsConnectorNotFound() { |
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
expectAndCallbackException(cb, new NotFoundException("not found")).when(herder).putTaskConfigs( |
||||
eq(CONNECTOR_NAME), |
||||
eq(TASK_CONFIGS), |
||||
cb.capture(), |
||||
any() |
||||
); |
||||
expectRequestPath(TASK_CONFIGS_PATH); |
||||
|
||||
assertThrows(NotFoundException.class, () -> internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, |
||||
FORWARD, serializeAsBytes(TASK_CONFIGS))); |
||||
} |
||||
|
||||
@Test |
||||
public void testFenceZombiesNoInternalRequestSignature() throws Throwable { |
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
expectAndCallbackResult(cb, null) |
||||
.when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), isNull()); |
||||
expectRequestPath(FENCE_PATH); |
||||
|
||||
internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null)); |
||||
} |
||||
|
||||
@Test |
||||
public void testFenceZombiesWithInternalRequestSignature() throws Throwable { |
||||
final String signatureAlgorithm = "HmacSHA256"; |
||||
final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4="; |
||||
|
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class); |
||||
expectAndCallbackResult(cb, null) |
||||
.when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), signatureCapture.capture()); |
||||
|
||||
HttpHeaders headers = mock(HttpHeaders.class); |
||||
when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER)) |
||||
.thenReturn(signatureAlgorithm); |
||||
when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER)) |
||||
.thenReturn(encodedSignature); |
||||
expectRequestPath(FENCE_PATH); |
||||
|
||||
internalResource.fenceZombies(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(null)); |
||||
|
||||
InternalRequestSignature expectedSignature = new InternalRequestSignature( |
||||
serializeAsBytes(null), |
||||
Mac.getInstance(signatureAlgorithm), |
||||
Base64.getDecoder().decode(encodedSignature) |
||||
); |
||||
assertEquals( |
||||
expectedSignature, |
||||
signatureCapture.getValue() |
||||
); |
||||
} |
||||
|
||||
@Test |
||||
public void testFenceZombiesConnectorNotFound() throws Throwable { |
||||
@SuppressWarnings("unchecked") |
||||
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class); |
||||
|
||||
expectAndCallbackException(cb, new NotFoundException("not found")) |
||||
.when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any()); |
||||
expectRequestPath(FENCE_PATH); |
||||
|
||||
assertThrows(NotFoundException.class, |
||||
() -> internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null))); |
||||
} |
||||
|
||||
private <T> byte[] serializeAsBytes(final T value) throws IOException { |
||||
return new ObjectMapper().writeValueAsBytes(value); |
||||
} |
||||
|
||||
private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) { |
||||
return doAnswer(invocation -> { |
||||
cb.getValue().onCompletion(null, value); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
private <T> Stubber expectAndCallbackException(final ArgumentCaptor<Callback<T>> cb, final Throwable t) { |
||||
return doAnswer(invocation -> { |
||||
cb.getValue().onCompletion(t, null); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
private void expectRequestPath(String path) { |
||||
when(uriInfo.getPath()).thenReturn(path); |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue