
KAFKA-15291: Connect plugins should declare a version (#14159)

Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
Reviewers: Andrew Schofield, Greg Harris <greg.harris@aiven.io>
commit db34f8b9a1, authored by aindriu-aiven and committed by GitHub
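The pattern applied across the files below is the same: each bundled Connect plugin (converter, header converter, transformation, predicate, client config override policy) now implements org.apache.kafka.connect.components.Versioned and reports the framework build version via AppInfoParser.getVersion(), so plugin scanning and the REST API no longer fall back to PluginDesc.UNDEFINED_VERSION for them. As a rough sketch only (the class and package here are hypothetical, not part of this commit), a converter adopting the pattern would look like:

package com.example.connect; // hypothetical package, for illustration only

import java.util.Map;

import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleBytesConverter implements Converter, Versioned {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // pass-through: assumes the Connect value is already a byte[] (or null)
        return (byte[]) value;
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }

    @Override
    public String version() {
        // Framework-bundled plugins report the Kafka build version, as in the diffs below.
        return AppInfoParser.getVersion();
    }
}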
55 changed files:
  1. connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java (9)
  2. connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java (9)
  3. connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java (6)
  4. connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java (6)
  5. connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (9)
  6. connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (6)
  7. connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java (9)
  8. connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java (9)
  9. connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java (8)
  10. connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java (6)
  11. connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java (6)
  12. connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java (8)
  13. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java (42)
  14. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java (15)
  15. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java (9)
  16. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java (9)
  17. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java (9)
  18. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java (9)
  19. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java (23)
  20. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java (22)
  21. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (15)
  22. connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java (16)
  23. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java (9)
  24. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java (9)
  25. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java (9)
  26. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java (9)
  27. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java (9)
  28. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java (9)
  29. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java (9)
  30. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java (9)
  31. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java (9)
  32. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java (9)
  33. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java (9)
  34. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java (9)
  35. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java (9)
  36. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java (9)
  37. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java (9)
  38. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java (9)
  39. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java (9)
  40. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java (9)
  41. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java (9)
  42. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java (9)
  43. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java (6)
  44. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java (7)
  45. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java (9)
  46. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java (6)
  47. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java (9)
  48. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java (6)
  49. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java (7)
  50. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java (7)
  51. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java (6)
  52. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java (6)
  53. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java (8)
  54. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java (15)
  55. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java (6)

9
connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -34,13 +36,18 @@ import java.util.NoSuchElementException;
* A {@link HeaderConverter} that serializes header values as strings and that deserializes header values to the most appropriate
* numeric, boolean, array, or map representation. Schemas are not serialized, but are inferred upon deserialization when possible.
*/
public class SimpleHeaderConverter implements HeaderConverter {
public class SimpleHeaderConverter implements HeaderConverter, Versioned {
private static final Logger LOG = LoggerFactory.getLogger(SimpleHeaderConverter.class);
private static final ConfigDef CONFIG_DEF = new ConfigDef();
private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
private static final Charset UTF_8 = StandardCharsets.UTF_8;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

@@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -40,7 +42,7 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header keys.
*/
public class StringConverter implements Converter, HeaderConverter {
public class StringConverter implements Converter, HeaderConverter, Versioned {
private final StringSerializer serializer = new StringSerializer();
private final StringDeserializer deserializer = new StringDeserializer();
@@ -48,6 +50,11 @@ public class StringConverter implements Converter, HeaderConverter {
public StringConverter() {
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return StringConverterConfig.configDef();

6
connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -205,6 +206,11 @@ public class SimpleHeaderConverterTest {
assertEquals(list, result.value());
}
@Test
public void converterShouldReturnAppInfoParserVersion() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}
protected SchemaAndValue roundTrip(Schema schema, Object input) {
byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, input);
return converter.toConnectHeader(TOPIC, HEADER, serialized);

6
connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -99,4 +100,9 @@ public class StringConverterTest {
public void testNullHeaderValueToBytes() {
assertNull(converter.fromConnectHeader(TOPIC, "hdr", Schema.OPTIONAL_STRING_SCHEMA, null));
}
@Test
public void testInheritedVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}
}

9
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java

@@ -26,7 +26,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -62,7 +64,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
* <p>
* This implementation currently does nothing with the topic names or header keys.
*/
public class JsonConverter implements Converter, HeaderConverter {
public class JsonConverter implements Converter, HeaderConverter, Versioned {
private static final Map<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
@@ -258,6 +260,11 @@ public class JsonConverter implements Converter, HeaderConverter {
return toConnectSchemaCache.size();
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return JsonConverterConfig.configDef();

6
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java

@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -969,6 +970,11 @@ public class JsonConverterTest {
assertEquals(new Struct(structSchema), sav.value());
}
@Test
public void testVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}
private JsonNode parse(byte[] json) {
try {
return objectMapper.readTree(json);

9
connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java

@@ -18,13 +18,20 @@
package org.apache.kafka.connect.connector.policy;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy {
public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy, Versioned {
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void close() {

9
connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java

@@ -18,6 +18,8 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -32,10 +34,13 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header keys.
*/
public class ByteArrayConverter implements Converter, HeaderConverter {
public class ByteArrayConverter implements Converter, HeaderConverter, Versioned {
private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

8
connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java

@@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -39,7 +41,7 @@ import java.util.Map;
* <p>
* This implementation currently does nothing with the topic names or header keys.
*/
abstract class NumberConverter<T extends Number> implements Converter, HeaderConverter {
abstract class NumberConverter<T extends Number> implements Converter, HeaderConverter, Versioned {
private final Serializer<T> serializer;
private final Deserializer<T> deserializer;
@@ -65,6 +67,10 @@ abstract class NumberConverter<T extends Number> implements Converter, HeaderCon
assert this.schema != null;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return NumberConverterConfig.configDef();

6
connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -88,4 +89,9 @@ public class ByteArrayConverterTest {
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, data.schema());
assertNull(data.value());
}
@Test
public void testVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}
}

6
connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -104,4 +105,9 @@ public abstract class NumberConverterTest<T extends Number> {
assertEquals(schema(), data.schema());
assertNull(data.value());
}
@Test
public void testInheritedVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), converter.version());
}
}

8
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java

@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -282,7 +283,7 @@ public class ErrorHandlingIntegrationTest {
assertEquals(expected, new String(actual));
}
public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -299,6 +300,11 @@ public class ErrorHandlingIntegrationTest {
private boolean shouldFail = true;
@Override
public String version() {
return "1.0";
}
@Override
public R apply(R record) {
String badValRetriable = "value-" + BAD_RECORD_VAL_RETRIABLE;

42
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java

@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -54,10 +55,15 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
public static abstract class TestConnector extends Connector {
}
public static class SimpleTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
public static class SimpleTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
int magicNumber = 0;
@Override
public String version() {
return "1.0";
}
@Override
public void configure(Map<String, ?> props) {
magicNumber = Integer.parseInt((String) props.get("magic.number"));
@@ -393,22 +399,37 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
}
}
public static abstract class AbstractTestPredicate<R extends ConnectRecord<R>> implements Predicate<R> {
public static abstract class AbstractTestPredicate<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
@Override
public String version() {
return "1.0";
}
public AbstractTestPredicate() { }
}
public static abstract class AbstractTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
public static abstract class AbstractTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
@Override
public String version() {
return "1.0";
}
}
public static abstract class AbstractKeyValueTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
public static abstract class AbstractKeyValueTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
@Override
public R apply(R record) {
return null;
}
@Override
public String version() {
return "1.0";
}
@Override
public ConfigDef config() {
return new ConfigDef();
@@ -425,8 +446,12 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
}
public static class Key<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
public static class Key<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> implements Versioned {
@Override
public String version() {
return "1.0";
}
}
public static class Value<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
@@ -454,7 +479,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
}
public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
public static class HasDuplicateConfigTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final String MUST_EXIST_KEY = "must.exist.key";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
// this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
@@ -469,6 +494,11 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
return record;
}
@Override
public String version() {
return "1.0";
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

15
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java

@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -553,7 +554,7 @@ public class ErrorHandlingTaskTest {
}
// Public to allow plugin discovery to complete without errors
public static class FaultyConverter extends JsonConverter {
public static class FaultyConverter extends JsonConverter implements Versioned {
private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
private int invocations = 0;
@@ -570,10 +571,15 @@ public class ErrorHandlingTaskTest {
throw new RetriableException("Bad invocations " + invocations + " for mod 3");
}
}
@Override
public String version() {
return "1.0";
}
}
// Public to allow plugin discovery to complete without errors
public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
@@ -599,6 +605,11 @@
}
}
@Override
public String version() {
return "1.0";
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java

@@ -20,6 +20,8 @@ import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -28,9 +30,14 @@ import org.apache.kafka.connect.storage.Converter;
/**
* This is a simple Converter implementation that uses "encoding" header to encode/decode strings via provided charset name
*/
public class SampleConverterWithHeaders implements Converter {
public class SampleConverterWithHeaders implements Converter, Versioned {
private static final String HEADER_ENCODING = "encoding";
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {

9
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -24,12 +26,17 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import java.io.IOException;
import java.util.Map;
public class SampleHeaderConverter implements HeaderConverter {
public class SampleHeaderConverter implements HeaderConverter, Versioned {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];

9
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java

@@ -17,12 +17,14 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
public class SamplePredicate implements Predicate<SourceRecord> {
public class SamplePredicate implements Predicate<SourceRecord>, Versioned {
private boolean testResult;
boolean closed = false;
@@ -33,6 +35,11 @@ public class SamplePredicate implements Predicate<SourceRecord> {
this.testResult = testResult;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return new ConfigDef()

9
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java

@@ -17,12 +17,14 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
public class SampleTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
public class SampleTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
boolean closed = false;
private R transformedRecord;
@@ -38,6 +40,11 @@ public class SampleTransformation<R extends ConnectRecord<R>> implements Transfo
return transformedRecord;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return new ConfigDef()

23
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -34,6 +35,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@RunWith(Parameterized.class)
public class PluginScannerTest {
@@ -67,6 +69,12 @@ public class PluginScannerTest {
}
}
@BeforeClass
public static void setUp() {
// Work around a circular-dependency in TestPlugins.
TestPlugins.pluginPath();
}
@Test
public void testScanningEmptyPluginPath() {
PluginScanResult result = scan(
@@ -145,6 +153,21 @@ public class PluginScannerTest {
assertEquals(expectedClasses, classes);
}
@Test
public void testNonVersionedPluginHasUndefinedVersion() {
PluginScanResult unversionedPluginsResult = scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER));
assertFalse(unversionedPluginsResult.isEmpty());
unversionedPluginsResult.forEach(pluginDesc -> assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
}
@Test
public void testVersionedPluginsHasVersion() {
PluginScanResult versionedPluginResult = scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1));
assertFalse(versionedPluginResult.isEmpty());
versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version()));
}
private PluginScanResult scan(Set<Path> pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory);

22
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -609,7 +610,7 @@ public class PluginUtilsTest {
assertEquals(expectedAliases, actualAliases);
}
public static class CollidingConverter implements Converter {
public static class CollidingConverter implements Converter, Versioned {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@@ -623,9 +624,14 @@ public class PluginUtilsTest {
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
@Override
public String version() {
return "1.0";
}
}
public static class CollidingHeaderConverter implements HeaderConverter {
public static class CollidingHeaderConverter implements HeaderConverter, Versioned {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
@@ -649,9 +655,19 @@ public class PluginUtilsTest {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public String version() {
return "1.0";
}
}
public static class Colliding<R extends ConnectRecord<R>> implements Transformation<R> {
public static class Colliding<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
@Override
public String version() {
return "1.0";
}
@Override
public void configure(Map<String, ?> configs) {

15
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java

@@ -32,6 +32,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.converters.ByteArrayConverter;
@@ -688,7 +689,7 @@ public class PluginsTest {
}
}
public static class TestConverter implements Converter, Configurable {
public static class TestConverter implements Converter, Configurable, Versioned {
public Map<String, ?> configs;
public ConfigDef config() {
@@ -715,6 +716,11 @@ public class PluginsTest {
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
@Override
public String version() {
return "test";
}
}
public static class TestHeaderConverter implements HeaderConverter {
@@ -770,9 +776,14 @@ public class PluginsTest {
}
}
public static class TestInternalConverter extends JsonConverter {
public static class TestInternalConverter extends JsonConverter implements Versioned {
public Map<String, ?> configs;
@Override
public String version() {
return "test";
}
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;

16
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java

@@ -129,17 +129,17 @@ public class ConnectorPluginsResourceTest {
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, appVersion, PluginType.TRANSFORMATION, classLoader));
TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, appVersion, PluginType.TRANSFORMATION, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, appVersion, PluginType.PREDICATE, classLoader));
PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, appVersion, PluginType.PREDICATE, classLoader));
} catch (Exception e) {
e.printStackTrace();
fail("Failed setting up plugins");

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java

@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
@@ -52,7 +54,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(Cast.class);
// TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to
@@ -110,6 +112,11 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
private Schema.Type wholeValueCastType;
private Cache<Schema, Schema> schemaUpdateCache;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
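
Every transform and predicate in this commit gains the identical version() override returning AppInfoParser.getVersion(), since these plugins ship with the Kafka distribution itself. For contrast, a hypothetical third-party SMT (not part of this commit; the names and version string below are illustrative only) would implement Versioned the same way but would typically report its own artifact version:

package com.example.connect; // hypothetical package, for illustration only

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class PassthroughTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public R apply(R record) {
        return record; // pass the record through unchanged
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public String version() {
        return "1.0.0"; // hypothetical plugin version, maintained by the plugin itself
    }
}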

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
@@ -30,7 +32,7 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Removes one or more headers from each record.";
@@ -57,6 +59,11 @@ public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -28,7 +30,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. "
@@ -45,6 +47,11 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
private String fieldName;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java

@@ -19,6 +19,8 @@ package org.apache.kafka.connect.transforms;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
@@ -27,7 +29,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
* a particular {@link org.apache.kafka.connect.transforms.predicates.Predicate}.
* @param <R> The type of record.
*/
public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
public class Filter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Drops all records, filtering them from subsequent transformations in the chain. " +
"This is intended to be used conditionally to filter out records matching (or not matching) " +
@@ -39,6 +41,11 @@ public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
return null;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java

@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -37,7 +39,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Flatten a nested data structure, generating names for each field by concatenating the field names at each "
@@ -78,6 +80,11 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
}
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void close() {
}

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java

@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +42,7 @@ import java.util.Map;
import static java.lang.String.format;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String FIELDS_FIELD = "fields";
public static final String HEADERS_FIELD = "headers";
@@ -116,6 +118,11 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
}
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
Headers updatedHeaders = record.headers().duplicate();
Struct value = Requirements.requireStruct(operatingValue, "header " + operation);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java

@@ -20,6 +20,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -29,7 +31,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.HashMap;
import java.util.Map;
public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
@@ -75,6 +77,11 @@ public abstract class HoistField<R extends ConnectRecord<R>> implements Transfor
}
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void close() {
schemaUpdateCache = null;

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java

@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -38,7 +40,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Insert field(s) using attributes from the record metadata or a configured static value."
@@ -104,6 +106,11 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
private Cache<Schema, Schema> schemaUpdateCache;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -27,7 +29,7 @@ import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Add a header to each record.";
@@ -57,6 +59,11 @@ public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java

@@ -18,6 +18,8 @@ package org.apache.kafka.connect.transforms;
import java.util.ArrayList;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +42,7 @@ import java.util.function.Function;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)."
@@ -90,6 +92,11 @@ public abstract class MaskField<R extends ConnectRecord<R>> implements Transform
private Set<String> maskedFields;
private String replacement;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -27,7 +29,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {
public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(RegexRouter.class);
@@ -49,6 +51,11 @@ public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R
private Pattern regex;
private String replacement;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java

@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -41,7 +43,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Filter or rename fields."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
@@ -89,6 +91,11 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
private Cache<Schema, Schema> schemaUpdateCache;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(configs, new String[][]{

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java

@@ -18,6 +18,8 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -31,7 +33,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);
public static final String OVERVIEW_DOC =
@@ -50,6 +52,11 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
private String schemaName;
private Integer schemaVersion;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java

@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -47,7 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types."
@@ -121,6 +123,11 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
Object toType(Config config, Date orig);
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
private static final Map<String, TimestampTranslator> TRANSLATORS = new HashMap<>();
static {
TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() {

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java

@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -28,7 +30,7 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {
public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable, Versioned {
private static final Pattern TOPIC = Pattern.compile("${topic}", Pattern.LITERAL);
@@ -54,6 +56,11 @@ public class TimestampRouter<R extends ConnectRecord<R>> implements Transformati
private String topicFormat;
private ThreadLocal<SimpleDateFormat> timestampFormat;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java

@@ -20,6 +20,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -36,7 +38,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
@@ -52,6 +54,11 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
private Cache<Schema, Schema> valueToKeySchemaCache;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
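
One caveat worth keeping in mind: AppInfoParser.getVersion() reports the Kafka version found on the classpath, which is exactly right for the transforms bundled here but would not describe a separately released plugin. A rough sketch, assuming such a plugin wanted to report its own jar's Implementation-Version instead (ExternalVersioned is hypothetical, not part of this change):

// Sketch only: "ExternalVersioned" is a hypothetical base class a separately
// shipped plugin could extend to report its own artifact version.
package com.example.transforms;

import org.apache.kafka.connect.components.Versioned;

public abstract class ExternalVersioned implements Versioned {

    @Override
    public String version() {
        // Implementation-Version is read from META-INF/MANIFEST.MF of the plugin jar;
        // it is null when the class is not loaded from a packaged jar (e.g. in an IDE).
        Package pkg = getClass().getPackage();
        String version = pkg == null ? null : pkg.getImplementationVersion();
        return version != null ? version : "unknown";
    }
}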

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java

@@ -20,6 +20,8 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -28,7 +30,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
* A predicate which is true for records with at least one header with the configured name.
* @param <R> The type of connect record.
*/
public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
private static final String NAME_CONFIG = "name";
public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name.";
@@ -38,6 +40,11 @@ public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
"The header name.");
private String name;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java

@@ -19,17 +19,24 @@ package org.apache.kafka.connect.transforms.predicates;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
* A predicate which is true for records which are tombstones (i.e. have null value).
* @param <R> The type of connect record.
*/
public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
public static final String OVERVIEW_DOC = "A predicate which is true for records which are tombstones (i.e. have null value).";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;

9
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java

@@ -22,6 +22,8 @@ import java.util.regex.PatternSyntaxException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +32,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
* A predicate which is true for records with a topic name that matches the configured regular expression.
* @param <R> The type of connect record.
*/
public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
private static final String PATTERN_CONFIG = "pattern";
@@ -43,6 +45,11 @@ public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R
"A Java regular expression for matching against the name of a record's topic.");
private Pattern pattern;
@Override
public String version() {
return AppInfoParser.getVersion();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
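
The bundled predicates get the same treatment as the transforms. For comparison, a minimal sketch of a hypothetical user-defined predicate declaring its version the same way (RecordHasKey is illustrative, not part of this change):

// Sketch only: "RecordHasKey" is a hypothetical custom predicate, not part of this PR.
package com.example.predicates;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;

public class RecordHasKey<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public boolean test(R record) {
        // True for records that carry a non-null key.
        return record.key() != null;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public void close() {
    }
}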

9
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java

@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
@@ -573,4 +574,12 @@ public class CastTest {
assertEquals(42, ((Map<String, Object>) transformed.value()).get("string"));
}
@Test
public void testCastVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xformKey.version());
assertEquals(AppInfoParser.getVersion(), xformValue.version());
assertEquals(xformKey.version(), xformValue.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
@@ -86,6 +87,11 @@ public class DropHeadersTest {
assertThrows(ConfigException.class, () -> xform.configure(config()));
}
@Test
public void testDropHeadersVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
private void assertNonHeaders(SourceRecord original, SourceRecord xformed) {
assertEquals(original.sourcePartition(), xformed.sourcePartition());
assertEquals(original.sourceOffset(), xformed.sourceOffset());

7
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -113,4 +114,10 @@ public class ExtractFieldTest {
assertEquals("Unknown field: nonexistent", iae.getMessage());
}
}
@Test
public void testExtractFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

9
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -388,4 +389,12 @@ public class FlattenTest {
assertEquals(value, transformedRecord.value());
}
@Test
public void testFlattenVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xformKey.version());
assertEquals(AppInfoParser.getVersion(), xformValue.version());
assertEquals(xformKey.version(), xformValue.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -77,4 +78,9 @@ public class HoistFieldTest {
assertEquals(expectedKey, actualKey);
}
@Test
public void testHoistFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

9
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -200,4 +201,12 @@ public class InsertFieldTest {
assertSame(record, transformedRecord);
}
@Test
public void testInsertFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xformKey.version());
assertEquals(AppInfoParser.getVersion(), xformValue.version());
assertEquals(xformKey.version(), xformValue.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
@@ -116,5 +117,10 @@ public class InsertHeaderTest {
return new SourceRecord(sourcePartition, sourceOffset, topic, partition,
keySchema, key, valueSchema, value, timestamp, headers);
}
@Test
public void testInsertHeaderVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

7
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java

@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -265,4 +266,10 @@ public class MaskFieldTest {
actualMap.put("k", "v");
assertEquals(Collections.singletonMap("k", "v"), actualMap);
}
@Test
public void testMaskFieldReturnsVersionFromAppInfoParser() {
final MaskField<SinkRecord> xform = new MaskField.Value<>();
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

7
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
@@ -67,4 +68,10 @@ public class RegexRouterTest {
assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117"));
}
@Test
public void testRegexRouterRetrievesVersionFromAppInfoParser() {
final RegexRouter<SinkRecord> router = new RegexRouter<>();
assertEquals(AppInfoParser.getVersion(), router.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -169,4 +170,9 @@ public class ReplaceFieldTest {
assertEquals(true, updatedValue.get("bar"));
assertEquals("etc", updatedValue.get("etc"));
}
@Test
public void testReplaceFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -157,6 +158,11 @@ public class SetSchemaMetadataTest {
assertNull(updatedValue);
}
@Test
public void testSchemaMetadataVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
protected void assertMatchingSchema(Struct value, Schema schema) {
assertSame(schema, value.schema());
assertEquals(schema.name(), value.schema().name());

8
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java

@@ -18,6 +18,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -681,6 +682,13 @@ public class TimestampConverterTest {
assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
}
@Test
public void testTimestampConverterVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xformKey.version());
assertEquals(AppInfoParser.getVersion(), xformValue.version());
assertEquals(xformKey.version(), xformValue.version());
}
private SourceRecord createRecordWithSchema(Schema schema, Object value) {
return new SourceRecord(null, null, "topic", 0, schema, value);
}

15
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java

@@ -16,8 +16,10 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -25,7 +27,12 @@ import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TimestampRouterTest {
private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
private TimestampRouter<SourceRecord> xform;
@BeforeEach
public void setup() {
xform = new TimestampRouter<>();
xform.configure(Collections.emptyMap()); // defaults
}
@AfterEach
public void teardown() {
@@ -34,7 +41,6 @@ public class TimestampRouterTest {
@Test
public void defaultConfiguration() {
xform.configure(Collections.emptyMap()); // defaults
final SourceRecord record = new SourceRecord(
null, null,
"test", 0,
@@ -45,4 +51,9 @@ public class TimestampRouterTest {
assertEquals("test-20170103", xform.apply(record).topic());
}
@Test
public void testTimestampRouterVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}

6
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -106,4 +107,9 @@ public class ValueToKeyTest {
DataException actual = assertThrows(DataException.class, () -> xform.apply(record));
assertEquals("Field does not exist: not_exist", actual.getMessage());
}
@Test
public void testValueToKeyVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
}
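
Once a worker picks up these changes, the declared versions become visible through the Connect REST API. A small sketch for eyeballing them, assuming a worker listening on localhost:8083 and a Connect release recent enough to support the connectorsOnly query parameter on GET /connector-plugins:

// Sketch only: prints the plugins a local worker reports, including the version
// each one now declares. The worker URL is an assumption; adjust as needed.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListPluginVersions {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connector-plugins?connectorsOnly=false"))
                .GET()
                .build();
        // The response body is a JSON array of {"class": ..., "type": ..., "version": ...} entries.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}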
