diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
index 9aaed75f2bf..685905e8ead 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.storage;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -34,13 +36,18 @@ import java.util.NoSuchElementException;
* A {@link HeaderConverter} that serializes header values as strings and that deserializes header values to the most appropriate
* numeric, boolean, array, or map representation. Schemas are not serialized, but are inferred upon deserialization when possible.
*/
-public class SimpleHeaderConverter implements HeaderConverter {
+public class SimpleHeaderConverter implements HeaderConverter, Versioned {
private static final Logger LOG = LoggerFactory.getLogger(SimpleHeaderConverter.class);
private static final ConfigDef CONFIG_DEF = new ConfigDef();
private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
private static final Charset UTF_8 = StandardCharsets.UTF_8;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 59391c11acc..35322669e40 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -40,7 +42,7 @@ import java.util.Map;
*
* This implementation currently does nothing with the topic names or header keys.
*/
-public class StringConverter implements Converter, HeaderConverter {
+public class StringConverter implements Converter, HeaderConverter, Versioned {
private final StringSerializer serializer = new StringSerializer();
private final StringDeserializer deserializer = new StringDeserializer();
@@ -48,6 +50,11 @@ public class StringConverter implements Converter, HeaderConverter {
public StringConverter() {
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return StringConverterConfig.configDef();
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
index 8c9306c113a..d13b53e74d0 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -205,6 +206,11 @@ public class SimpleHeaderConverterTest {
assertEquals(list, result.value());
}
+ @Test
+ public void converterShouldReturnAppInfoParserVersion() {
+ assertEquals(AppInfoParser.getVersion(), converter.version());
+ }
+
protected SchemaAndValue roundTrip(Schema schema, Object input) {
byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, input);
return converter.toConnectHeader(TOPIC, HEADER, serialized);
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
index 4c57d584a84..f6e9bdbfa16 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -99,4 +100,9 @@ public class StringConverterTest {
public void testNullHeaderValueToBytes() {
assertNull(converter.fromConnectHeader(TOPIC, "hdr", Schema.OPTIONAL_STRING_SCHEMA, null));
}
+
+ @Test
+ public void testInheritedVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), converter.version());
+ }
}
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index ae5a6989b2a..105a805b49c 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -26,7 +26,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@@ -62,7 +64,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
*
* This implementation currently does nothing with the topic names or header keys.
*/
-public class JsonConverter implements Converter, HeaderConverter {
+public class JsonConverter implements Converter, HeaderConverter, Versioned {
private static final Map TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
@@ -258,6 +260,11 @@ public class JsonConverter implements Converter, HeaderConverter {
return toConnectSchemaCache.size();
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return JsonConverterConfig.configDef();
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index f9bc2c67766..f6e9341e231 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
@@ -969,6 +970,11 @@ public class JsonConverterTest {
assertEquals(new Struct(structSchema), sav.value());
}
+ @Test
+ public void testVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), converter.version());
+ }
+
private JsonNode parse(byte[] json) {
try {
return objectMapper.readTree(json);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
index 48d2208b53e..3c419663a42 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
@@ -18,13 +18,20 @@
package org.apache.kafka.connect.connector.policy;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy {
+public abstract class AbstractConnectorClientConfigOverridePolicy implements ConnectorClientConfigOverridePolicy, Versioned {
+
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
@Override
public void close() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
index 65009b5cb5f..6d17873d072 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -18,6 +18,8 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -32,10 +34,13 @@ import java.util.Map;
*
* This implementation currently does nothing with the topic names or header keys.
*/
-public class ByteArrayConverter implements Converter, HeaderConverter {
+public class ByteArrayConverter implements Converter, HeaderConverter, Versioned {
private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
-
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
index bf0ed1a7834..9ab569bc4fe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java
@@ -20,7 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -39,7 +41,7 @@ import java.util.Map;
*
* This implementation currently does nothing with the topic names or header keys.
*/
-abstract class NumberConverter implements Converter, HeaderConverter {
+abstract class NumberConverter implements Converter, HeaderConverter, Versioned {
private final Serializer serializer;
private final Deserializer deserializer;
@@ -65,6 +67,10 @@ abstract class NumberConverter implements Converter, HeaderCon
assert this.schema != null;
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
@Override
public ConfigDef config() {
return NumberConverterConfig.configDef();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
index f68716e4edc..4c1439cf358 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/ByteArrayConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.converters;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -88,4 +89,9 @@ public class ByteArrayConverterTest {
assertEquals(Schema.OPTIONAL_BYTES_SCHEMA, data.schema());
assertNull(data.value());
}
+
+ @Test
+ public void testVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), converter.version());
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java
index d377c69a84a..f57fd73d458 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/converters/NumberConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.converters;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -104,4 +105,9 @@ public abstract class NumberConverterTest {
assertEquals(schema(), data.schema());
assertNull(data.value());
}
+
+ @Test
+ public void testInheritedVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), converter.version());
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 0dab3714f6d..7d3c1d6924b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -282,7 +283,7 @@ public class ErrorHandlingIntegrationTest {
assertEquals(expected, new String(actual));
}
- public static class FaultyPassthrough> implements Transformation {
+ public static class FaultyPassthrough> implements Transformation, Versioned {
static final ConfigDef CONFIG_DEF = new ConfigDef();
@@ -299,6 +300,11 @@ public class ErrorHandlingIntegrationTest {
private boolean shouldFail = true;
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
@Override
public R apply(R record) {
String badValRetriable = "value-" + BAD_RECORD_VAL_RETRIABLE;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index d8c071e6c2f..c113039912e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@@ -54,10 +55,15 @@ public class ConnectorConfigTest> {
public static abstract class TestConnector extends Connector {
}
- public static class SimpleTransformation> implements Transformation {
+ public static class SimpleTransformation> implements Transformation, Versioned {
int magicNumber = 0;
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
@Override
public void configure(Map props) {
magicNumber = Integer.parseInt((String) props.get("magic.number"));
@@ -393,22 +399,37 @@ public class ConnectorConfigTest> {
}
}
- public static abstract class AbstractTestPredicate> implements Predicate {
+ public static abstract class AbstractTestPredicate> implements Predicate, Versioned {
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
public AbstractTestPredicate() { }
}
- public static abstract class AbstractTransformation> implements Transformation {
+ public static abstract class AbstractTransformation> implements Transformation, Versioned {
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
}
- public static abstract class AbstractKeyValueTransformation> implements Transformation {
+ public static abstract class AbstractKeyValueTransformation> implements Transformation, Versioned {
@Override
public R apply(R record) {
return null;
}
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
@Override
public ConfigDef config() {
return new ConfigDef();
@@ -425,8 +446,12 @@ public class ConnectorConfigTest> {
}
- public static class Key> extends AbstractKeyValueTransformation {
+ public static class Key> extends AbstractKeyValueTransformation implements Versioned {
+ @Override
+ public String version() {
+ return "1.0";
+ }
}
public static class Value> extends AbstractKeyValueTransformation {
@@ -454,7 +479,7 @@ public class ConnectorConfigTest> {
assertEquals(prefix + keyName + "' config should be a " + expectedType, expectedType, configKey.type);
}
- public static class HasDuplicateConfigTransformation> implements Transformation {
+ public static class HasDuplicateConfigTransformation> implements Transformation, Versioned {
private static final String MUST_EXIST_KEY = "must.exist.key";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
// this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
@@ -469,6 +494,11 @@ public class ConnectorConfigTest> {
return record;
}
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 3037803f6c5..18c3a1cdae1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -553,7 +554,7 @@ public class ErrorHandlingTaskTest {
}
// Public to allow plugin discovery to complete without errors
- public static class FaultyConverter extends JsonConverter {
+ public static class FaultyConverter extends JsonConverter implements Versioned {
private static final Logger log = LoggerFactory.getLogger(FaultyConverter.class);
private int invocations = 0;
@@ -570,10 +571,15 @@ public class ErrorHandlingTaskTest {
throw new RetriableException("Bad invocations " + invocations + " for mod 3");
}
}
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
}
// Public to allow plugin discovery to complete without errors
- public static class FaultyPassthrough> implements Transformation {
+ public static class FaultyPassthrough> implements Transformation, Versioned {
private static final Logger log = LoggerFactory.getLogger(FaultyPassthrough.class);
@@ -599,6 +605,11 @@ public class ErrorHandlingTaskTest {
}
}
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
index 3927ba75161..56250742558 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleConverterWithHeaders.java
@@ -20,6 +20,8 @@ import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@@ -28,9 +30,14 @@ import org.apache.kafka.connect.storage.Converter;
/**
* This is a simple Converter implementation that uses "encoding" header to encode/decode strings via provided charset name
*/
-public class SampleConverterWithHeaders implements Converter {
+public class SampleConverterWithHeaders implements Converter, Versioned {
private static final String HEADER_ENCODING = "encoding";
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map configs, boolean isKey) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
index ed11360a720..3491fde3988 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleHeaderConverter.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -24,12 +26,17 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import java.io.IOException;
import java.util.Map;
-public class SampleHeaderConverter implements HeaderConverter {
+public class SampleHeaderConverter implements HeaderConverter, Versioned {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
index 90dfba753c5..4d72df35ca3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SamplePredicate.java
@@ -17,12 +17,14 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Map;
-public class SamplePredicate implements Predicate {
+public class SamplePredicate implements Predicate, Versioned {
private boolean testResult;
boolean closed = false;
@@ -33,6 +35,11 @@ public class SamplePredicate implements Predicate {
this.testResult = testResult;
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return new ConfigDef()
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
index b9043599e98..443b488ef3b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleTransformation.java
@@ -17,12 +17,14 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import java.util.Map;
-public class SampleTransformation> implements Transformation {
+public class SampleTransformation> implements Transformation, Versioned {
boolean closed = false;
private R transformedRecord;
@@ -38,6 +40,11 @@ public class SampleTransformation> implements Transfo
return transformedRecord;
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return new ConfigDef()
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
index d6a85b294cf..cd0ee9532c0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -34,6 +35,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
@RunWith(Parameterized.class)
public class PluginScannerTest {
@@ -67,6 +69,12 @@ public class PluginScannerTest {
}
}
+ @BeforeClass
+ public static void setUp() {
+ // Work around a circular-dependency in TestPlugins.
+ TestPlugins.pluginPath();
+ }
+
@Test
public void testScanningEmptyPluginPath() {
PluginScanResult result = scan(
@@ -145,6 +153,21 @@ public class PluginScannerTest {
assertEquals(expectedClasses, classes);
}
+ @Test
+ public void testNonVersionedPluginHasUndefinedVersion() {
+ PluginScanResult unversionedPluginsResult = scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.SAMPLING_HEADER_CONVERTER));
+ assertFalse(unversionedPluginsResult.isEmpty());
+ unversionedPluginsResult.forEach(pluginDesc -> assertEquals(PluginDesc.UNDEFINED_VERSION, pluginDesc.version()));
+ }
+
+ @Test
+ public void testVersionedPluginsHasVersion() {
+ PluginScanResult versionedPluginResult = scan(TestPlugins.pluginPath(TestPlugins.TestPlugin.READ_VERSION_FROM_RESOURCE_V1));
+ assertFalse(versionedPluginResult.isEmpty());
+ versionedPluginResult.forEach(pluginDesc -> assertEquals("1.0.0", pluginDesc.version()));
+
+ }
+
private PluginScanResult scan(Set pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
Set pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 4dd168a3faf..524c419f5c5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -609,7 +610,7 @@ public class PluginUtilsTest {
assertEquals(expectedAliases, actualAliases);
}
- public static class CollidingConverter implements Converter {
+ public static class CollidingConverter implements Converter, Versioned {
@Override
public void configure(Map configs, boolean isKey) {
}
@@ -623,9 +624,14 @@ public class PluginUtilsTest {
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
}
- public static class CollidingHeaderConverter implements HeaderConverter {
+ public static class CollidingHeaderConverter implements HeaderConverter, Versioned {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
@@ -649,9 +655,19 @@ public class PluginUtilsTest {
@Override
public void configure(Map configs) {
}
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
}
- public static class Colliding> implements Transformation {
+ public static class Colliding> implements Transformation, Versioned {
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
@Override
public void configure(Map configs) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 189d75a842a..72fe97c2cb6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.converters.ByteArrayConverter;
@@ -688,7 +689,7 @@ public class PluginsTest {
}
}
- public static class TestConverter implements Converter, Configurable {
+ public static class TestConverter implements Converter, Configurable, Versioned {
public Map configs;
public ConfigDef config() {
@@ -715,6 +716,11 @@ public class PluginsTest {
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
+
+ @Override
+ public String version() {
+ return "test";
+ }
}
public static class TestHeaderConverter implements HeaderConverter {
@@ -770,9 +776,14 @@ public class PluginsTest {
}
}
- public static class TestInternalConverter extends JsonConverter {
+ public static class TestInternalConverter extends JsonConverter implements Versioned {
public Map configs;
+ @Override
+ public String version() {
+ return "test";
+ }
+
@Override
public void configure(Map configs) {
this.configs = configs;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 78d3fe98276..94304b87810 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -129,17 +129,17 @@ public class ConnectorPluginsResourceTest {
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, classLoader));
SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE, classLoader));
- CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
- CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader));
+ CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.CONVERTER, classLoader));
+ CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.CONVERTER, classLoader));
- HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
- HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader));
+ HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));
+ HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, appVersion, PluginType.HEADER_CONVERTER, classLoader));
- TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
- TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader));
+ TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, appVersion, PluginType.TRANSFORMATION, classLoader));
+ TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, appVersion, PluginType.TRANSFORMATION, classLoader));
- PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
- PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader));
+ PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, appVersion, PluginType.PREDICATE, classLoader));
+ PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, appVersion, PluginType.PREDICATE, classLoader));
} catch (Exception e) {
e.printStackTrace();
fail("Failed setting up plugins");
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 7166f8c272c..7fd27839c4c 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Date;
@@ -52,7 +54,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(Cast.class);
// TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to
@@ -110,6 +112,11 @@ public abstract class Cast> implements Transformation
private Schema.Type wholeValueCastType;
private Cache schemaUpdateCache;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
index 6d1e1a49811..c1d20a48c1d 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
@@ -30,7 +32,7 @@ import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Removes one or more headers from each record.";
@@ -57,6 +59,11 @@ public class DropHeaders> implements Transformation> implements Transformation {
+public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. "
@@ -45,6 +47,11 @@ public abstract class ExtractField> implements Transf
private String fieldName;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
index d7fb54eaca9..e54531e645a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
@@ -19,6 +19,8 @@ package org.apache.kafka.connect.transforms;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
@@ -27,7 +29,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
* a particular {@link org.apache.kafka.connect.transforms.predicates.Predicate}.
* @param The type of record.
*/
-public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
+public class Filter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Drops all records, filtering them from subsequent transformations in the chain. " +
"This is intended to be used conditionally to filter out records matching (or not matching) " +
@@ -39,6 +41,11 @@ public class Filter> implements Transformation {
return null;
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
index 35a57dde1e9..985902a9bd4 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -37,7 +39,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Flatten a nested data structure, generating names for each field by concatenating the field names at each "
@@ -78,6 +80,11 @@ public abstract class Flatten> implements Transformat
}
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void close() {
}
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
index b32ad567e8f..2737b4207ce 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +42,7 @@ import java.util.Map;
import static java.lang.String.format;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String FIELDS_FIELD = "fields";
public static final String HEADERS_FIELD = "headers";
@@ -116,6 +118,11 @@ public abstract class HeaderFrom> implements Transfor
}
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
Headers updatedHeaders = record.headers().duplicate();
Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
index dc88287a246..9924571825f 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -29,7 +31,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.HashMap;
import java.util.Map;
-public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
@@ -75,6 +77,11 @@ public abstract class HoistField> implements Transfor
}
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void close() {
schemaUpdateCache = null;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
index cbc820b65cc..d31bbe010a6 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -38,7 +40,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Insert field(s) using attributes from the record metadata or a configured static value."
@@ -104,6 +106,11 @@ public abstract class InsertField> implements Transfo
private Cache schemaUpdateCache;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
index 88b20020eba..ce218705fb3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
@@ -27,7 +29,7 @@ import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
-public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Add a header to each record.";
@@ -57,6 +59,11 @@ public class InsertHeader> implements Transformation<
record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
index 9c6f044833f..a88454d0af0 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.transforms;
import java.util.ArrayList;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -40,7 +42,7 @@ import java.util.function.Function;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)."
@@ -90,6 +92,11 @@ public abstract class MaskField> implements Transform
private Set maskedFields;
private String replacement;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
index a79f5c17dad..d72234f2639 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -27,7 +29,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {
+public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(RegexRouter.class);
@@ -49,6 +51,11 @@ public class RegexRouter> implements Transformation props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
index a70acb60a58..38f5a8a019a 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ConfigUtils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -41,7 +43,7 @@ import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Filter or rename fields."
+ "Use the concrete transformation type designed for the record key (" + Key.class.getName() + ") "
@@ -89,6 +91,11 @@ public abstract class ReplaceField> implements Transf
private Cache schemaUpdateCache;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(configs, new String[][]{
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index c83ff649082..280fb5833f6 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
@@ -31,7 +33,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
-public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);
public static final String OVERVIEW_DOC =
@@ -50,6 +52,11 @@ public abstract class SetSchemaMetadata> implements T
private String schemaName;
private Integer schemaVersion;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
index 3afd11522c7..0ef9292686f 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -47,7 +49,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
-public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
+public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC =
"Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types."
@@ -121,6 +123,11 @@ public abstract class TimestampConverter> implements
Object toType(Config config, Date orig);
}
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
private static final Map<String, TimestampTranslator> TRANSLATORS = new HashMap<>();
static {
TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() {
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
index f7b1e58248e..bc94964e659 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -28,7 +30,7 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable {
+public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R>, AutoCloseable, Versioned {
private static final Pattern TOPIC = Pattern.compile("${topic}", Pattern.LITERAL);
@@ -54,6 +56,11 @@ public class TimestampRouter> implements Transformati
private String topicFormat;
private ThreadLocal timestampFormat;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
index 8f843f4aaae..454ea38f572 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -36,7 +38,7 @@ import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
-public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
+public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
@@ -52,6 +54,11 @@ public class ValueToKey> implements Transformation
private Cache valueToKeySchemaCache;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public void configure(Map configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
index f15d426c026..6229bb7b729 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
@@ -20,6 +20,8 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -28,7 +30,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
* A predicate which is true for records with at least one header with the configured name.
* @param The type of connect record.
*/
-public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
private static final String NAME_CONFIG = "name";
public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name.";
@@ -38,6 +40,11 @@ public class HasHeaderKey> implements Predicate {
"The header name.");
private String name;
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
index 4a21eacc7ce..dc1f602b4a3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
@@ -19,17 +19,24 @@ package org.apache.kafka.connect.transforms.predicates;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
* A predicate which is true for records which are tombstones (i.e. have null value).
* @param The type of connect record.
*/
-public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
public static final String OVERVIEW_DOC = "A predicate which is true for records which are tombstones (i.e. have null value).";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
+ @Override
+ public String version() {
+ return AppInfoParser.getVersion();
+ }
+
@Override
public ConfigDef config() {
return CONFIG_DEF;
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
index 3ea8f1ae956..7e78a69df00 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
@@ -22,6 +22,8 @@ import java.util.regex.PatternSyntaxException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -30,7 +32,7 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
* A predicate which is true for records with a topic name that matches the configured regular expression.
* @param The type of connect record.
*/
-public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R>, Versioned {
private static final String PATTERN_CONFIG = "pattern";
@@ -43,6 +45,11 @@ public class TopicNameMatches> implements Predicate) transformed.value()).get("string"));
}
+ @Test
+ public void testCastVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xformKey.version());
+ assertEquals(AppInfoParser.getVersion(), xformValue.version());
+
+ assertEquals(xformKey.version(), xformValue.version());
+ }
+
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
index 7e9190d74dd..95649bd2b3d 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
@@ -86,6 +87,11 @@ public class DropHeadersTest {
assertThrows(ConfigException.class, () -> xform.configure(config()));
}
+ @Test
+ public void testDropHeadersVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
+
private void assertNonHeaders(SourceRecord original, SourceRecord xformed) {
assertEquals(original.sourcePartition(), xformed.sourcePartition());
assertEquals(original.sourceOffset(), xformed.sourceOffset());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index ce776f9c5f8..3ad9eef1016 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -113,4 +114,10 @@ public class ExtractFieldTest {
assertEquals("Unknown field: nonexistent", iae.getMessage());
}
}
+
+ @Test
+ public void testExtractFieldVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
+
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 90d17245670..b5729c1929b 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -388,4 +389,12 @@ public class FlattenTest {
assertEquals(value, transformedRecord.value());
}
+
+ @Test
+ public void testFlattenVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xformKey.version());
+ assertEquals(AppInfoParser.getVersion(), xformValue.version());
+
+ assertEquals(xformKey.version(), xformValue.version());
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index e3429fcedf9..63f416f99f3 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -77,4 +78,9 @@ public class HoistFieldTest {
assertEquals(expectedKey, actualKey);
}
+ @Test
+ public void testHoistFieldVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
+
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index 03933ad6c1d..727f9d0b217 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -200,4 +201,12 @@ public class InsertFieldTest {
assertSame(record, transformedRecord);
}
+
+ @Test
+ public void testInsertFieldVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xformKey.version());
+ assertEquals(AppInfoParser.getVersion(), xformValue.version());
+
+ assertEquals(xformKey.version(), xformValue.version());
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
index b0f572969ec..237662701f4 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
@@ -116,5 +117,10 @@ public class InsertHeaderTest {
return new SourceRecord(sourcePartition, sourceOffset, topic, partition,
keySchema, key, valueSchema, value, timestamp, headers);
}
+
+ @Test
+ public void testInsertHeaderVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
index 6a514ebc875..2f5d7feaf21 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -265,4 +266,10 @@ public class MaskFieldTest {
actualMap.put("k", "v");
assertEquals(Collections.singletonMap("k", "v"), actualMap);
}
+
+ @Test
+ public void testMaskFieldReturnsVersionFromAppInfoParser() {
+ final MaskField<SinkRecord> xform = new MaskField.Value<>();
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
index cef82d2410b..5ba2367545b 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
@@ -67,4 +68,10 @@ public class RegexRouterTest {
assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117"));
}
+ @Test
+ public void testRegexRouterRetrievesVersionFromAppInfoParser() {
+ final RegexRouter<SinkRecord> router = new RegexRouter<>();
+ assertEquals(AppInfoParser.getVersion(), router.version());
+ }
+
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index fee5c8f01b3..9df4013c9a4 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -169,4 +170,9 @@ public class ReplaceFieldTest {
assertEquals(true, updatedValue.get("bar"));
assertEquals("etc", updatedValue.get("etc"));
}
+
+ @Test
+ public void testReplaceFieldVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 74ac308555f..8c0f45ce865 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -157,6 +158,11 @@ public class SetSchemaMetadataTest {
assertNull(updatedValue);
}
+ @Test
+ public void testSchemaMetadataVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
+
protected void assertMatchingSchema(Struct value, Schema schema) {
assertSame(schema, value.schema());
assertEquals(schema.name(), value.schema().name());
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 2c746d3f154..3b47b8ba169 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -681,6 +682,13 @@ public class TimestampConverterTest {
assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
}
+ @Test
+ public void testTimestampConverterVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xformKey.version());
+ assertEquals(AppInfoParser.getVersion(), xformValue.version());
+ assertEquals(xformKey.version(), xformValue.version());
+ }
+
private SourceRecord createRecordWithSchema(Schema schema, Object value) {
return new SourceRecord(null, null, "topic", 0, schema, value);
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
index 5fa87ba3144..ec1e3bbe054 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -25,7 +27,12 @@ import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TimestampRouterTest {
- private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
+ private TimestampRouter<SourceRecord> xform;
+ @BeforeEach
+ public void setup() {
+ xform = new TimestampRouter<>();
+ xform.configure(Collections.emptyMap()); // defaults
+ }
@AfterEach
public void teardown() {
@@ -34,7 +41,6 @@ public class TimestampRouterTest {
@Test
public void defaultConfiguration() {
- xform.configure(Collections.emptyMap()); // defaults
final SourceRecord record = new SourceRecord(
null, null,
"test", 0,
@@ -45,4 +51,9 @@ public class TimestampRouterTest {
assertEquals("test-20170103", xform.apply(record).topic());
}
+ @Test
+ public void testTimestampRouterVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
+
}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index 94fa85c1027..1ffebce6e3b 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.transforms;
+import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -106,4 +107,9 @@ public class ValueToKeyTest {
DataException actual = assertThrows(DataException.class, () -> xform.apply(record));
assertEquals("Field does not exist: not_exist", actual.getMessage());
}
+
+ @Test
+ public void testValueToKeyVersionRetrievedFromAppInfoParser() {
+ assertEquals(AppInfoParser.getVersion(), xform.version());
+ }
}