Reviewers: Greg Harris <greg.harris@aiven.io>, Yang Yang <yayang@uber.com>, Yash Mayya <yash.mayya@gmail.com>
Chris Egerton authored 1 year ago; committed by GitHub
34 changed files with 3254 additions and 1827 deletions
@@ -0,0 +1,215 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/**
 * Manages logging levels on a single worker. Supports dynamic adjustment and querying
 * of logging levels.
 * <p>
 * This class is thread-safe; concurrent calls to all of its public methods from any number
 * of threads are permitted.
 */
public class Loggers {

    private static final Logger log = LoggerFactory.getLogger(Loggers.class);

    /**
     * Log4j uses "root" (case-insensitive) as the name of the root logger.
     */
    private static final String ROOT_LOGGER_NAME = "root";

    private final Time time;
    private final Map<String, Long> lastModifiedTimes;

    public Loggers(Time time) {
        this.time = time;
        this.lastModifiedTimes = new HashMap<>();
    }

    /**
     * Retrieve the current level for a single logger.
     * @param logger the name of the logger to retrieve the level for; may not be null
     * @return the current level (falling back on the effective level if necessary) of the logger,
     * or null if no logger with the specified name exists
     */
    public synchronized LoggerLevel level(String logger) {
        Objects.requireNonNull(logger, "Logger may not be null");

        org.apache.log4j.Logger foundLogger = null;
        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
            foundLogger = rootLogger();
        } else {
            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
            // search within existing loggers for the given name.
            // using LogManager.getLogger() will create a logger if it doesn't exist
            // (potential leak since these don't get cleaned up).
            while (en.hasMoreElements()) {
                org.apache.log4j.Logger l = en.nextElement();
                if (logger.equals(l.getName())) {
                    foundLogger = l;
                    break;
                }
            }
        }

        if (foundLogger == null) {
            log.warn("Unable to find level for logger {}", logger);
            return null;
        }

        return loggerLevel(foundLogger);
    }

    /**
     * Retrieve the current levels of all known loggers.
     * @return the levels of all known loggers; may be empty, but never null
     */
    public synchronized Map<String, LoggerLevel> allLevels() {
        Map<String, LoggerLevel> result = new TreeMap<>();

        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
        Collections.list(enumeration)
                .stream()
                .filter(logger -> logger.getLevel() != null)
                .forEach(logger -> result.put(logger.getName(), loggerLevel(logger)));

        org.apache.log4j.Logger root = rootLogger();
        if (root.getLevel() != null) {
            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
        }

        return result;
    }

    /**
     * Set the level for the specified logger and all of its children.
     * @param namespace the name of the logger to adjust along with its children; may not be null
     * @param level the level to set for the logger and its children; may not be null
     * @return all loggers that were affected by this action, sorted by their natural ordering;
     * may be empty, but never null
     */
    public synchronized List<String> setLevel(String namespace, Level level) {
        Objects.requireNonNull(namespace, "Logging namespace may not be null");
        Objects.requireNonNull(level, "Level may not be null");

        log.info("Setting level of namespace {} and children to {}", namespace, level);
        List<org.apache.log4j.Logger> childLoggers = loggers(namespace);

        List<String> result = new ArrayList<>();
        for (org.apache.log4j.Logger logger: childLoggers) {
            setLevel(logger, level);
            result.add(logger.getName());
        }
        Collections.sort(result);

        return result;
    }

    /**
     * Retrieve all known loggers within a given namespace, creating an ancestor logger for that
     * namespace if one does not already exist.
     * @param namespace the namespace that the loggers should fall under; may not be null
     * @return all loggers that fall under the given namespace; never null, and will always contain
     * at least one logger (the ancestor logger for the namespace)
     */
    private synchronized List<org.apache.log4j.Logger> loggers(String namespace) {
        Objects.requireNonNull(namespace, "Logging namespace may not be null");

        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namespace)) {
            List<org.apache.log4j.Logger> result = Collections.list(currentLoggers());
            result.add(rootLogger());
            return result;
        }

        List<org.apache.log4j.Logger> result = new ArrayList<>();
        org.apache.log4j.Logger ancestorLogger = lookupLogger(namespace);
        Enumeration<org.apache.log4j.Logger> en = currentLoggers();
        boolean present = false;
        while (en.hasMoreElements()) {
            org.apache.log4j.Logger current = en.nextElement();
            if (current.getName().startsWith(namespace)) {
                result.add(current);
            }
            if (namespace.equals(current.getName())) {
                present = true;
            }
        }

        if (!present) {
            result.add(ancestorLogger);
        }

        return result;
    }

    // visible for testing
    org.apache.log4j.Logger lookupLogger(String logger) {
        return LogManager.getLogger(logger);
    }

    @SuppressWarnings("unchecked")
    // visible for testing
    Enumeration<org.apache.log4j.Logger> currentLoggers() {
        return LogManager.getCurrentLoggers();
    }

    // visible for testing
    org.apache.log4j.Logger rootLogger() {
        return LogManager.getRootLogger();
    }

    private void setLevel(org.apache.log4j.Logger logger, Level level) {
        Level currentLevel = logger.getLevel();
        if (currentLevel == null)
            currentLevel = logger.getEffectiveLevel();

        if (level.equals(currentLevel)) {
            log.debug("Skipping update for logger {} since its level is already {}", logger.getName(), level);
            return;
        }

        log.debug("Setting level of logger {} (excluding children) to {}", logger.getName(), level);
        logger.setLevel(level);
        lastModifiedTimes.put(logger.getName(), time.milliseconds());
    }

    private LoggerLevel loggerLevel(org.apache.log4j.Logger logger) {
        Level level = logger.getLevel();
        if (level == null)
            level = logger.getEffectiveLevel();

        Long lastModified = lastModifiedTimes.get(logger.getName());
        return new LoggerLevel(Objects.toString(level), lastModified);
    }

}
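For reviewers who want to see the class in action outside the REST layer, here is a minimal usage sketch. It is hypothetical (not part of this change); the MockTime constructor mirrors the one used in LoggersTest below, and production code would pass Time.SYSTEM instead.

// Hypothetical usage sketch for the Loggers class above; not part of this change.
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.runtime.Loggers;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.log4j.Level;

public class LoggersUsageSketch {
    public static void main(String[] args) {
        // MockTime(autoTickMs, currentTimeMs, currentHighResTimeNs)
        Loggers loggers = new Loggers(new MockTime(0, 1696951712135L, 0));

        // Adjust a whole namespace; every logger under it is updated and timestamped.
        loggers.setLevel("org.apache.kafka.connect", Level.DEBUG);

        // Query a single logger; lastModified() is null until a level is changed dynamically.
        LoggerLevel level = loggers.level("org.apache.kafka.connect");
        System.out.println(level.level() + " (last modified: " + level.lastModified() + ")");
    }
}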
@@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime.rest.entities;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class LoggerLevel {

    private final String level;
    private final Long lastModified;

    public LoggerLevel(
            @JsonProperty("level") String level,
            @JsonProperty("last_modified") Long lastModified
    ) {
        this.level = Objects.requireNonNull(level, "level may not be null");
        this.lastModified = lastModified;
    }

    @JsonProperty
    public String level() {
        return level;
    }

    @JsonProperty("last_modified")
    public Long lastModified() {
        return lastModified;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        LoggerLevel that = (LoggerLevel) o;
        return level.equals(that.level) && Objects.equals(lastModified, that.lastModified);
    }

    @Override
    public int hashCode() {
        return Objects.hash(level, lastModified);
    }

    @Override
    public String toString() {
        return "LoggerLevel{"
                + "level='" + level + '\''
                + ", lastModified=" + lastModified
                + '}';
    }
}
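Since this entity is what the admin REST endpoints serialize, a small sketch of the JSON it maps to may help review. This assumes a stock Jackson ObjectMapper; the exact property order is not guaranteed.

// Sketch: serializing a LoggerLevel with Jackson; not part of this change.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;

public class LoggerLevelJsonSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Expected shape (order may vary): {"level":"DEBUG","last_modified":1696951712135}
        System.out.println(mapper.writeValueAsString(new LoggerLevel("DEBUG", 1696951712135L)));

        // A logger whose level was never changed dynamically carries a null timestamp:
        // {"level":"INFO","last_modified":null}
        System.out.println(mapper.writeValueAsString(new LoggerLevel("INFO", null)));
    }
}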
@@ -0,0 +1,205 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.integration;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@Category(IntegrationTest.class)
public class StandaloneWorkerIntegrationTest {

    private EmbeddedConnectStandalone connect;

    @Before
    public void setup() {
        connect = new EmbeddedConnectStandalone.Builder()
                .build();
        connect.start();
    }

    @After
    public void cleanup() {
        connect.stop();
    }

    @Test
    public void testDynamicLogging() {
        Map<String, LoggerLevel> initialLevels = connect.allLogLevels();
        assertFalse("Connect REST API did not list any known loggers", initialLevels.isEmpty());
        Map<String, LoggerLevel> invalidModifiedLoggers = Utils.filterMap(
                initialLevels,
                StandaloneWorkerIntegrationTest::isModified
        );
        assertEquals(
                "No loggers should have a non-null last-modified timestamp",
                Collections.emptyMap(),
                invalidModifiedLoggers
        );

        // Tests with no scope
        // The current level may match the first level we set the namespace to,
        // so we issue a preliminary request with a different level to guarantee that a
        // change takes place and that the last modified timestamp will be non-null
        final String namespace1 = "org.apache.kafka.connect";
        final String level1 = "DEBUG";
        connect.setLogLevel(namespace1, "ERROR", null);
        Map<String, LoggerLevel> currentLevels = testSetLoggingLevel(namespace1, level1, null, initialLevels);

        // Tests with scope=worker
        final String namespace2 = "org.apache.kafka.clients";
        final String level2 = "INFO";
        connect.setLogLevel(namespace2, "WARN", "worker");
        currentLevels = testSetLoggingLevel(namespace2, level2, "worker", currentLevels);

        LoggerLevel priorLoggerLevel = connect.getLogLevel(namespace2);
        connect.setLogLevel(namespace2, level2, "worker");
        LoggerLevel currentLoggerLevel = connect.getLogLevel(namespace2);
        assertEquals(
                "Log level and last-modified timestamp should not be affected by consecutive identical requests",
                priorLoggerLevel,
                currentLoggerLevel
        );

        // Tests with scope=cluster
        final String namespace3 = "org.apache.kafka.streams";
        final String level3 = "TRACE";
        connect.setLogLevel(namespace3, "DEBUG", "cluster");
        testSetLoggingLevel(namespace3, level3, "cluster", currentLevels);
    }

    private Map<String, LoggerLevel> testSetLoggingLevel(
            String namespace,
            String level,
            String scope,
            Map<String, LoggerLevel> initialLevels
    ) {
        long requestTime = System.currentTimeMillis();
        List<String> affectedLoggers = connect.setLogLevel(namespace, level, scope);
        if ("cluster".equals(scope)) {
            assertNull(
                    "Modifying log levels with scope=cluster should result in an empty response",
                    affectedLoggers
            );
        } else {
            assertTrue(affectedLoggers.contains(namespace));
            List<String> invalidAffectedLoggers = affectedLoggers.stream()
                    .filter(l -> !l.startsWith(namespace))
                    .collect(Collectors.toList());
            assertEquals(
                    "No loggers outside the namespace '" + namespace
                            + "' should have been included in the response for a request to modify that namespace",
                    Collections.emptyList(),
                    invalidAffectedLoggers
            );
        }

        // Verify the information for this single logger

        LoggerLevel loggerLevel = connect.getLogLevel(namespace);
        assertNotNull(loggerLevel);
        assertEquals(level, loggerLevel.level());
        assertNotNull(loggerLevel.lastModified());
        assertTrue(
                "Last-modified timestamp for logger level is " + loggerLevel.lastModified()
                        + ", which is before " + requestTime + ", the most-recent time the level was adjusted",
                loggerLevel.lastModified() >= requestTime
        );

        // Verify information for all listed loggers

        Map<String, LoggerLevel> newLevels = connect.allLogLevels();

        Map<String, LoggerLevel> invalidAffectedLoggerLevels = Utils.filterMap(
                newLevels,
                e -> hasNamespace(e, namespace)
                        && (!level(e).equals(level)
                            || !isModified(e)
                            || lastModified(e) < requestTime
                        )
        );
        assertEquals(
                "At least one logger in the affected namespace '" + namespace
                        + "' does not have the expected level of '" + level
                        + "', has a null last-modified timestamp, or has a last-modified timestamp "
                        + "that is less recent than " + requestTime
                        + ", which is when the namespace was last adjusted",
                Collections.emptyMap(),
                invalidAffectedLoggerLevels
        );

        Set<String> droppedLoggers = Utils.diff(HashSet::new, initialLevels.keySet(), newLevels.keySet());
        assertEquals(
                "At least one logger was present in the listing of all loggers "
                        + "before the logging level for namespace '" + namespace
                        + "' was set to '" + level
                        + "' that is no longer present",
                Collections.emptySet(),
                droppedLoggers
        );

        Map<String, LoggerLevel> invalidUnaffectedLoggerLevels = Utils.filterMap(
                newLevels,
                e -> !hasNamespace(e, namespace) && !e.getValue().equals(initialLevels.get(e.getKey()))
        );
        assertEquals(
                "At least one logger outside of the affected namespace '" + namespace
                        + "' has a different logging level or last-modified timestamp than it did "
                        + "before the namespace was set to level '" + level
                        + "'; none of these loggers should have been affected",
                Collections.emptyMap(),
                invalidUnaffectedLoggerLevels
        );

        return newLevels;
    }

    private static boolean hasNamespace(Map.Entry<String, ?> entry, String namespace) {
        return entry.getKey().startsWith(namespace);
    }

    private static boolean isModified(Map.Entry<?, LoggerLevel> entry) {
        return lastModified(entry) != null;
    }

    private static Long lastModified(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().lastModified();
    }

    private static String level(Map.Entry<?, LoggerLevel> entry) {
        return entry.getValue().level();
    }

}
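The EmbeddedConnectStandalone helpers used above (setLogLevel, getLogLevel, allLogLevels) wrap the worker's admin REST endpoints. A hedged sketch of the underlying HTTP calls follows; the /admin/loggers paths are the ones documented for Connect's dynamic logging (KIP-495, extended here with a scope parameter), and the worker URL and port are assumptions.

// Sketch of the REST calls the test helpers wrap; URL, port, and paths are assumptions.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DynamicLoggingRestSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String worker = "http://localhost:8083"; // hypothetical worker URL

        // PUT /admin/loggers/{logger}?scope=cluster adjusts a namespace on every worker;
        // with scope=cluster the response body is empty, which is why the test expects null.
        HttpRequest put = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/admin/loggers/org.apache.kafka.connect?scope=cluster"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"level\": \"DEBUG\"}"))
                .build();
        System.out.println(client.send(put, HttpResponse.BodyHandlers.ofString()).statusCode());

        // GET /admin/loggers lists every known logger with its level and last_modified timestamp.
        HttpRequest get = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/admin/loggers"))
                .build();
        System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());
    }
}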
@@ -0,0 +1,246 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.log4j.Hierarchy;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class LoggersTest {

    private static final long INITIAL_TIME = 1696951712135L;
    private Time time;

    @Before
    public void setup() {
        time = new MockTime(0, INITIAL_TIME, 0);
    }

    @Test
    public void testGetLoggersIgnoresNullLevels() {
        Logger root = logger("root");

        Logger a = logger("a");
        a.setLevel(null);
        Logger b = logger("b");
        b.setLevel(Level.INFO);

        Loggers loggers = new TestLoggers(root, a, b);

        Map<String, LoggerLevel> expectedLevels = Collections.singletonMap(
                "b",
                new LoggerLevel(Level.INFO.toString(), null)
        );
        Map<String, LoggerLevel> actualLevels = loggers.allLevels();
        assertEquals(expectedLevels, actualLevels);
    }

    @Test
    public void testGetLoggerFallsBackToEffectiveLogLevel() {
        Logger root = logger("root");
        root.setLevel(Level.ERROR);

        Hierarchy hierarchy = new Hierarchy(root);
        Logger a = hierarchy.getLogger("a");
        a.setLevel(null);
        Logger b = hierarchy.getLogger("b");
        b.setLevel(Level.INFO);

        Loggers loggers = new TestLoggers(root, a, b);

        LoggerLevel expectedLevel = new LoggerLevel(Level.ERROR.toString(), null);
        LoggerLevel actualLevel = loggers.level("a");
        assertEquals(expectedLevel, actualLevel);
    }

    @Test
    public void testGetUnknownLogger() {
        Logger root = logger("root");
        root.setLevel(Level.ERROR);

        Hierarchy hierarchy = new Hierarchy(root);
        Logger a = hierarchy.getLogger("a");
        a.setLevel(null);
        Logger b = hierarchy.getLogger("b");
        b.setLevel(Level.INFO);

        Loggers loggers = new TestLoggers(root, a, b);

        LoggerLevel level = loggers.level("c");
        assertNull(level);
    }

    @Test
    public void testSetLevel() {
        Logger root = logger("root");
        root.setLevel(Level.ERROR);

        Logger x = logger("a.b.c.p.X");
        Logger y = logger("a.b.c.p.Y");
        Logger z = logger("a.b.c.p.Z");
        Logger w = logger("a.b.c.s.W");
        x.setLevel(Level.INFO);
        y.setLevel(Level.INFO);
        z.setLevel(Level.INFO);
        w.setLevel(Level.INFO);

        // We don't explicitly register a logger for a.b.c.p, so it won't appear in the list of current loggers;
        // one should be created by the Loggers instance when we set the level
        TestLoggers loggers = new TestLoggers(root, x, y, z, w);

        List<String> modified = loggers.setLevel("a.b.c.p", Level.DEBUG);
        assertEquals(Arrays.asList("a.b.c.p", "a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z"), modified);
        assertEquals(Level.DEBUG.toString(), loggers.level("a.b.c.p").level());
        assertEquals(Level.DEBUG, x.getLevel());
        assertEquals(Level.DEBUG, y.getLevel());
        assertEquals(Level.DEBUG, z.getLevel());

        LoggerLevel expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
        LoggerLevel actualLevel = loggers.level("a.b.c.p");
        assertEquals(expectedLevel, actualLevel);

        // Sleep a little and adjust the level of a leaf logger
        time.sleep(10);
        loggers.setLevel("a.b.c.p.X", Level.ERROR);
        expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
        actualLevel = loggers.level("a.b.c.p.X");
        assertEquals(expectedLevel, actualLevel);

        // Make sure that the direct parent logger and a sibling logger remain unaffected
        expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
        actualLevel = loggers.level("a.b.c.p");
        assertEquals(expectedLevel, actualLevel);

        expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
        actualLevel = loggers.level("a.b.c.p.Y");
        assertEquals(expectedLevel, actualLevel);

        // Set the same level again, and verify that the last modified time hasn't been altered
        time.sleep(10);
        loggers.setLevel("a.b.c.p.X", Level.ERROR);
        expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
        actualLevel = loggers.level("a.b.c.p.X");
        assertEquals(expectedLevel, actualLevel);
    }

    @Test
    public void testSetRootLevel() {
        Logger root = logger("root");
        root.setLevel(Level.ERROR);

        Logger p = logger("a.b.c.p");
        Logger x = logger("a.b.c.p.X");
        Logger y = logger("a.b.c.p.Y");
        Logger z = logger("a.b.c.p.Z");
        Logger w = logger("a.b.c.s.W");
        x.setLevel(Level.INFO);
        y.setLevel(Level.INFO);
        z.setLevel(Level.INFO);
        w.setLevel(Level.INFO);

        Loggers loggers = new TestLoggers(root, x, y, z, w);

        List<String> modified = loggers.setLevel("root", Level.DEBUG);
        assertEquals(Arrays.asList("a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z", "a.b.c.s.W", "root"), modified);

        assertNull(p.getLevel());

        assertEquals(root.getLevel(), Level.DEBUG);

        assertEquals(w.getLevel(), Level.DEBUG);
        assertEquals(x.getLevel(), Level.DEBUG);
        assertEquals(y.getLevel(), Level.DEBUG);
        assertEquals(z.getLevel(), Level.DEBUG);

        Map<String, LoggerLevel> expectedLevels = new HashMap<>();
        expectedLevels.put("root", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
        expectedLevels.put("a.b.c.p.X", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
        expectedLevels.put("a.b.c.p.Y", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
        expectedLevels.put("a.b.c.p.Z", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
        expectedLevels.put("a.b.c.s.W", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));

        Map<String, LoggerLevel> actualLevels = loggers.allLevels();
        assertEquals(expectedLevels, actualLevels);
    }

    @Test
    public void testSetLevelNullArguments() {
        Logger root = logger("root");
        Loggers loggers = new TestLoggers(root);
        assertThrows(NullPointerException.class, () -> loggers.setLevel(null, Level.INFO));
        assertThrows(NullPointerException.class, () -> loggers.setLevel("root", null));
    }

    private class TestLoggers extends Loggers {

        private final Logger rootLogger;
        private final Map<String, Logger> currentLoggers;

        public TestLoggers(Logger rootLogger, Logger... knownLoggers) {
            super(time);
            this.rootLogger = rootLogger;
            this.currentLoggers = new HashMap<>(Stream.of(knownLoggers)
                    .collect(Collectors.toMap(
                            Logger::getName,
                            Function.identity()
                    )));
        }

        @Override
        Logger lookupLogger(String logger) {
            return currentLoggers.computeIfAbsent(logger, l -> new Logger(logger) { });
        }

        @Override
        Enumeration<Logger> currentLoggers() {
            return new Vector<>(currentLoggers.values()).elements();
        }

        @Override
        Logger rootLogger() {
            return rootLogger;
        }
    }

    private Logger logger(String name) {
        return new Logger(name) { };
    }

}
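The timestamp assertions above work only because MockTime advances deterministically; a minimal sketch of that behavior, using the same constructor arguments as setup():

// Sketch: MockTime only advances when told to, so last-modified values are predictable.
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

public class MockTimeSketch {
    public static void main(String[] args) {
        // Arguments: autoTickMs (0 = never auto-advance), currentTimeMs, currentHighResTimeNs.
        Time time = new MockTime(0, 1696951712135L, 0);
        System.out.println(time.milliseconds()); // 1696951712135
        time.sleep(10);                          // advances the mock clock instead of blocking
        System.out.println(time.milliseconds()); // 1696951712145
    }
}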
@@ -0,0 +1,618 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.connect.util.clusters;

import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.apache.kafka.test.TestUtils.waitForCondition;

/**
 * A set of common assertions that can be applied to a Connect cluster during integration testing
 */
public class ConnectAssertions {

    private static final Logger log = LoggerFactory.getLogger(ConnectAssertions.class);
    public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
    public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
    public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
    // Creating a connector requires two rounds of rebalance; destroying one only requires one.
    // Assume it'll take ~half the time to destroy a connector as it does to create one.
    public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
    private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);

    private final EmbeddedConnect connect;

    ConnectAssertions(EmbeddedConnect connect) {
        this.connect = connect;
    }

    /**
     * Assert that at least the requested number of workers are up and running.
     *
     * @param numWorkers the number of online workers
     */
    public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false),
                    WORKER_SETUP_DURATION_MS,
                    "Didn't meet the minimum requested number of online workers: " + numWorkers);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that exactly the requested number of workers are up and running.
     *
     * @param numWorkers the number of online workers
     */
    public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkWorkersUp(numWorkers, (actual, expected) -> actual == expected).orElse(false),
                    WORKER_SETUP_DURATION_MS,
                    "Didn't meet the exact requested number of online workers: " + numWorkers);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Confirm that the requested number of workers are up and running.
     *
     * @param numWorkers the number of online workers
     * @return true if the given comparison holds between the number of active workers and
     * {@code numWorkers}; false otherwise
     */
    protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) {
        try {
            int numUp = connect.activeWorkers().size();
            return Optional.of(comp.apply(numUp, numWorkers));
        } catch (Exception e) {
            log.error("Could not check active workers.", e);
            return Optional.empty();
        }
    }

    /**
     * Assert that exactly the requested number of brokers are up and running.
     *
     * @param numBrokers the number of online brokers
     */
    public void assertExactlyNumBrokersAreUp(int numBrokers, String detailMessage) throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkBrokersUp(numBrokers, (actual, expected) -> actual == expected).orElse(false),
                    WORKER_SETUP_DURATION_MS,
                    "Didn't meet the exact requested number of online brokers: " + numBrokers);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Confirm that the requested number of brokers are up and running.
     *
     * @param numBrokers the number of online brokers
     * @return true if the given comparison holds between the number of running brokers and
     * {@code numBrokers}; false otherwise
     */
    protected Optional<Boolean> checkBrokersUp(int numBrokers, BiFunction<Integer, Integer, Boolean> comp) {
        try {
            int numRunning = connect.kafka().runningBrokers().size();
            return Optional.of(comp.apply(numRunning, numBrokers));
        } catch (Exception e) {
            log.error("Could not check running brokers.", e);
            return Optional.empty();
        }
    }

    /**
     * Assert that the topics with the specified names do not exist.
     *
     * @param topicNames the names of the topics that are expected to not exist
     */
    public void assertTopicsDoNotExist(String... topicNames) throws InterruptedException {
        Set<String> topicNameSet = new HashSet<>(Arrays.asList(topicNames));
        AtomicReference<Set<String>> existingTopics = new AtomicReference<>(topicNameSet);
        waitForCondition(
                () -> checkTopicsExist(topicNameSet, (actual, expected) -> {
                    existingTopics.set(actual);
                    return actual.isEmpty();
                }).orElse(false),
                CONNECTOR_SETUP_DURATION_MS,
                "Unexpectedly found topics " + existingTopics.get());
    }

    /**
     * Assert that the topics with the specified names do exist.
     *
     * @param topicNames the names of the topics that are expected to exist
     */
    public void assertTopicsExist(String... topicNames) throws InterruptedException {
        Set<String> topicNameSet = new HashSet<>(Arrays.asList(topicNames));
        AtomicReference<Set<String>> missingTopics = new AtomicReference<>(topicNameSet);
        waitForCondition(
                () -> checkTopicsExist(topicNameSet, (actual, expected) -> {
                    Set<String> missing = new HashSet<>(expected);
                    missing.removeAll(actual);
                    missingTopics.set(missing);
                    return missing.isEmpty();
                }).orElse(false),
                CONNECTOR_SETUP_DURATION_MS,
                "Didn't find the topics " + missingTopics.get());
    }

    protected Optional<Boolean> checkTopicsExist(Set<String> topicNames, BiFunction<Set<String>, Set<String>, Boolean> comp) {
        try {
            Map<String, Optional<TopicDescription>> topics = connect.kafka().describeTopics(topicNames);
            Set<String> actualExistingTopics = topics.entrySet()
                    .stream()
                    .filter(e -> e.getValue().isPresent())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            return Optional.of(comp.apply(actualExistingTopics, topicNames));
        } catch (Exception e) {
            log.error("Failed to describe the topic(s): {}.", topicNames, e);
            return Optional.empty();
        }
    }

    /**
     * Assert that the named topic is configured to have the specified replication factor and
     * number of partitions.
     *
     * @param topicName the name of the topic that is expected to exist
     * @param replicas the replication factor
     * @param partitions the number of partitions
     * @param detailMessage the assertion message
     */
    public void assertTopicSettings(String topicName, int replicas, int partitions, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkTopicSettings(
                            topicName,
                            replicas,
                            partitions
                    ).orElse(false),
                    VALIDATION_DURATION_MS,
                    "Topic " + topicName + " does not exist or does not have exactly "
                            + partitions + " partitions or at least "
                            + replicas + " per partition");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    protected Optional<Boolean> checkTopicSettings(String topicName, int replicas, int partitions) {
        try {
            Map<String, Optional<TopicDescription>> topics = connect.kafka().describeTopics(topicName);
            TopicDescription topicDesc = topics.get(topicName).orElse(null);
            boolean result = topicDesc != null
                    && topicDesc.name().equals(topicName)
                    && topicDesc.partitions().size() == partitions
                    && topicDesc.partitions().stream().allMatch(p -> p.replicas().size() >= replicas);
            return Optional.of(result);
        } catch (Exception e) {
            log.error("Failed to describe the topic: {}.", topicName, e);
            return Optional.empty();
        }
    }

    /**
     * Assert that the required number of errors are produced by a connector config validation.
     *
     * @param connectorClass the class of the connector to validate
     * @param connConfig     the intended configuration
     * @param numErrors      the number of errors expected
     * @param detailMessage  the assertion message
     */
    public void assertExactlyNumErrorsOnConnectorConfigValidation(String connectorClass, Map<String, String> connConfig,
            int numErrors, String detailMessage) throws InterruptedException {
        assertExactlyNumErrorsOnConnectorConfigValidation(connectorClass, connConfig, numErrors, detailMessage, VALIDATION_DURATION_MS);
    }

    /**
     * Assert that the required number of errors are produced by a connector config validation.
     *
     * @param connectorClass the class of the connector to validate
     * @param connConfig     the intended configuration
     * @param numErrors      the number of errors expected
     * @param detailMessage  the assertion message
     * @param timeout        how long to retry for before throwing an exception
     *
     * @throws AssertionError if the exact number of errors is not produced during config
     * validation before the timeout expires
     */
    public void assertExactlyNumErrorsOnConnectorConfigValidation(String connectorClass, Map<String, String> connConfig,
            int numErrors, String detailMessage, long timeout) throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkValidationErrors(
                            connectorClass,
                            connConfig,
                            numErrors,
                            (actual, expected) -> actual == expected
                    ).orElse(false),
                    timeout,
                    "Didn't meet the exact requested number of validation errors: " + numErrors);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Confirm that the requested number of errors are produced by {@link EmbeddedConnect#validateConnectorConfig}.
     *
     * @param connectorClass the class of the connector to validate
     * @param connConfig     the intended configuration
     * @param numErrors      the number of errors expected
     * @return true if exactly {@code numErrors} are produced by the validation; false otherwise
     */
    protected Optional<Boolean> checkValidationErrors(String connectorClass, Map<String, String> connConfig,
            int numErrors, BiFunction<Integer, Integer, Boolean> comp) {
        try {
            int numErrorsProduced = connect.validateConnectorConfig(connectorClass, connConfig).errorCount();
            return Optional.of(comp.apply(numErrorsProduced, numErrors));
        } catch (Exception e) {
            log.error("Could not check config validation error count.", e);
            return Optional.empty();
        }
    }

    /**
     * Assert that a connector is running and has at least the given number of tasks, all in the
     * RUNNING state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.RUNNING,
                            numTasks,
                            AbstractStatus.State.RUNNING,
                            (actual, expected) -> actual >= expected
                    ).orElse(false),
                    CONNECTOR_SETUP_DURATION_MS,
                    "The connector or at least " + numTasks + " of tasks are not running.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector is running, that it has a specific number of tasks, and that all of
     * its tasks are in the RUNNING state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.RUNNING,
                            numTasks,
                            AbstractStatus.State.RUNNING,
                            (actual, expected) -> actual == expected
                    ).orElse(false),
                    CONNECTOR_SETUP_DURATION_MS,
                    "The connector or exactly " + numTasks + " tasks are not running.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector is paused, that it has a specific number of tasks, and that all of
     * its tasks are in the PAUSED state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorAndExactlyNumTasksArePaused(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.PAUSED,
                            numTasks,
                            AbstractStatus.State.PAUSED,
                            Integer::equals
                    ).orElse(false),
                    CONNECTOR_SHUTDOWN_DURATION_MS,
                    "The connector or exactly " + numTasks + " tasks are not paused.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector is running, that it has a specific number of tasks, and that all of
     * its tasks are in the FAILED state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.RUNNING,
                            numTasks,
                            AbstractStatus.State.FAILED,
                            (actual, expected) -> actual >= expected
                    ).orElse(false),
                    CONNECTOR_SETUP_DURATION_MS,
                    "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector is running, that it has a specific number of tasks, and that
     * {@code numFailedTasks} of those tasks are in the FAILED state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param numFailedTasks the number of failed tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorIsRunningAndNumTasksHaveFailed(String connectorName, int numTasks, int numFailedTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.RUNNING,
                            numTasks,
                            numFailedTasks,
                            AbstractStatus.State.FAILED,
                            (actual, expected) -> actual >= expected
                    ).orElse(false),
                    CONNECTOR_SETUP_DURATION_MS,
                    "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector is in the FAILED state, that it has a specific number of tasks, and
     * that all of its tasks are in the FAILED state.
     *
     * @param connectorName the connector name
     * @param numTasks the number of tasks
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorIsFailedAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.FAILED,
                            numTasks,
                            AbstractStatus.State.FAILED,
                            (actual, expected) -> actual >= expected
                    ).orElse(false),
                    CONNECTOR_SETUP_DURATION_MS,
                    "Either the connector is running or not all the " + numTasks + " tasks have failed.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
     *
     * @param connectorName the connector name
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorDoesNotExist(String connectorName, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorDoesNotExist(connectorName),
                    CONNECTOR_SETUP_DURATION_MS,
                    "The connector should not exist.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Check whether a connector exists by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint
     *
     * @param connectorName the connector name
     * @return true if the connector does not exist; false otherwise
     */
    protected boolean checkConnectorDoesNotExist(String connectorName) {
        try {
            connect.connectorStatus(connectorName);
        } catch (ConnectRestException e) {
            return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
        return false;
    }

    /**
     * Assert that a connector is in the stopped state and has no tasks.
     *
     * @param connectorName the connector name
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorIsStopped(String connectorName, String detailMessage)
            throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorState(
                            connectorName,
                            AbstractStatus.State.STOPPED,
                            0,
                            null,
                            Integer::equals
                    ).orElse(false),
                    CONNECTOR_SHUTDOWN_DURATION_MS,
                    "At least the connector or one of its tasks is still running");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Check whether the given connector state matches the current state of the connector, whether
     * the given comparison holds for its number of tasks, and whether all of its tasks match the
     * given task state.
     * @param connectorName the connector
     * @param connectorState the expected connector state
     * @param numTasks the expected number of tasks
     * @param tasksState the expected state of the connector's tasks
     * @return true if the connector and tasks are in the expected states; false otherwise
     */
    protected Optional<Boolean> checkConnectorState(
            String connectorName,
            AbstractStatus.State connectorState,
            int numTasks,
            AbstractStatus.State tasksState,
            BiFunction<Integer, Integer, Boolean> comp
    ) {
        try {
            ConnectorStateInfo info = connect.connectorStatus(connectorName);
            boolean result = info != null
                    && comp.apply(info.tasks().size(), numTasks)
                    && info.connector().state().equals(connectorState.toString())
                    && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
            return Optional.of(result);
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return Optional.empty();
        }
    }

    /**
     * Check whether the given connector state matches the current state of the connector, whether
     * the given comparison holds for its number of tasks, and whether exactly
     * {@code numTasksInTasksState} of its tasks match the given task state.
     * @param connectorName the connector
     * @param connectorState the expected connector state
     * @param numTasks the expected number of tasks
     * @param tasksState the expected state of the matching tasks
     * @return true if the connector and tasks are in the expected states; false otherwise
     */
    protected Optional<Boolean> checkConnectorState(
            String connectorName,
            AbstractStatus.State connectorState,
            int numTasks,
            int numTasksInTasksState,
            AbstractStatus.State tasksState,
            BiFunction<Integer, Integer, Boolean> comp
    ) {
        try {
            ConnectorStateInfo info = connect.connectorStatus(connectorName);
            boolean result = info != null
                    && comp.apply(info.tasks().size(), numTasks)
                    && info.connector().state().equals(connectorState.toString())
                    && info.tasks().stream().filter(s -> s.state().equals(tasksState.toString())).count() == numTasksInTasksState;
            return Optional.of(result);
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return Optional.empty();
        }
    }

    /**
     * Assert that a connector's set of active topics matches the given collection of topic names.
     *
     * @param connectorName the connector name
     * @param topics a collection of topics to compare against
     * @param detailMessage the assertion message
     * @throws InterruptedException
     */
    public void assertConnectorActiveTopics(String connectorName, Collection<String> topics, String detailMessage) throws InterruptedException {
        try {
            waitForCondition(
                    () -> checkConnectorActiveTopics(connectorName, topics).orElse(false),
                    CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS,
                    "Connector active topics don't match the expected collection");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Check whether a connector's set of active topics matches the given collection of topic names.
     *
     * @param connectorName the connector name
     * @param topics a collection of topics to compare against
     * @return true if the connector's active topics matches the given collection; false otherwise
     */
    protected Optional<Boolean> checkConnectorActiveTopics(String connectorName, Collection<String> topics) {
        try {
            ActiveTopicsInfo info = connect.connectorTopics(connectorName);
            boolean result = info != null
                    && topics.size() == info.topics().size()
                    && topics.containsAll(info.topics());
            log.debug("Found connector {} using topics: {}", connectorName, info.topics());
            return Optional.of(result);
        } catch (Exception e) {
            log.error("Could not check connector {} state info.", connectorName, e);
            return Optional.empty();
        }
    }
}
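A short sketch of how a test typically consumes this class. The assertions() accessor on the embedded cluster is assumed here (it is not shown in this excerpt), and the timing expectations are hypothetical:

// Hypothetical consumer of ConnectAssertions; the assertions() accessor is an assumption.
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.junit.Test;

public class ConnectAssertionsUsageSketch {
    @Test
    public void workerComesUp() throws InterruptedException {
        EmbeddedConnectStandalone connect = new EmbeddedConnectStandalone.Builder().build();
        connect.start();
        try {
            // Retries internally (via waitForCondition) until the worker is up or the
            // setup timeout expires, then fails with the supplied detail message.
            connect.assertions().assertExactlyNumWorkersAreUp(
                    1, "Standalone worker did not come up in time");
        } finally {
            connect.stop();
        }
    }
}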
@ -0,0 +1,956 @@
@@ -0,0 +1,956 @@
|
/* |
* Licensed to the Apache Software Foundation (ASF) under one or more |
* contributor license agreements. See the NOTICE file distributed with |
* this work for additional information regarding copyright ownership. |
* The ASF licenses this file to You under the Apache License, Version 2.0 |
* (the "License"); you may not use this file except in compliance with |
* the License. You may obtain a copy of the License at |
* |
* http://www.apache.org/licenses/LICENSE-2.0 |
* |
* Unless required by applicable law or agreed to in writing, software |
* distributed under the License is distributed on an "AS IS" BASIS, |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
* See the License for the specific language governing permissions and |
* limitations under the License. |
*/ |
package org.apache.kafka.connect.util.clusters; |
|
import com.fasterxml.jackson.core.type.TypeReference; |
import com.fasterxml.jackson.databind.ObjectMapper; |
import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
import org.apache.kafka.common.TopicPartition; |
import org.apache.kafka.common.utils.Exit; |
import org.apache.kafka.common.utils.Utils; |
import org.apache.kafka.connect.errors.ConnectException; |
import org.apache.kafka.connect.runtime.isolation.Plugins; |
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; |
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; |
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; |
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; |
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; |
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; |
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; |
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; |
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; |
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; |
import org.apache.kafka.connect.util.SinkUtils; |
import org.eclipse.jetty.client.HttpClient; |
import org.eclipse.jetty.client.api.ContentResponse; |
import org.eclipse.jetty.client.api.Request; |
import org.eclipse.jetty.client.util.StringContentProvider; |
import org.slf4j.Logger; |
import org.slf4j.LoggerFactory; |
|
import javax.ws.rs.core.Response; |
import java.io.IOException; |
import java.util.Collection; |
import java.util.Collections; |
import java.util.HashSet; |
import java.util.List; |
import java.util.Map; |
import java.util.Objects; |
import java.util.Properties; |
import java.util.Set; |
import java.util.stream.Collectors; |
|
abstract class EmbeddedConnect { |
|
private static final Logger log = LoggerFactory.getLogger(EmbeddedConnect.class); |
|
public static final int DEFAULT_NUM_BROKERS = 1; |
|
protected final int numBrokers; |
|
private final EmbeddedKafkaCluster kafkaCluster; |
private final boolean maskExitProcedures; |
private final HttpClient httpClient; |
private final ConnectAssertions assertions; |
private final ClassLoader originalClassLoader; |
|
protected EmbeddedConnect( |
int numBrokers, |
Properties brokerProps, |
boolean maskExitProcedures, |
Map<String, String> clientProps |
) { |
this.numBrokers = numBrokers; |
this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, clientProps); |
this.maskExitProcedures = maskExitProcedures; |
this.httpClient = new HttpClient(); |
this.assertions = new ConnectAssertions(this); |
// Keep a reference to the original context class loader and restore it after the |
// connector stops, since connectors may swap out the thread's class loader. Otherwise, |
// Mockito can pick up the wrong class loader and generate broken proxy instances, |
// which makes mocking fail. |
this.originalClassLoader = Thread.currentThread().getContextClassLoader(); |
} |
|
/** |
* @return the set of all {@link WorkerHandle workers}, running or stopped, in the cluster; |
* may be empty, but never null |
*/ |
protected abstract Set<WorkerHandle> workers(); |
|
/** |
* Start (or restart) the {@link WorkerHandle workers} in the cluster. |
*/ |
public abstract void startConnect(); |
|
/** |
* A more graceful way to handle abnormal exit of services in integration tests. |
*/ |
public Exit.Procedure exitProcedure = (code, message) -> { |
if (code != 0) { |
String exitMessage = "Abrupt service exit with code " + code + " and message " + message; |
log.warn(exitMessage); |
throw new UngracefulShutdownException(exitMessage); |
} |
}; |
|
/** |
* A more graceful way to handle abnormal halt of services in integration tests. |
*/ |
public Exit.Procedure haltProcedure = (code, message) -> { |
if (code != 0) { |
String haltMessage = "Abrupt service halt with code " + code + " and message " + message; |
log.warn(haltMessage); |
throw new UngracefulShutdownException(haltMessage); |
} |
}; |
|
/** |
* Start the Connect cluster and the embedded Kafka and ZooKeeper cluster. |
*/ |
public void start() { |
if (maskExitProcedures) { |
Exit.setExitProcedure(exitProcedure); |
Exit.setHaltProcedure(haltProcedure); |
} |
kafkaCluster.start(); |
startConnect(); |
try { |
httpClient.start(); |
} catch (Exception e) { |
throw new ConnectException("Failed to start HTTP client", e); |
} |
} |
|
/** |
* Stop the Connect cluster and the embedded Kafka and ZooKeeper cluster. |
* Clean up any temp directories created locally. |
* |
* @throws RuntimeException if Kafka brokers fail to stop |
*/ |
public void stop() { |
Utils.closeQuietly(httpClient::stop, "HTTP client for embedded Connect cluster"); |
workers().forEach(this::stopWorker); |
try { |
kafkaCluster.stop(); |
} catch (UngracefulShutdownException e) { |
log.warn("Kafka did not shut down gracefully"); |
} catch (Exception e) { |
log.error("Could not stop Kafka", e); |
throw new RuntimeException("Could not stop brokers", e); |
} finally { |
if (maskExitProcedures) { |
Exit.resetExitProcedure(); |
Exit.resetHaltProcedure(); |
} |
Plugins.compareAndSwapLoaders(originalClassLoader); |
} |
} |
|
protected void stopWorker(WorkerHandle worker) { |
try { |
log.info("Stopping worker {}", worker); |
worker.stop(); |
} catch (UngracefulShutdownException e) { |
log.warn("Worker {} did not shut down gracefully", worker); |
} catch (Exception e) { |
log.error("Could not stop Connect worker", e); |
throw new RuntimeException("Could not stop worker", e); |
} |
} |
|
/** |
* Set a new timeout for REST requests to each worker in the cluster. Useful if a request |
* is expected to block, since the time spent awaiting that request can be reduced |
* and test runtime bloat can be avoided. |
* @param requestTimeoutMs the new timeout in milliseconds; must be positive |
*/ |
public void requestTimeout(long requestTimeoutMs) { |
workers().forEach(worker -> worker.requestTimeout(requestTimeoutMs)); |
} |
|
/** |
* Configure a connector. If the connector does not already exist, a new one will be created and |
* the given configuration will be applied to it. |
* |
* @param connName the name of the connector |
* @param connConfig the intended configuration |
* @return the body of the REST API response, as a String |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent |
*/ |
public String configureConnector(String connName, Map<String, String> connConfig) { |
String url = endpointForResource(String.format("connectors/%s/config", connName)); |
return putConnectorConfig(url, connConfig); |
} |
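|
// Illustrative usage sketch: configuring a connector from a test, assuming a concrete |
// subclass instance named `connect` and a connector plugin on the worker's plugin path |
// (the connector class and properties below are examples only): |
// |
//     Map<String, String> props = new HashMap<>(); |
//     props.put("connector.class", "org.apache.kafka.connect.tools.VerifiableSourceConnector"); |
//     props.put("tasks.max", "1"); |
//     props.put("topic", "test-topic"); |
//     connect.configureConnector("verifiable-source", props); |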
|
/** |
* Validate a given connector configuration. If the configuration is valid or merely |
* contains configuration errors, an instance of {@link ConfigInfos} is returned. If the |
* validation request itself fails, an exception is thrown. |
* |
* @param connClassName the name of the connector class |
* @param connConfig the intended configuration |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException if the configuration fails to serialize/deserialize or if the request failed to send |
*/ |
public ConfigInfos validateConnectorConfig(String connClassName, Map<String, String> connConfig) { |
String url = endpointForResource(String.format("connector-plugins/%s/config/validate", connClassName)); |
String response = putConnectorConfig(url, connConfig); |
ConfigInfos configInfos; |
try { |
configInfos = new ObjectMapper().readValue(response, ConfigInfos.class); |
} catch (IOException e) { |
throw new ConnectException("Unable to deserialize response into a ConfigInfos object", e); |
} |
return configInfos; |
} |
|
/** |
* Execute a PUT request with the given connector configuration on the given URL endpoint. |
* |
* @param url the full URL of the endpoint that corresponds to the given REST resource |
* @param connConfig the intended configuration |
* @return the body of the REST API response, as a String |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent |
*/ |
protected String putConnectorConfig(String url, Map<String, String> connConfig) { |
ObjectMapper mapper = new ObjectMapper(); |
String content; |
try { |
content = mapper.writeValueAsString(connConfig); |
} catch (IOException e) { |
throw new ConnectException("Could not serialize connector configuration and execute PUT request", e); |
} |
Response response = requestPut(url, content); |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return responseToString(response); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not execute PUT request. Error response: " + responseToString(response)); |
} |
|
/** |
* Delete an existing connector. |
* |
* @param connName name of the connector to be deleted |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public void deleteConnector(String connName) { |
String url = endpointForResource(String.format("connectors/%s", connName)); |
Response response = requestDelete(url); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Could not execute DELETE request. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Stop an existing connector. |
* |
* @param connName name of the connector to be stopped |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public void stopConnector(String connName) { |
String url = endpointForResource(String.format("connectors/%s/stop", connName)); |
Response response = requestPut(url, ""); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Could not execute PUT request. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Pause an existing connector. |
* |
* @param connName name of the connector to be paused |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public void pauseConnector(String connName) { |
String url = endpointForResource(String.format("connectors/%s/pause", connName)); |
Response response = requestPut(url, ""); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Could not execute PUT request. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Resume an existing connector. |
* |
* @param connName name of the connector to be resumed |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public void resumeConnector(String connName) { |
String url = endpointForResource(String.format("connectors/%s/resume", connName)); |
Response response = requestPut(url, ""); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Could not execute PUT request. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Restart an existing connector. |
* |
* @param connName name of the connector to be restarted |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public void restartConnector(String connName) { |
String url = endpointForResource(String.format("connectors/%s/restart", connName)); |
Response response = requestPost(url, "", Collections.emptyMap()); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Could not execute POST request. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Restart an existing connector and its tasks. |
* |
* @param connName name of the connector to be restarted |
* @param onlyFailed true if only failed instances should be restarted |
* @param includeTasks true if tasks should be restarted, or false if only the connector should be restarted |
* @param onlyCallOnEmptyWorker true if the REST API call should be made on a worker not running this connector or its tasks |
* @return the connector state info after the restart, or null if the response contained no body |
* @throws ConnectRestException if the REST API returns an error status |
* @throws ConnectException for any other error. |
*/ |
public ConnectorStateInfo restartConnectorAndTasks(String connName, boolean onlyFailed, boolean includeTasks, boolean onlyCallOnEmptyWorker) { |
ObjectMapper mapper = new ObjectMapper(); |
String restartPath = String.format("connectors/%s/restart?onlyFailed=%s&includeTasks=%s", connName, onlyFailed, includeTasks); |
String restartEndpoint; |
if (onlyCallOnEmptyWorker) { |
restartEndpoint = endpointForResourceNotRunningConnector(restartPath, connName); |
} else { |
restartEndpoint = endpointForResource(restartPath); |
} |
Response response = requestPost(restartEndpoint, "", Collections.emptyMap()); |
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
// only a 202 (Accepted) status returns a body |
if (response.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { |
return mapper.readerFor(ConnectorStateInfo.class) |
.readValue(responseToString(response)); |
} |
} |
return null; |
} catch (IOException e) { |
log.error("Could not read connector state from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse connector state", e); |
} |
} |
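|
// Illustrative usage sketch: restart only the failed instances of a connector without |
// bouncing healthy ones, then inspect the reported state (connector name is an example): |
// |
//     ConnectorStateInfo state = connect.restartConnectorAndTasks("my-connector", true, true, false); |
//     if (state != null) { |
//         state.tasks().forEach(task -> log.info("Task {} is {}", task.id(), task.state())); |
//     } |
|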
/** |
* Get the connector names of the connectors currently running on this cluster. |
* |
* @return the list of connector names |
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code. |
* @throws ConnectException for any other error. |
*/ |
public Collection<String> connectors() { |
ObjectMapper mapper = new ObjectMapper(); |
String url = endpointForResource("connectors"); |
Response response = requestGet(url); |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
try { |
return mapper.readerFor(Collection.class).readValue(responseToString(response)); |
} catch (IOException e) { |
log.error("Could not parse connector list from response: {}", |
responseToString(response), e |
); |
throw new ConnectException("Could not parse connector list", e); |
} |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not read connector list. Error response: " + responseToString(response)); |
} |
|
/** |
* Get the status for a connector running in this cluster. |
* |
* @param connectorName name of the connector |
* @return an instance of {@link ConnectorStateInfo} populated with state information of the connector and its tasks. |
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code. |
* @throws ConnectException for any other error. |
*/ |
public ConnectorStateInfo connectorStatus(String connectorName) { |
ObjectMapper mapper = new ObjectMapper(); |
String url = endpointForResource(String.format("connectors/%s/status", connectorName)); |
Response response = requestGet(url); |
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return mapper.readerFor(ConnectorStateInfo.class) |
.readValue(responseToString(response)); |
} |
} catch (IOException e) { |
log.error("Could not read connector state from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse connector state", e); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not read connector state. Error response: " + responseToString(response)); |
} |
|
/** |
* Get the active topics of a connector running in this cluster. |
* |
* @param connectorName name of the connector |
* @return an instance of {@link ActiveTopicsInfo} populated with the active topics of the connector. |
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code. |
* @throws ConnectException for any other error. |
*/ |
public ActiveTopicsInfo connectorTopics(String connectorName) { |
ObjectMapper mapper = new ObjectMapper(); |
String url = endpointForResource(String.format("connectors/%s/topics", connectorName)); |
Response response = requestGet(url); |
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
Map<String, Map<String, List<String>>> activeTopics = mapper |
.readerFor(new TypeReference<Map<String, Map<String, List<String>>>>() { }) |
.readValue(responseToString(response)); |
return new ActiveTopicsInfo(connectorName, |
activeTopics.get(connectorName).getOrDefault("topics", Collections.emptyList())); |
} |
} catch (IOException e) { |
log.error("Could not read connector topics from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse connector topics", e); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not read connector topics. Error response: " + responseToString(response)); |
} |
|
/** |
* Get the info of a connector running in this cluster (retrieved via the <code>GET /connectors/{connector}</code> endpoint). |
* @param connectorName name of the connector |
* @return an instance of {@link ConnectorInfo} populated with the configuration and tasks of the connector. |
*/ |
public ConnectorInfo connectorInfo(String connectorName) { |
ObjectMapper mapper = new ObjectMapper(); |
String url = endpointForResource(String.format("connectors/%s", connectorName)); |
Response response = requestGet(url); |
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return mapper.readValue(responseToString(response), ConnectorInfo.class); |
} |
} catch (IOException e) { |
log.error("Could not read connector info from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse connector info", e); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not read connector info. Error response: " + responseToString(response)); |
} |
|
/** |
* Get the task configs of a connector running in this cluster. |
* |
* @param connectorName name of the connector |
* @return a list of task configurations for the connector |
*/ |
public List<TaskInfo> taskConfigs(String connectorName) { |
ObjectMapper mapper = new ObjectMapper(); |
String url = endpointForResource(String.format("connectors/%s/tasks", connectorName)); |
Response response = requestGet(url); |
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
// We use String instead of ConnectorTaskId as the key here since the latter can't be automatically |
// deserialized by Jackson when used as a JSON object key (i.e., when it's serialized as a JSON string) |
return mapper.readValue(responseToString(response), new TypeReference<List<TaskInfo>>() { }); |
} |
} catch (IOException e) { |
log.error("Could not read task configs from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse task configs", e); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not read task configs. Error response: " + responseToString(response)); |
} |
|
/** |
* Reset the set of active topics of a connector running in this cluster. |
* |
* @param connectorName name of the connector |
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code. |
* @throws ConnectException for any other error. |
*/ |
public void resetConnectorTopics(String connectorName) { |
String url = endpointForResource(String.format("connectors/%s/topics/reset", connectorName)); |
Response response = requestPut(url, null); |
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { |
throw new ConnectRestException(response.getStatus(), |
"Resetting active topics for connector " + connectorName + " failed. " |
+ "Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Get the offsets for a connector via the <strong><em>GET /connectors/{connector}/offsets</em></strong> endpoint |
* |
* @param connectorName name of the connector whose offsets are to be retrieved |
* @return the connector's offsets |
*/ |
public ConnectorOffsets connectorOffsets(String connectorName) { |
String url = endpointForResource(String.format("connectors/%s/offsets", connectorName)); |
Response response = requestGet(url); |
ObjectMapper mapper = new ObjectMapper(); |
|
try { |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return mapper.readerFor(ConnectorOffsets.class).readValue(responseToString(response)); |
} |
} catch (IOException e) { |
throw new ConnectException("Could not parse connector offsets", e); |
} |
throw new ConnectRestException(response.getStatus(), |
"Could not fetch connector offsets. Error response: " + responseToString(response)); |
} |
|
/** |
* Alter the offset for a source connector's partition via the <strong><em>PATCH /connectors/{connector}/offsets</em></strong> |
* endpoint |
* |
* @param connectorName name of the source connector whose offset is to be altered |
* @param partition the source partition for which the offset is to be altered |
* @param offset the source offset to be written |
* |
* @return the API response as a {@link java.lang.String} |
*/ |
public String alterSourceConnectorOffset(String connectorName, Map<String, ?> partition, Map<String, ?> offset) { |
return alterConnectorOffsets( |
connectorName, |
new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset))) |
); |
} |
|
/** |
* Alter the offset for a sink connector's topic partition via the <strong><em>PATCH /connectors/{connector}/offsets</em></strong> |
* endpoint |
* |
* @param connectorName name of the sink connector whose offset is to be altered |
* @param topicPartition the topic partition for which the offset is to be altered |
* @param offset the offset to be written |
* |
* @return the API response as a {@link java.lang.String} |
*/ |
public String alterSinkConnectorOffset(String connectorName, TopicPartition topicPartition, Long offset) { |
return alterConnectorOffsets( |
connectorName, |
SinkUtils.consumerGroupOffsetsToConnectorOffsets(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset))) |
); |
} |
|
/** |
* Alter a connector's offsets via the <strong><em>PATCH /connectors/{connector}/offsets</em></strong> endpoint |
* |
* @param connectorName name of the connector whose offsets are to be altered |
* @param offsets offsets to alter |
* |
* @return the API response as a {@link java.lang.String} |
*/ |
public String alterConnectorOffsets(String connectorName, ConnectorOffsets offsets) { |
String url = endpointForResource(String.format("connectors/%s/offsets", connectorName)); |
ObjectMapper mapper = new ObjectMapper(); |
String content; |
try { |
content = mapper.writeValueAsString(offsets); |
} catch (IOException e) { |
throw new ConnectException("Could not serialize connector offsets and execute PATCH request", e); |
} |
|
Response response = requestPatch(url, content); |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return responseToString(response); |
} else { |
throw new ConnectRestException(response.getStatus(), |
"Could not alter connector offsets. Error response: " + responseToString(response)); |
} |
} |
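|
// Illustrative usage sketch: rewind a source connector to an earlier position and read |
// the result back. The partition/offset keys are connector-specific and shown here only |
// as examples; note that a connector must be in the STOPPED state before its offsets |
// can be modified: |
// |
//     connect.stopConnector("my-source"); |
//     connect.alterSourceConnectorOffset( |
//         "my-source", |
//         Collections.singletonMap("filename", "/tmp/input.txt"), |
//         Collections.singletonMap("position", 0L) |
//     ); |
//     ConnectorOffsets offsets = connect.connectorOffsets("my-source"); |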
|
/** |
* Reset a connector's offsets via the <strong><em>DELETE /connectors/{connector}/offsets</em></strong> endpoint |
* |
* @param connectorName name of the connector whose offsets are to be reset |
* @return the API response as a {@link java.lang.String} |
*/ |
public String resetConnectorOffsets(String connectorName) { |
String url = endpointForResource(String.format("connectors/%s/offsets", connectorName)); |
Response response = requestDelete(url); |
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
return responseToString(response); |
} else { |
throw new ConnectRestException(response.getStatus(), |
"Could not reset connector offsets. Error response: " + responseToString(response)); |
} |
} |
|
/** |
* Get the {@link LoggerLevel level} for a specific logger |
* @param logger the name of the logger |
* @return the level for the logger, as reported by the Connect REST API |
*/ |
public LoggerLevel getLogLevel(String logger) { |
String resource = "admin/loggers/" + logger; |
String url = adminEndpoint(resource); |
Response response = requestGet(url); |
|
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
ObjectMapper mapper = new ObjectMapper(); |
try { |
return mapper.readerFor(LoggerLevel.class).readValue(responseToString(response)); |
} catch (IOException e) { |
log.error("Could not read logger level from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse logger level", e); |
} |
} else { |
throw new ConnectRestException( |
response.getStatus(), |
"Could not read log level. Error response: " + responseToString(response) |
); |
} |
} |
|
/** |
* Get the {@link LoggerLevel levels} for all known loggers |
* @return the levels of all known loggers, as reported by the Connect REST API |
*/ |
public Map<String, LoggerLevel> allLogLevels() { |
String resource = "admin/loggers"; |
String url = adminEndpoint(resource); |
Response response = requestGet(url); |
|
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
ObjectMapper mapper = new ObjectMapper(); |
try { |
return mapper |
.readerFor(new TypeReference<Map<String, LoggerLevel>>() { }) |
.readValue(responseToString(response)); |
} catch (IOException e) { |
log.error("Could not read logger levels from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse logger levels", e); |
} |
} else { |
throw new ConnectRestException( |
response.getStatus(), |
"Could not read log levels. Error response: " + responseToString(response) |
); |
} |
} |
|
/** |
* Adjust the level of a logging namespace. |
* @param namespace the namespace to adjust; may not be null |
* @param level the level to set the namespace to; may not be null |
* @param scope the scope of the operation; may be null |
* @return the list of affected loggers, as reported by the Connect REST API; |
* may be null if no body was included in the response |
*/ |
public List<String> setLogLevel(String namespace, String level, String scope) { |
String resource = "admin/loggers/" + namespace; |
if (scope != null) |
resource += "?scope=" + scope; |
String url = adminEndpoint(resource); |
String body = "{\"level\": \"" + level + "\"}"; |
Response response = requestPut(url, body); |
|
if (response.getStatus() == Response.Status.NO_CONTENT.getStatusCode()) { |
if (response.getEntity() != null && !response.getEntity().equals("")) { |
// Don't use JUnit assertNull here because this library is used by both |
// Connect runtime tests and MirrorMaker 2 tests, which use different |
// versions of JUnit |
throw new AssertionError( |
"Response with 204 status contained non-null entity: '" |
+ response.getEntity() + "'" |
); |
} |
return null; |
} else if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { |
ObjectMapper mapper = new ObjectMapper(); |
try { |
return mapper |
.readerFor(new TypeReference<List<String>>() { }) |
.readValue(responseToString(response)); |
} catch (IOException e) { |
log.error("Could not read loggers from response: {}", |
responseToString(response), e); |
throw new ConnectException("Could not parse loggers", e); |
} |
} else { |
throw new ConnectRestException( |
response.getStatus(), |
"Could not set log level. Error response: " + responseToString(response) |
); |
} |
} |
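|
// Illustrative usage sketch: raise a logging namespace to DEBUG on every worker (the |
// cluster-wide scope introduced by KIP-976) and read the level back; the namespace is |
// an example only. With scope=cluster the REST API responds with 204 and no body, so |
// setLogLevel returns null here: |
// |
//     connect.setLogLevel("org.apache.kafka.connect.runtime", "DEBUG", "cluster"); |
//     LoggerLevel level = connect.getLogLevel("org.apache.kafka.connect.runtime"); |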
|
/** |
* Get the full URL of the admin endpoint that corresponds to the given REST resource |
* |
* @param resource the resource under the worker's admin endpoint |
* @return the admin endpoint URL |
* @throws ConnectException if no admin REST endpoint is available |
*/ |
public String adminEndpoint(String resource) { |
String url = workers().stream() |
.map(WorkerHandle::adminUrl) |
.filter(Objects::nonNull) |
.findFirst() |
.orElseThrow(() -> new ConnectException("Admin endpoint is disabled.")) |
.toString(); |
return url + resource; |
} |
|
/** |
* Get the full URL of the endpoint that corresponds to the given REST resource |
* |
* @param resource the resource under the worker's REST endpoint |
* @return the endpoint URL |
* @throws ConnectException if no REST endpoint is available |
*/ |
public String endpointForResource(String resource) { |
String url = workers().stream() |
.map(WorkerHandle::url) |
.filter(Objects::nonNull) |
.findFirst() |
.orElseThrow(() -> new ConnectException("Connect workers have not been provisioned")) |
.toString(); |
return url + resource; |
} |
|
/** |
* Get the full URL of the endpoint that corresponds to the given REST resource, using a worker |
* that is not running the given connector or any of its tasks |
* |
* @param resource the resource under the worker's REST endpoint |
* @param connectorName the name of the connector |
* @return the endpoint URL |
* @throws ConnectException if no REST endpoint is available |
*/ |
public String endpointForResourceNotRunningConnector(String resource, String connectorName) { |
ConnectorStateInfo info = connectorStatus(connectorName); |
Set<String> activeWorkerUrls = new HashSet<>(); |
activeWorkerUrls.add(String.format("http://%s/", info.connector().workerId())); |
info.tasks().forEach(t -> activeWorkerUrls.add(String.format("http://%s/", t.workerId()))); |
String url = workers().stream() |
.map(WorkerHandle::url) |
.filter(Objects::nonNull) |
.filter(workerUrl -> !activeWorkerUrls.contains(workerUrl.toString())) |
.findFirst() |
.orElseThrow(() -> new ConnectException( |
String.format("Connect workers have not been provisioned, or no free worker was found that is not running connector %s or its tasks", connectorName))) |
.toString(); |
return url + resource; |
} |
|
/** |
* Return the handle to the Kafka cluster this Connect cluster connects to. |
* |
* @return the Kafka cluster handle |
*/ |
public EmbeddedKafkaCluster kafka() { |
return kafkaCluster; |
} |
|
/** |
* Execute a GET request on the given URL. |
* |
* @param url the HTTP endpoint |
* @return the response to the GET request |
* @throws ConnectException if execution of the GET request fails |
* @deprecated Use {@link #requestGet(String)} instead. |
*/ |
@Deprecated |
public String executeGet(String url) { |
return responseToString(requestGet(url)); |
} |
|
/** |
* Execute a GET request on the given URL. |
* |
* @param url the HTTP endpoint |
* @return the response to the GET request |
* @throws ConnectException if execution of the GET request fails |
*/ |
public Response requestGet(String url) { |
return requestHttpMethod(url, null, Collections.emptyMap(), "GET"); |
} |
|
/** |
* Execute a PUT request on the given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the PUT request |
* @return the response to the PUT request |
* @throws ConnectException if execution of the PUT request fails |
* @deprecated Use {@link #requestPut(String, String)} instead. |
*/ |
@Deprecated |
public int executePut(String url, String body) { |
return requestPut(url, body).getStatus(); |
} |
|
/** |
* Execute a PUT request on the given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the PUT request |
* @return the response to the PUT request |
* @throws ConnectException if execution of the PUT request fails |
*/ |
public Response requestPut(String url, String body) { |
return requestHttpMethod(url, body, Collections.emptyMap(), "PUT"); |
} |
|
/** |
* Execute a POST request on the given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the POST request |
* @param headers a map that stores the POST request headers |
* @return the response to the POST request |
* @throws ConnectException if execution of the POST request fails |
* @deprecated Use {@link #requestPost(String, String, java.util.Map)} instead. |
*/ |
@Deprecated |
public int executePost(String url, String body, Map<String, String> headers) { |
return requestPost(url, body, headers).getStatus(); |
} |
|
/** |
* Execute a POST request on the given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the POST request |
* @param headers a map that stores the POST request headers |
* @return the response to the POST request |
* @throws ConnectException if execution of the POST request fails |
*/ |
public Response requestPost(String url, String body, Map<String, String> headers) { |
return requestHttpMethod(url, body, headers, "POST"); |
} |
|
/** |
* Execute a PATCH request on the given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the PATCH request |
* @return the response to the PATCH request |
* @throws ConnectException if execution of the PATCH request fails |
*/ |
public Response requestPatch(String url, String body) { |
return requestHttpMethod(url, body, Collections.emptyMap(), "PATCH"); |
} |
|
/** |
* Execute a DELETE request on the given URL. |
* |
* @param url the HTTP endpoint |
* @return the response to the DELETE request |
* @throws ConnectException if execution of the DELETE request fails |
* @deprecated Use {@link #requestDelete(String)} instead. |
*/ |
@Deprecated |
public int executeDelete(String url) { |
return requestDelete(url).getStatus(); |
} |
|
/** |
* Execute a DELETE request on the given URL. |
* |
* @param url the HTTP endpoint |
* @return the response to the DELETE request |
* @throws ConnectException if execution of the DELETE request fails |
*/ |
public Response requestDelete(String url) { |
return requestHttpMethod(url, null, Collections.emptyMap(), "DELETE"); |
} |
|
/** |
* A general method that executes an HTTP request on a given URL. |
* |
* @param url the HTTP endpoint |
* @param body the payload of the request; null if there isn't one |
* @param headers a map that stores the request headers; empty if there are no headers |
* @param httpMethod the name of the HTTP method to execute |
* @return the response to the HTTP request |
* @throws ConnectException if execution of the HTTP method fails |
*/ |
protected Response requestHttpMethod(String url, String body, Map<String, String> headers, |
String httpMethod) { |
log.debug("Executing {} request to URL={}." + (body != null ? " Payload={}" : ""), |
httpMethod, url, body); |
|
try { |
Request req = httpClient.newRequest(url); |
req.method(httpMethod); |
// apply any request headers regardless of whether a body is present |
headers.forEach(req::header); |
if (body != null) { |
req.content(new StringContentProvider(body), "application/json"); |
} |
|
ContentResponse res = req.send(); |
log.info("{} response for URL={} is {}", |
httpMethod, url, res.getContentAsString().isEmpty() ? "empty" : res.getContentAsString()); |
return Response.status(Response.Status.fromStatusCode(res.getStatus())) |
.entity(res.getContentAsString()) |
.build(); |
} catch (Exception e) { |
log.error("Could not execute " + httpMethod + " request to " + url, e); |
throw new ConnectException(e); |
} |
} |
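|
// Illustrative usage sketch: the raw helpers can target any REST resource directly, |
// e.g. fetching the root endpoint of a worker, which serves a ServerInfo JSON body |
// when the worker is up: |
// |
//     Response root = connect.requestGet(connect.endpointForResource("")); |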
|
private String responseToString(Response response) { |
return response == null ? "empty" : (String) response.getEntity(); |
} |
|
/** |
* Get the workers that are up and running. |
* |
* @return the set of handles of the online workers |
*/ |
public Set<WorkerHandle> activeWorkers() { |
ObjectMapper mapper = new ObjectMapper(); |
return workers().stream() |
.filter(w -> { |
try { |
mapper.readerFor(ServerInfo.class) |
.readValue(responseToString(requestGet(w.url().toString()))); |
return true; |
} catch (ConnectException | IOException e) { |
// worker failed to respond; consider it offline |
return false; |
} |
}) |
.collect(Collectors.toSet()); |
} |
|
/** |
* Return the available assertions for this Connect cluster |
* |
* @return the assertions object |
*/ |
public ConnectAssertions assertions() { |
return assertions; |
} |
|
} |
@@ -0,0 +1,84 @@
|
/* |
* Licensed to the Apache Software Foundation (ASF) under one or more |
* contributor license agreements. See the NOTICE file distributed with |
* this work for additional information regarding copyright ownership. |
* The ASF licenses this file to You under the Apache License, Version 2.0 |
* (the "License"); you may not use this file except in compliance with |
* the License. You may obtain a copy of the License at |
* |
* http://www.apache.org/licenses/LICENSE-2.0 |
* |
* Unless required by applicable law or agreed to in writing, software |
* distributed under the License is distributed on an "AS IS" BASIS, |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
* See the License for the specific language governing permissions and |
* limitations under the License. |
*/ |
package org.apache.kafka.connect.util.clusters; |
|
import java.util.HashMap; |
import java.util.Map; |
import java.util.Properties; |
|
abstract class EmbeddedConnectBuilder<C extends EmbeddedConnect, B extends EmbeddedConnectBuilder<C, B>> { |
private Map<String, String> workerProps = new HashMap<>(); |
private int numBrokers = EmbeddedConnect.DEFAULT_NUM_BROKERS; |
private Properties brokerProps = new Properties(); |
private boolean maskExitProcedures = true; |
private final Map<String, String> clientProps = new HashMap<>(); |
|
protected abstract C build( |
int numBrokers, |
Properties brokerProps, |
boolean maskExitProcedures, |
Map<String, String> clientProps, |
Map<String, String> workerProps |
); |
|
public B workerProps(Map<String, String> workerProps) { |
this.workerProps = workerProps; |
return self(); |
} |
|
public B numBrokers(int numBrokers) { |
this.numBrokers = numBrokers; |
return self(); |
} |
|
public B brokerProps(Properties brokerProps) { |
this.brokerProps = brokerProps; |
return self(); |
} |
|
public B clientProps(Map<String, String> clientProps) { |
this.clientProps.putAll(clientProps); |
return self(); |
} |
|
/** |
* In the event of ungraceful shutdown, embedded clusters call exit or halt with non-zero |
* exit statuses. Exiting with a non-zero status forces a test to fail and is hard to |
* handle. Because graceful exit is usually not required during a test and because |
* depending on such an exit increases flakiness, this setting allows masking |
* exit and halt procedures by using a runtime exception instead. Customization of the |
* exit and halt procedures is possible through {@code exitProcedure} and {@code |
* haltProcedure} respectively. |
* |
* @param mask if false, exit and halt procedures remain unchanged; true is the default. |
* @return the builder for this cluster |
*/ |
public B maskExitProcedures(boolean mask) { |
this.maskExitProcedures = mask; |
return self(); |
} |
|
public C build() { |
return build(numBrokers, brokerProps, maskExitProcedures, clientProps, workerProps); |
} |
|
@SuppressWarnings("unchecked") |
protected B self() { |
return (B) this; |
} |
|
} |
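|
// The recursive type parameter B (the self-type idiom) lets the shared setters above |
// return the concrete builder subtype, so subclass-specific setters chain freely with |
// the shared ones. A hypothetical subclass for exposition: |
// |
//     public static class Builder extends EmbeddedConnectBuilder<MyEmbeddedConnect, Builder> { |
//         // subclass-specific setters can return `this` and still mix with shared ones |
//     } |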
File diff suppressed because it is too large
@@ -0,0 +1,140 @@
|
/* |
* Licensed to the Apache Software Foundation (ASF) under one or more |
* contributor license agreements. See the NOTICE file distributed with |
* this work for additional information regarding copyright ownership. |
* The ASF licenses this file to You under the Apache License, Version 2.0 |
* (the "License"); you may not use this file except in compliance with |
* the License. You may obtain a copy of the License at |
* |
* http://www.apache.org/licenses/LICENSE-2.0 |
* |
* Unless required by applicable law or agreed to in writing, software |
* distributed under the License is distributed on an "AS IS" BASIS, |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
* See the License for the specific language governing permissions and |
* limitations under the License. |
*/ |
package org.apache.kafka.connect.util.clusters; |
|
import org.apache.kafka.connect.cli.ConnectStandalone; |
import org.apache.kafka.connect.runtime.Connect; |
import org.apache.kafka.test.TestUtils; |
import org.slf4j.Logger; |
import org.slf4j.LoggerFactory; |
|
import java.io.IOException; |
import java.io.UncheckedIOException; |
import java.util.Collections; |
import java.util.Map; |
import java.util.Properties; |
import java.util.Set; |
|
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; |
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; |
import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; |
import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; |
import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; |
import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG; |
|
/** |
* Start a standalone embedded Connect worker. Internally, this class will spin up a Kafka and ZooKeeper cluster, |
* set up any tmp directories, and clean them up on exit. Methods on the same |
* {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe. |
*/ |
public class EmbeddedConnectStandalone extends EmbeddedConnect { |
|
private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectStandalone.class); |
|
private static final String REST_HOST_NAME = "localhost"; |
|
private final Map<String, String> workerProps; |
private final String offsetsFile; |
|
private WorkerHandle connectWorker; |
|
private EmbeddedConnectStandalone( |
int numBrokers, |
Properties brokerProps, |
boolean maskExitProcedures, |
Map<String, String> clientProps, |
Map<String, String> workerProps, |
String offsetsFile |
) { |
super(numBrokers, brokerProps, maskExitProcedures, clientProps); |
this.workerProps = workerProps; |
this.offsetsFile = offsetsFile; |
} |
|
@Override |
public void startConnect() { |
log.info("Starting standalone Connect worker"); |
|
workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers()); |
// use a random available port |
workerProps.put(LISTENERS_CONFIG, "HTTP://" + REST_HOST_NAME + ":0"); |
|
workerProps.putIfAbsent(OFFSET_STORAGE_FILE_FILENAME_CONFIG, offsetsFile); |
workerProps.putIfAbsent(KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); |
workerProps.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); |
workerProps.putIfAbsent(PLUGIN_DISCOVERY_CONFIG, "hybrid_fail"); |
|
Connect connect = new ConnectStandalone().startConnect(workerProps); |
connectWorker = new WorkerHandle("standalone", connect); |
} |
|
@Override |
public String toString() { |
return String.format("EmbeddedConnectStandalone(numBrokers=%d, workerProps=%s)", |
numBrokers, |
workerProps); |
} |
|
@Override |
protected Set<WorkerHandle> workers() { |
return connectWorker != null |
? Collections.singleton(connectWorker) |
: Collections.emptySet(); |
} |
|
public static class Builder extends EmbeddedConnectBuilder<EmbeddedConnectStandalone, Builder> { |
|
private String offsetsFile = null; |
|
public Builder offsetsFile(String offsetsFile) { |
this.offsetsFile = offsetsFile; |
return this; |
} |
|
@Override |
protected EmbeddedConnectStandalone build( |
int numBrokers, |
Properties brokerProps, |
boolean maskExitProcedures, |
Map<String, String> clientProps, |
Map<String, String> workerProps |
) { |
if (offsetsFile == null) |
offsetsFile = tempOffsetsFile(); |
|
return new EmbeddedConnectStandalone( |
numBrokers, |
brokerProps, |
maskExitProcedures, |
clientProps, |
workerProps, |
offsetsFile |
); |
} |
|
private String tempOffsetsFile() { |
try { |
return TestUtils |
.tempFile("connect-standalone-offsets", null) |
.getAbsolutePath(); |
} catch (IOException e) { |
throw new UncheckedIOException("Failed to create temporary offsets file", e); |
} |
} |
} |
|
} |
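|
// Illustrative end-to-end sketch: driving the standalone worker from a test; the |
// connector name and the exampleConnectorProps() helper are hypothetical: |
// |
//     EmbeddedConnectStandalone connect = new EmbeddedConnectStandalone.Builder() |
//         .numBrokers(1) |
//         .build(); |
//     connect.start(); |
//     try { |
//         connect.configureConnector("example", exampleConnectorProps()); |
//         connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning( |
//             "example", 1, "connector did not start in time"); |
//     } finally { |
//         connect.stop(); |
//     } |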