KAFKA-3209: KIP-66: more single message transforms

Renames `HoistToStruct` SMT to `HoistField`.

Adds the following SMTs:
`ExtractField`
`MaskField`
`RegexRouter`
`ReplaceField`
`SetSchemaMetadata`
`ValueToKey`

Adds HTML doc generation and updates to `connect.html`.

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2374 from shikhar/more-smt
Commit a8aa756166 (ref pull/2407/merge), authored by Shikhar Bhushan and committed by Ewen Cheslack-Postava.
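
For context, a minimal sketch (not part of this patch) of how one of the new transforms can be exercised programmatically; the topic and field names are made up for illustration, and the schemaless (null value schema) code path is assumed:

```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.MaskField;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MaskFieldExample {
    public static void main(String[] args) {
        // Mask the "ssn" field of the record value.
        MaskField.Value<SourceRecord> xform = new MaskField.Value<>();
        xform.configure(Collections.singletonMap("fields", "ssn"));

        Map<String, Object> value = new HashMap<>();
        value.put("name", "jane");
        value.put("ssn", "123-45-6789");

        // A null value schema selects the schemaless (Map-based) path of the transform.
        SourceRecord record = new SourceRecord(null, null, "example-topic", null, value);
        SourceRecord masked = xform.apply(record);

        // "ssn" is replaced with the masking value for String, the empty string.
        System.out.println(masked.value());
    }
}
```
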
Changed files:
  1. build.gradle (9)
  2. connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java (87)
  3. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java (114)
  4. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java (46)
  5. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java (170)
  6. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java (172)
  7. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java (75)
  8. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java (230)
  9. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java (124)
  10. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java (30)
  11. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java (111)
  12. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java (39)
  13. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java (41)
  14. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java (61)
  15. connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java (40)
  16. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java (59)
  17. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java (19)
  18. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java (156)
  19. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java (70)
  20. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java (92)
  21. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java (67)
  22. connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java (87)
  23. docs/connect.html (16)
  24. tests/kafkatest/tests/connect/connect_distributed_test.py (2)

9
build.gradle

@@ -508,7 +508,7 @@ project(':core') {
task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs',
'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
':streams:genStreamsConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
@@ -948,6 +948,13 @@ project(':connect:runtime') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream()
}
task genConnectTransformationDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.connect.tools.TransformationDoc'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
}
}
project(':connect:file') {

87
connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java

@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.tools;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;
import org.apache.kafka.connect.transforms.MaskField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
public class TransformationDoc {
private static final class DocInfo {
final String transformationName;
final String overview;
final ConfigDef configDef;
private DocInfo(String transformationName, String overview, ConfigDef configDef) {
this.transformationName = transformationName;
this.overview = overview;
this.configDef = configDef;
}
}
private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF),
new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF),
new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF),
new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF),
new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF),
new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF)
);
private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {
out.println("<div id=\"" + docInfo.transformationName + "\">");
out.print("<h5>");
out.print(docInfo.transformationName);
out.println("</h5>");
out.println(docInfo.overview);
out.println("<p/>");
out.println(docInfo.configDef.toHtmlTable());
out.println("</div>");
}
private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException {
for (final DocInfo docInfo : TRANSFORMATIONS) {
printTransformationHtml(out, docInfo);
}
}
public static void main(String... args) throws Exception {
printHtml(System.out);
}
}

114
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java

@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class ExtractField<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC =
"Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
private static final String FIELD_CONFIG = "field";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract.");
private static final String PURPOSE = "field extraction";
private String fieldName;
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
fieldName = config.getString(FIELD_CONFIG);
}
@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
if (schema == null) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
return newRecord(record, null, value.get(fieldName));
} else {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName));
}
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
public static class Key<R extends ConnectRecord<R>> extends ExtractField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static class Value<R extends ConnectRecord<R>> extends ExtractField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}
}
}

46
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java → connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java

@@ -27,15 +27,21 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Collections;
import java.util.Map;
public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Transformation<R> {
public abstract class HoistField<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String FIELD_CONFIG = "field";
public static final String OVERVIEW_DOC =
"Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
private static final String FIELD_CONFIG = "field";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
"Field name for the single field that will be created in the resulting Struct.");
"Field name for the single field that will be created in the resulting Struct or Map.");
private Cache<Schema, Schema> schemaUpdateCache;
@@ -53,15 +59,19 @@ public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Trans
final Schema schema = operatingSchema(record);
final Object value = operatingValue(record);
Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
schemaUpdateCache.put(schema, updatedSchema);
}
if (schema == null) {
return newRecord(record, null, Collections.singletonMap(fieldName, value));
} else {
Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build();
schemaUpdateCache.put(schema, updatedSchema);
}
final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value);
return newRecord(record, updatedSchema, updatedValue);
return newRecord(record, updatedSchema, updatedValue);
}
}
@Override
@@ -80,11 +90,7 @@ public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Trans
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
/**
* Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
*/
public static class Key<R extends ConnectRecord<R>> extends HoistToStruct<R> {
public static class Key<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
@@ -99,14 +105,9 @@ public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Trans
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
/**
* Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name.
*/
public static class Value<R extends ConnectRecord<R>> extends HoistToStruct<R> {
public static class Value<R extends ConnectRecord<R>> extends HoistField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
@@ -121,7 +122,6 @@ public abstract class HoistToStruct<R extends ConnectRecord<R>> implements Trans
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}
}
}

170
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java

@@ -21,23 +21,32 @@ import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class InsertField<R extends ConnectRecord<R>> implements Transformation<R> {
public interface Keys {
public static final String OVERVIEW_DOC =
"Insert field(s) using attributes from the record metadata or a configured static value."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
private interface ConfigName {
String TOPIC_FIELD = "topic.field";
String PARTITION_FIELD = "partition.field";
String OFFSET_FIELD = "offset.field";
@@ -46,22 +55,24 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
String STATIC_VALUE = "static.value";
}
private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default).";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(Keys.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka topic.\n" + OPTIONALITY_DOC)
.define(Keys.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka partition.\n" + OPTIONALITY_DOC)
.define(Keys.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC)
.define(Keys.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for record timestamp.\n" + OPTIONALITY_DOC)
.define(Keys.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for static data field.\n" + OPTIONALITY_DOC)
.define(Keys.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
private static final String OPTIONALITY_DOC = "Suffix with <code>!</code> to make this a required field, or <code>?</code> to keep it optional (the default).";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka topic. " + OPTIONALITY_DOC)
.define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka partition. " + OPTIONALITY_DOC)
.define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for Kafka offset - only applicable to sink connectors.<br/>" + OPTIONALITY_DOC)
.define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for record timestamp. " + OPTIONALITY_DOC)
.define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Field name for static data field. " + OPTIONALITY_DOC)
.define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
"Static field value, if field name configured.");
private static final String PURPOSE = "field insertion";
private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build();
private static final class InsertionSpec {
@@ -91,46 +102,42 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
private InsertionSpec timestampField;
private InsertionSpec staticField;
private String staticValue;
private boolean applicable;
private Cache<Schema, Schema> schemaUpdateCache;
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
topicField = InsertionSpec.parse(config.getString(Keys.TOPIC_FIELD));
partitionField = InsertionSpec.parse(config.getString(Keys.PARTITION_FIELD));
offsetField = InsertionSpec.parse(config.getString(Keys.OFFSET_FIELD));
timestampField = InsertionSpec.parse(config.getString(Keys.TIMESTAMP_FIELD));
staticField = InsertionSpec.parse(config.getString(Keys.STATIC_FIELD));
staticValue = config.getString(Keys.STATIC_VALUE);
applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null;
topicField = InsertionSpec.parse(config.getString(ConfigName.TOPIC_FIELD));
partitionField = InsertionSpec.parse(config.getString(ConfigName.PARTITION_FIELD));
offsetField = InsertionSpec.parse(config.getString(ConfigName.OFFSET_FIELD));
timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD));
staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD));
staticValue = config.getString(ConfigName.STATIC_VALUE);
if (topicField == null && partitionField == null && offsetField == null && timestampField == null && staticField == null) {
throw new ConfigException("No field insertion configured");
}
if (staticField != null && staticValue == null) {
throw new ConfigException(ConfigName.STATIC_VALUE, null, "No value specified for static field: " + staticField);
}
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}
@Override
public R apply(R record) {
if (!applicable) return record;
final Schema schema = operatingSchema(record);
final Object value = operatingValue(record);
if (value == null)
throw new DataException("null value");
if (schema == null) {
if (!(value instanceof Map))
throw new DataException("Can only operate on Map value in schemaless mode: " + value.getClass().getName());
return applySchemaless(record, (Map<String, Object>) value);
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
if (schema.type() != Schema.Type.STRUCT)
throw new DataException("Can only operate on Struct types: " + value.getClass().getName());
return applyWithSchema(record, schema, (Struct) value);
return applyWithSchema(record);
}
}
private R applySchemaless(R record, Map<String, Object> value) {
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
final Map<String, Object> updatedValue = new HashMap<>(value);
if (topicField != null) {
@@ -140,9 +147,7 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
updatedValue.put(partitionField.name, record.kafkaPartition());
}
if (offsetField != null) {
if (!(record instanceof SinkRecord))
throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
updatedValue.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
}
if (timestampField != null && record.timestamp() != null) {
updatedValue.put(timestampField.name, record.timestamp());
@@ -150,36 +155,46 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
if (staticField != null && staticValue != null) {
updatedValue.put(staticField.name, staticValue);
}
return newRecord(record, null, updatedValue);
}
private R applyWithSchema(R record, Schema schema, Struct value) {
Schema updatedSchema = schemaUpdateCache.get(schema);
private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
if (updatedSchema == null) {
updatedSchema = makeUpdatedSchema(schema);
schemaUpdateCache.put(schema, updatedSchema);
updatedSchema = makeUpdatedSchema(value.schema());
schemaUpdateCache.put(value.schema(), updatedSchema);
}
final Struct updatedValue = new Struct(updatedSchema);
copyFields(value, updatedValue);
for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}
insertFields(record, updatedValue);
if (topicField != null) {
updatedValue.put(topicField.name, record.topic());
}
if (partitionField != null && record.kafkaPartition() != null) {
updatedValue.put(partitionField.name, record.kafkaPartition());
}
if (offsetField != null) {
updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset());
}
if (timestampField != null && record.timestamp() != null) {
updatedValue.put(timestampField.name, new Date(record.timestamp()));
}
if (staticField != null && staticValue != null) {
updatedValue.put(staticField.name, staticValue);
}
return newRecord(record, updatedSchema, updatedValue);
}
private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaBuilder.struct();
builder.name(schema.name());
builder.version(schema.version());
builder.doc(schema.doc());
final Map<String, String> params = schema.parameters();
if (params != null) {
builder.parameters(params);
}
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
builder.field(field.name(), field.schema());
@@ -204,33 +219,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
return builder.build();
}
private void copyFields(Struct value, Struct updatedValue) {
for (Field field : value.schema().fields()) {
updatedValue.put(field.name(), value.get(field));
}
}
private void insertFields(R record, Struct value) {
if (topicField != null) {
value.put(topicField.name, record.topic());
}
if (partitionField != null && record.kafkaPartition() != null) {
value.put(partitionField.name, record.kafkaPartition());
}
if (offsetField != null) {
if (!(record instanceof SinkRecord)) {
throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass());
}
value.put(offsetField.name, ((SinkRecord) record).kafkaOffset());
}
if (timestampField != null && record.timestamp() != null) {
value.put(timestampField.name, new Date(record.timestamp()));
}
if (staticField != null && staticValue != null) {
value.put(staticField.name, staticValue);
}
}
@Override
public void close() {
schemaUpdateCache = null;
@@ -247,10 +235,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
/**
* This transformation allows inserting configured attributes of the record metadata as fields in the record key.
* It also allows adding a static data field.
*/
public static class Key<R extends ConnectRecord<R>> extends InsertField<R> {
@Override
@@ -270,10 +254,6 @@ public abstract class InsertField<R extends ConnectRecord<R>> implements Transfo
}
/**
* This transformation allows inserting configured attributes of the record metadata as fields in the record value.
* It also allows adding a static data field.
*/
public static class Value<R extends ConnectRecord<R>> extends InsertField<R> {
@Override

172
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java

@@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class MaskField<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC =
"Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getCanonicalName() + "</code>) "
+ "or value (<code>" + Value.class.getCanonicalName() + "</code>).";
private static final String FIELDS_CONFIG = "fields";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask.");
private static final String PURPOSE = "mask fields";
private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>();
static {
PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE);
PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0);
PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0);
PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0);
PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L);
PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f);
PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d);
PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO);
PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0));
PRIMITIVE_VALUE_MAPPING.put(String.class, "");
}
private Set<String> maskedFields;
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
maskedFields = new HashSet<>(config.getList(FIELDS_CONFIG));
}
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
final HashMap<String, Object> updatedValue = new HashMap<>(value);
for (String field : maskedFields) {
updatedValue.put(field, masked(value.get(field)));
}
return newRecord(record, updatedValue);
}
private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
final Struct updatedValue = new Struct(value.schema());
for (Field field : value.schema().fields()) {
final Object origFieldValue = value.get(field);
updatedValue.put(field, maskedFields.contains(field.name()) ? masked(origFieldValue) : origFieldValue);
}
return newRecord(record, updatedValue);
}
private static Object masked(Object value) {
if (value == null)
return null;
Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass());
if (maskedValue == null) {
if (value instanceof List)
maskedValue = Collections.emptyList();
else if (value instanceof Map)
maskedValue = Collections.emptyMap();
else
throw new DataException("Cannot mask value of type: " + value.getClass());
}
return maskedValue;
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R base, Object value);
public static final class Key<R extends ConnectRecord<R>> extends MaskField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static final class Value<R extends ConnectRecord<R>> extends MaskField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updatedValue, record.timestamp());
}
}
}

75
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java

@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string."
+ "<p/>Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. "
+ "If the pattern matches the input topic, <code>java.util.regex.Matcher#replaceFirst()</code> is used with the replacement string to obtain the new topic.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
"Regular expression to use for matching.")
.define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
"Replacement string.");
private interface ConfigName {
String REGEX = "regex";
String REPLACEMENT = "replacement";
}
private Pattern regex;
private String replacement;
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
regex = Pattern.compile(config.getString(ConfigName.REGEX));
replacement = config.getString(ConfigName.REPLACEMENT);
}
@Override
public R apply(R record) {
final Matcher matcher = regex.matcher(record.topic());
if (matcher.matches()) {
final String topic = matcher.replaceFirst(replacement);
return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
return record;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
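
To illustrate the routing behaviour described in the overview doc above, a rough sketch (not part of the patch; the topic names and pattern are hypothetical):

```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.RegexRouter;

import java.util.HashMap;
import java.util.Map;

public class RegexRouterExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("regex", "(.*)-raw");          // must match the entire topic name
        props.put("replacement", "$1-processed");

        RegexRouter<SourceRecord> router = new RegexRouter<>();
        router.configure(props);

        SourceRecord record = new SourceRecord(null, null, "orders-raw", null, "payload");
        System.out.println(router.apply(record).topic());  // orders-processed

        // A topic that does not match the pattern is passed through unchanged.
        SourceRecord other = new SourceRecord(null, null, "orders", null, "payload");
        System.out.println(router.apply(other).topic());   // orders
    }
}
```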

230
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java

@@ -0,0 +1,230 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Filter or rename fields.";
interface ConfigName {
String BLACKLIST = "blacklist";
String WHITELIST = "whitelist";
String RENAME = "renames";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to exclude. This takes precedence over the whitelist.")
.define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
"Fields to include. If specified, only these fields will be used.")
.define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
parseRenameMappings((List<String>) value);
}
@Override
public String toString() {
return "list of colon-delimited pairs, e.g. <code>foo:bar,abc:xyz</code>";
}
}, ConfigDef.Importance.MEDIUM, "Field rename mappings.");
private static final String PURPOSE = "field replacement";
private List<String> blacklist;
private List<String> whitelist;
private Map<String, String> renames;
private Map<String, String> reverseRenames;
private Cache<Schema, Schema> schemaUpdateCache;
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
blacklist = config.getList(ConfigName.BLACKLIST);
whitelist = config.getList(ConfigName.WHITELIST);
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}
static Map<String, String> parseRenameMappings(List<String> mappings) {
final Map<String, String> m = new HashMap<>();
for (String mapping : mappings) {
final String[] parts = mapping.split(":");
if (parts.length != 2) {
throw new ConfigException(ConfigName.RENAME, mappings, "Invalid rename mapping: " + mapping);
}
m.put(parts[0], parts[1]);
}
return m;
}
static Map<String, String> invert(Map<String, String> source) {
final Map<String, String> m = new HashMap<>();
for (Map.Entry<String, String> e : source.entrySet()) {
m.put(e.getValue(), e.getKey());
}
return m;
}
boolean filter(String fieldName) {
return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
}
String renamed(String fieldName) {
final String mapping = renames.get(fieldName);
return mapping == null ? fieldName : mapping;
}
String reverseRenamed(String fieldName) {
final String mapping = reverseRenames.get(fieldName);
return mapping == null ? fieldName : mapping;
}
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
final Map<String, Object> updatedValue = new HashMap<>(value.size());
for (Map.Entry<String, Object> e : value.entrySet()) {
final String fieldName = e.getKey();
if (filter(fieldName)) {
final Object fieldValue = e.getValue();
updatedValue.put(renamed(fieldName), fieldValue);
}
}
return newRecord(record, null, updatedValue);
}
private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
if (updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
schemaUpdateCache.put(value.schema(), updatedSchema);
}
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
final Object fieldValue = value.get(reverseRenamed(field.name()));
updatedValue.put(field.name(), fieldValue);
}
return newRecord(record, updatedSchema, updatedValue);
}
private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
if (filter(field.name())) {
builder.field(renamed(field.name()), field.schema());
}
}
return builder.build();
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
schemaUpdateCache = null;
}
protected abstract Schema operatingSchema(R record);
protected abstract Object operatingValue(R record);
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
public static class Key<R extends ConnectRecord<R>> extends ReplaceField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected Object operatingValue(R record) {
return record.key();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
}
}
public static class Value<R extends ConnectRecord<R>> extends ReplaceField<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected Object operatingValue(R record) {
return record.value();
}
@Override
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp());
}
}
}
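
A similar sketch of the filtering and renaming behaviour on a schemaless value (not part of the patch; the field names are hypothetical):

```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.ReplaceField;

import java.util.HashMap;
import java.util.Map;

public class ReplaceFieldExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("whitelist", "id,amount");   // keep only these fields
        props.put("renames", "amount:total");  // and rename one of them

        ReplaceField.Value<SourceRecord> xform = new ReplaceField.Value<>();
        xform.configure(props);

        Map<String, Object> value = new HashMap<>();
        value.put("id", 1);
        value.put("amount", 9.99);
        value.put("internal", "drop-me");

        SourceRecord record = new SourceRecord(null, null, "example-topic", null, value);
        // Whitelisted fields are kept and the rename mapping applied,
        // yielding a value like {id=1, total=9.99}.
        System.out.println(xform.apply(record).value());
    }
}
```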

124
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java

@@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC =
"Set the schema name, version or both on the record's key (<code>" + Key.class.getCanonicalName() + "</code>)"
+ " or value (<code>" + Value.class.getCanonicalName() + "</code>) schema.";
private interface ConfigName {
String SCHEMA_NAME = "schema.name";
String SCHEMA_VERSION = "schema.version";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.")
.define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set.");
private String schemaName;
private Integer schemaVersion;
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
schemaName = config.getString(ConfigName.SCHEMA_NAME);
schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION);
if (schemaName == null && schemaVersion == null) {
throw new ConfigException("Neither schema name nor version configured");
}
}
@Override
public R apply(R record) {
final Schema schema = operatingSchema(record);
requireSchema(schema, "updating schema metadata");
final boolean isArray = schema.type() == Schema.Type.ARRAY;
final boolean isMap = schema.type() == Schema.Type.MAP;
final Schema updatedSchema = new ConnectSchema(
schema.type(),
schema.isOptional(),
schema.defaultValue(),
schemaName != null ? schemaName : schema.name(),
schemaVersion != null ? schemaVersion : schema.version(),
schema.doc(),
schema.parameters(),
schema.fields(),
isMap ? schema.keySchema() : null,
isMap || isArray ? schema.valueSchema() : null
);
return newRecord(record, updatedSchema);
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
protected abstract Schema operatingSchema(R record);
protected abstract R newRecord(R record, Schema updatedSchema);
/**
* Set the schema name, version or both on the record's key schema.
*/
public static class Key<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> {
@Override
protected Schema operatingSchema(R record) {
return record.keySchema();
}
@Override
protected R newRecord(R record, Schema updatedSchema) {
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp());
}
}
/**
* Set the schema name, version or both on the record's value schema.
*/
public static class Value<R extends ConnectRecord<R>> extends SetSchemaMetadata<R> {
@Override
protected Schema operatingSchema(R record) {
return record.valueSchema();
}
@Override
protected R newRecord(R record, Schema updatedSchema) {
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp());
}
}
}

30
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java

@@ -27,25 +27,25 @@ import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
/**
* This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp.
* <p/>
* It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system
* (e.g. database table or search index name).
*/
public class TimestampRouter<R extends ConnectRecord<R>> implements Transformation<R> {
public interface Keys {
public static final String OVERVIEW_DOC =
"Update the record's topic field as a function of the original topic value and the record timestamp."
+ "<p/>"
+ "This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system"
+ "(e.g. database table or search index name).";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH,
"Format string which can contain <code>${topic}</code> and <code>${timestamp}</code> as placeholders for the topic and timestamp, respectively.")
.define(ConfigName.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH,
"Format string for the timestamp that is compatible with <code>java.text.SimpleDateFormat</code>.");
private interface ConfigName {
String TOPIC_FORMAT = "topic.format";
String TIMESTAMP_FORMAT = "timestamp.format";
}
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH,
"Format string which can contain ``${topic}`` and ``${timestamp}`` as placeholders for the topic and timestamp, respectively.")
.define(Keys.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH,
"Format string for the timestamp that is compatible with java.text.SimpleDateFormat.");
private String topicFormat;
private ThreadLocal<SimpleDateFormat> timestampFormat;
@@ -53,9 +53,9 @@ public class TimestampRouter<R extends ConnectRecord<R>> implements Transformati
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
topicFormat = config.getString(Keys.TOPIC_FORMAT);
topicFormat = config.getString(ConfigName.TOPIC_FORMAT);
final String timestampFormatStr = config.getString(Keys.TIMESTAMP_FORMAT);
final String timestampFormatStr = config.getString(ConfigName.TIMESTAMP_FORMAT);
timestampFormat = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {

111
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java

@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
public static final String FIELDS_CONFIG = "fields";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH,
"Field names on the record value to extract as the record key.");
private static final String PURPOSE = "copying fields from value to key";
private List<String> fields;
private Cache<Schema, Schema> valueToKeySchemaCache;
@Override
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
fields = config.getList(FIELDS_CONFIG);
valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
}
@Override
public R apply(R record) {
if (record.valueSchema() == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}
private R applySchemaless(R record) {
final Map<String, Object> value = requireMap(record.value(), PURPOSE);
final Map<String, Object> key = new HashMap<>(fields.size());
for (String field : fields) {
key.put(field, value.get(field));
}
return record.newRecord(record.topic(), record.kafkaPartition(), null, key, record.valueSchema(), record.value(), record.timestamp());
}
private R applyWithSchema(R record) {
final Struct value = requireStruct(record.value(), PURPOSE);
Schema keySchema = valueToKeySchemaCache.get(value.schema());
if (keySchema == null) {
final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
for (String field : fields) {
final Schema fieldSchema = value.schema().field(field).schema();
keySchemaBuilder.field(field, fieldSchema);
}
keySchema = keySchemaBuilder.build();
valueToKeySchemaCache.put(value.schema(), keySchema);
}
final Struct key = new Struct(keySchema);
for (String field : fields) {
key.put(field, value.get(field));
}
return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp());
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
valueToKeySchemaCache = null;
}
}
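
And a corresponding sketch for ValueToKey on a schemaless record (not part of the patch; the field names are hypothetical):

```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.ValueToKey;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ValueToKeyExample {
    public static void main(String[] args) {
        // Promote the "id" field of the record value into the record key.
        ValueToKey<SourceRecord> xform = new ValueToKey<>();
        xform.configure(Collections.singletonMap("fields", "id"));

        Map<String, Object> value = new HashMap<>();
        value.put("id", 42);
        value.put("name", "widget");

        SourceRecord record = new SourceRecord(null, null, "example-topic", null, value);
        SourceRecord keyed = xform.apply(record);

        System.out.println(keyed.key());    // {id=42}
        System.out.println(keyed.value());  // the value is left unchanged
    }
}
```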

39
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms.util;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import java.util.List;
public class NonEmptyListValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
if (((List) value).isEmpty()) {
throw new ConfigException(name, value, "Empty list");
}
}
@Override
public String toString() {
return "non-empty list";
}
}

41
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java

@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms.util;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import java.util.regex.Pattern;
public class RegexValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
try {
Pattern.compile((String) value);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid regex: " + e.getMessage());
}
}
@Override
public String toString() {
return "valid regex";
}
}
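A hedged sketch, not part of this patch: how a transform such as RegexRouter might wire this validator into its ConfigDef so that an unparsable pattern is rejected at configure() time rather than on the first record. The "regex" and "replacement" keys mirror the properties exercised by RegexRouterTest later in this listing; the constant name is illustrative only.
public static final ConfigDef EXAMPLE_CONFIG_DEF = new ConfigDef()
        .define("regex", ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
                "Regular expression to use for matching.")
        .define("replacement", ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
                "Replacement string.");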

61
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java

@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms.util;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.Map;
public class Requirements {
public static void requireSchema(Schema schema, String purpose) {
if (schema == null) {
throw new DataException("Schema required for [" + purpose + "]");
}
}
public static Map<String, Object> requireMap(Object value, String purpose) {
if (!(value instanceof Map)) {
throw new DataException("Only Map objects supported in absence of schema for [" + purpose + "], found: " + nullSafeClassName(value));
}
return (Map<String, Object>) value;
}
public static Struct requireStruct(Object value, String purpose) {
if (!(value instanceof Struct)) {
throw new DataException("Only Struct objects supported for [" + purpose + "], found: " + nullSafeClassName(value));
}
return (Struct) value;
}
public static SinkRecord requireSinkRecord(ConnectRecord<?> record, String purpose) {
if (!(record instanceof SinkRecord)) {
throw new DataException("Only SinkRecord supported for [" + purpose + "], found: " + nullSafeClassName(record));
}
return (SinkRecord) record;
}
private static String nullSafeClassName(Object x) {
return x == null ? "null" : x.getClass().getCanonicalName();
}
}
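requireSinkRecord is the one helper here with no usage elsewhere in this listing. A hedged sketch of its intent (method name and purpose string are illustrative): a transform that needs sink-only metadata, such as the Kafka offset, narrows the record type before reading it.
private static long kafkaOffsetOf(ConnectRecord<?> record) {
    // Only SinkRecord exposes kafkaOffset(); any other ConnectRecord fails here with a DataException.
    final SinkRecord sinkRecord = Requirements.requireSinkRecord(record, "reading the kafka offset");
    return sinkRecord.kafkaOffset();
}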

40
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java

@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms.util;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.util.Map;
public class SchemaUtil {
public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
builder.name(source.name());
builder.version(source.version());
builder.doc(source.doc());
final Map<String, String> params = source.parameters();
if (params != null) {
builder.parameters(params);
}
return builder;
}
}
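A hedged sketch of the intended usage pattern (helper name is illustrative; assumes the usual org.apache.kafka.connect.data.Field and java.util.Locale imports): a field-manipulating transform rebuilds a Struct schema with copySchemaBasics so the original name, version, doc, and parameters survive the rewrite, then re-adds whatever fields it keeps.
private static Schema upperCaseFieldNames(Schema source) {
    final SchemaBuilder builder = SchemaUtil.copySchemaBasics(source, SchemaBuilder.struct());
    for (Field field : source.fields()) {
        builder.field(field.name().toUpperCase(Locale.ROOT), field.schema());
    }
    return builder.build();
}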

59
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ExtractFieldTest {
@Test
public void schemaless() {
final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertNull(transformedRecord.keySchema());
assertEquals(42, transformedRecord.key());
}
@Test
public void withSchema() {
final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
final Struct key = new Struct(keySchema).put("magic", 42);
final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertEquals(Schema.INT32_SCHEMA, transformedRecord.keySchema());
assertEquals(42, transformedRecord.key());
}
}

19
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java → connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java

@@ -25,12 +25,25 @@ import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class HoistToStructTest {
public class HoistFieldTest {
@Test
public void sanityCheck() {
final HoistToStruct<SinkRecord> xform = new HoistToStruct.Key<>();
public void schemaless() {
final HoistField<SinkRecord> xform = new HoistField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertNull(transformedRecord.keySchema());
assertEquals(Collections.singletonMap("magic", 42), transformedRecord.key());
}
@Test
public void withSchema() {
final HoistField<SinkRecord> xform = new HoistField.Key<>();
xform.configure(Collections.singletonMap("field", "magic"));
final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);

156
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java

@@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class MaskFieldTest {
private static MaskField<SinkRecord> transform(List<String> fields) {
final MaskField<SinkRecord> xform = new MaskField.Value<>();
xform.configure(Collections.singletonMap("fields", fields));
return xform;
}
private static SinkRecord record(Schema schema, Object value) {
return new SinkRecord("", 0, null, null, schema, value, 0);
}
@Test
public void schemaless() {
final Map<String, Object> value = new HashMap<>();
value.put("magic", 42);
value.put("bool", true);
value.put("byte", (byte) 42);
value.put("short", (short) 42);
value.put("int", 42);
value.put("long", 42L);
value.put("float", 42f);
value.put("double", 42d);
value.put("string", "blabla");
value.put("date", new Date());
value.put("bigint", new BigInteger("42"));
value.put("bigdec", new BigDecimal("42.0"));
value.put("list", Collections.singletonList(42));
value.put("map", Collections.singletonMap("key", "value"));
final List<String> maskFields = new ArrayList<>(value.keySet());
maskFields.remove("magic");
final Map<String, Object> updatedValue = (Map) transform(maskFields).apply(record(null, value)).value();
assertEquals(42, updatedValue.get("magic"));
assertEquals(false, updatedValue.get("bool"));
assertEquals((byte) 0, updatedValue.get("byte"));
assertEquals((short) 0, updatedValue.get("short"));
assertEquals(0, updatedValue.get("int"));
assertEquals(0L, updatedValue.get("long"));
assertEquals(0f, updatedValue.get("float"));
assertEquals(0d, updatedValue.get("double"));
assertEquals("", updatedValue.get("string"));
assertEquals(new Date(0), updatedValue.get("date"));
assertEquals(BigInteger.ZERO, updatedValue.get("bigint"));
assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec"));
assertEquals(Collections.emptyList(), updatedValue.get("list"));
assertEquals(Collections.emptyMap(), updatedValue.get("map"));
}
@Test
public void withSchema() {
Schema schema = SchemaBuilder.struct()
.field("magic", Schema.INT32_SCHEMA)
.field("bool", Schema.BOOLEAN_SCHEMA)
.field("byte", Schema.INT8_SCHEMA)
.field("short", Schema.INT16_SCHEMA)
.field("int", Schema.INT32_SCHEMA)
.field("long", Schema.INT64_SCHEMA)
.field("float", Schema.FLOAT32_SCHEMA)
.field("double", Schema.FLOAT64_SCHEMA)
.field("string", Schema.STRING_SCHEMA)
.field("date", org.apache.kafka.connect.data.Date.SCHEMA)
.field("time", Time.SCHEMA)
.field("timestamp", Timestamp.SCHEMA)
.field("decimal", Decimal.schema(0))
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
.field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
.build();
final Struct value = new Struct(schema);
value.put("magic", 42);
value.put("bool", true);
value.put("byte", (byte) 42);
value.put("short", (short) 42);
value.put("int", 42);
value.put("long", 42L);
value.put("float", 42f);
value.put("double", 42d);
value.put("string", "hmm");
value.put("date", new Date());
value.put("time", new Date());
value.put("timestamp", new Date());
value.put("decimal", new BigDecimal(42));
value.put("array", Arrays.asList(1, 2, 3));
value.put("map", Collections.singletonMap("what", "what"));
final List<String> maskFields = new ArrayList<>(schema.fields().size());
for (Field field: schema.fields()) {
if (!field.name().equals("magic")) {
maskFields.add(field.name());
}
}
final Struct updatedValue = (Struct) transform(maskFields).apply(record(schema, value)).value();
assertEquals(42, updatedValue.get("magic"));
assertEquals(false, updatedValue.get("bool"));
assertEquals((byte) 0, updatedValue.get("byte"));
assertEquals((short) 0, updatedValue.get("short"));
assertEquals(0, updatedValue.get("int"));
assertEquals(0L, updatedValue.get("long"));
assertEquals(0f, updatedValue.get("float"));
assertEquals(0d, updatedValue.get("double"));
assertEquals("", updatedValue.get("string"));
assertEquals(new Date(0), updatedValue.get("date"));
assertEquals(new Date(0), updatedValue.get("time"));
assertEquals(new Date(0), updatedValue.get("timestamp"));
assertEquals(BigDecimal.ZERO, updatedValue.get("decimal"));
assertEquals(Collections.emptyList(), updatedValue.get("array"));
assertEquals(Collections.emptyMap(), updatedValue.get("map"));
}
}

70
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class RegexRouterTest {
private static String apply(String regex, String replacement, String topic) {
final Map<String, String> props = new HashMap<>();
props.put("regex", regex);
props.put("replacement", replacement);
final RegexRouter<SinkRecord> router = new RegexRouter<>();
router.configure(props);
return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
.topic();
}
@Test
public void staticReplacement() {
assertEquals("bar", apply("foo", "bar", "foo"));
}
@Test
public void doesntMatch() {
assertEquals("orig", apply("foo", "bar", "orig"));
}
@Test
public void identity() {
assertEquals("orig", apply("(.*)", "$1", "orig"));
}
@Test
public void addPrefix() {
assertEquals("prefix-orig", apply("(.*)", "prefix-$1", "orig"));
}
@Test
public void addSuffix() {
assertEquals("orig-suffix", apply("(.*)", "$1-suffix", "orig"));
}
@Test
public void slice() {
assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117"));
}
}

92
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java

@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class ReplaceFieldTest {
@Test
public void schemaless() {
final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
final Map<String, String> props = new HashMap<>();
props.put("blacklist", "dont");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
final Map<String, Object> value = new HashMap<>();
value.put("dont", "whatever");
value.put("abc", 42);
value.put("foo", true);
value.put("etc", "etc");
final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
final Map updatedValue = (Map) transformedRecord.value();
assertEquals(3, updatedValue.size());
assertEquals(42, updatedValue.get("xyz"));
assertEquals(true, updatedValue.get("bar"));
assertEquals("etc", updatedValue.get("etc"));
}
@Test
public void withSchema() {
final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
final Map<String, String> props = new HashMap<>();
props.put("whitelist", "abc,foo");
props.put("renames", "abc:xyz,foo:bar");
xform.configure(props);
final Schema schema = SchemaBuilder.struct()
.field("dont", Schema.STRING_SCHEMA)
.field("abc", Schema.INT32_SCHEMA)
.field("foo", Schema.BOOLEAN_SCHEMA)
.field("etc", Schema.STRING_SCHEMA)
.build();
final Struct value = new Struct(schema);
value.put("dont", "whatever");
value.put("abc", 42);
value.put("foo", true);
value.put("etc", "etc");
final SinkRecord record = new SinkRecord("test", 0, null, null, schema, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
final Struct updatedValue = (Struct) transformedRecord.value();
assertEquals(2, updatedValue.schema().fields().size());
assertEquals(new Integer(42), updatedValue.getInt32("xyz"));
assertEquals(true, updatedValue.getBoolean("bar"));
}
}

67
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java

@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class SetSchemaMetadataTest {
@Test
public void schemaNameUpdate() {
final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(Collections.singletonMap("schema.name", "foo"));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
assertEquals("foo", updatedRecord.valueSchema().name());
}
@Test
public void schemaVersionUpdate() {
final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(Collections.singletonMap("schema.version", 42));
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
assertEquals(new Integer(42), updatedRecord.valueSchema().version());
}
@Test
public void schemaNameAndVersionUpdate() {
final Map<String, String> props = new HashMap<>();
props.put("schema.name", "foo");
props.put("schema.version", "42");
final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
xform.configure(props);
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
final SinkRecord updatedRecord = xform.apply(record);
assertEquals("foo", updatedRecord.valueSchema().name());
assertEquals(new Integer(42), updatedRecord.valueSchema().version());
}
}

87
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java

@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ValueToKeyTest {
@Test
public void schemaless() {
final ValueToKey<SinkRecord> xform = new ValueToKey<>();
xform.configure(Collections.singletonMap("fields", "a,b"));
final HashMap<String, Integer> value = new HashMap<>();
value.put("a", 1);
value.put("b", 2);
value.put("c", 3);
final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
final HashMap<String, Integer> expectedKey = new HashMap<>();
expectedKey.put("a", 1);
expectedKey.put("b", 2);
assertNull(transformedRecord.keySchema());
assertEquals(expectedKey, transformedRecord.key());
}
@Test
public void withSchema() {
final ValueToKey<SinkRecord> xform = new ValueToKey<>();
xform.configure(Collections.singletonMap("fields", "a,b"));
final Schema valueSchema = SchemaBuilder.struct()
.field("a", Schema.INT32_SCHEMA)
.field("b", Schema.INT32_SCHEMA)
.field("c", Schema.INT32_SCHEMA)
.build();
final Struct value = new Struct(valueSchema);
value.put("a", 1);
value.put("b", 2);
value.put("c", 3);
final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
final Schema expectedKeySchema = SchemaBuilder.struct()
.field("a", Schema.INT32_SCHEMA)
.field("b", Schema.INT32_SCHEMA)
.build();
final Struct expectedKey = new Struct(expectedKeySchema)
.put("a", 1)
.put("b", 2);
assertEquals(expectedKeySchema, transformedRecord.keySchema());
assertEquals(expectedKey, transformedRecord.key());
}
}

16
docs/connect.html

@@ -100,6 +100,22 @@
For any other options, you should consult the documentation for the connector.
<h4><a id="connect_transforms" href="#connect_transforms">Transformations</a></h4>
Connectors can be configured with transformations to make lightweight message-at-a-time modifications. They can be convenient for minor data massaging and routing changes.
A transformation chain can be specified in the connector configuration; a brief example follows the list below.
<ul>
<li><code>transforms</code> - List of aliases for the transformation, specifying the order in which the transformations will be applied.</li>
<li><code>transforms.$alias.type</code> - Fully qualified class name for the transformation.</li>
<li><code>transforms.$alias.$transformationSpecificConfig</code> - Configuration properties for the transformation.</li>
</ul>
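For example, a minimal sketch of a chain with a single step: the ValueToKey transform added in this patch, keyed on two value fields. The alias makeKey and the field names are illustrative; the entries are shown as a Java properties map in the style of the tests in this patch, and would appear as flat keys in the connector configuration.
final Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("transforms", "makeKey");
connectorProps.put("transforms.makeKey.type", "org.apache.kafka.connect.transforms.ValueToKey");
connectorProps.put("transforms.makeKey.fields", "a,b");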
Several widely-applicable data and routing transformations are included with Kafka Connect:
<!--#include virtual="generated/connect_transforms.html" -->
<h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints:

2
tests/kafkatest/tests/connect/connect_distributed_test.py

@@ -460,7 +460,7 @@ class ConnectDistributedTest(Test):
'file': self.INPUT_FILE,
'topic': self.TOPIC,
'transforms': 'hoistToStruct,insertTimestampField',
'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistToStruct$Value',
'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistField$Value',
'transforms.hoistToStruct.field': 'content',
'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
'transforms.insertTimestampField.timestamp.field': ts_fieldname,
