diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala index 7cd3ac83283..26756e076b9 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala @@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream._ import scala.collection.JavaConverters._ import java.lang.{Iterable => JIterable} +import org.apache.kafka.streams.processor.ProcessorContext + /** * Implicit classes that offer conversions of Scala function literals to * SAM (Single Abstract Method) objects in Java. These make the Scala APIs much @@ -120,4 +122,40 @@ private[scala] object FunctionsCompatConversions { override def get(): Transformer[K, V, VO] = f() } } + + implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]]) + extends AnyVal { + def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] { + override def get(): Transformer[K, V, JIterable[VO]] = + new Transformer[K, V, JIterable[VO]] { + override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava + override def init(context: ProcessorContext): Unit = supplier.get().init(context) + override def close(): Unit = supplier.get().close() + } + } + } + implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]]) + extends AnyVal { + def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] { + override def get(): ValueTransformer[V, JIterable[VO]] = + new ValueTransformer[V, JIterable[VO]] { + override def transform(value: V): JIterable[VO] = supplier.get().transform(value).asJava + override def init(context: ProcessorContext): Unit = supplier.get().init(context) + override def close(): Unit = supplier.get().close() + } + } + } + implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO]( + val supplier: ValueTransformerWithKeySupplier[K, V, Iterable[VO]] + ) extends AnyVal { + def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] = + new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] { + override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = + new ValueTransformerWithKey[K, V, JIterable[VO]] { + override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava + override def init(context: ProcessorContext): Unit = supplier.get().init(context) + override def close(): Unit = supplier.get().close() + } + } + } } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 4627875f8f0..115fc216c92 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -307,6 +307,63 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { stateStoreNames: String*): KStream[K1, V1] = inner.transform(transformerSupplier, stateStoreNames: _*) + /** + * Transform each record of the input stream into zero or more records in the output stream (both key and value type + * can be altered arbitrarily). + * A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record + * and computes zero or more output records. + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `Transformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. + * + * @param transformerSupplier the `TransformerSuplier` that generates `Transformer` + * @param stateStoreNames the names of the state stores used by the processor + * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) + * @see `org.apache.kafka.streams.kstream.KStream#transform` + */ + def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], + stateStoreNames: String*): KStream[K1, V1] = + inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*) + + /** + * Transform the value of each input record into zero or more records (with possible new type) in the + * output stream. + * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input + * record value and computes a new value for it. + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `ValueTransformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. + * + * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer` + * @param stateStoreNames the names of the state stores used by the processor + * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) + * @see `org.apache.kafka.streams.kstream.KStream#transformValues` + */ + def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], + stateStoreNames: String*): KStream[K, VR] = + inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*) + + /** + * Transform the value of each input record into zero or more records (with possible new type) in the + * output stream. + * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input + * record value and computes a new value for it. + * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected + * to the `ValueTransformer`. + * It's not required to connect global state stores that are added via `addGlobalStore`; + * read-only access to global state stores is available by default. + * + * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` + * @param stateStoreNames the names of the state stores used by the processor + * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) + * @see `org.apache.kafka.streams.kstream.KStream#transformValues` + */ + def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], + stateStoreNames: String*): KStream[K, VR] = + inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*) + /** * Transform the value of each input record into a new value (with possible new type) of the output record. * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index b3bcfe95243..36441a07107 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -21,7 +21,17 @@ package org.apache.kafka.streams.scala.kstream import java.time.Duration.ofSeconds import java.time.Instant -import org.apache.kafka.streams.kstream.JoinWindows +import org.apache.kafka.streams.KeyValue +import org.apache.kafka.streams.kstream.{ + JoinWindows, + Transformer, + TransformerSupplier, + ValueTransformer, + ValueTransformerSupplier, + ValueTransformerWithKey, + ValueTransformerWithKeySupplier +} +import org.apache.kafka.streams.processor.ProcessorContext import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.Serdes._ import org.apache.kafka.streams.scala.StreamsBuilder @@ -173,4 +183,140 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { testDriver.close() } + + "transform a KStream" should "transform correctly records" in { + class TestTransformer extends Transformer[String, String, KeyValue[String, String]] { + override def init(context: ProcessorContext): Unit = {} + override def transform(key: String, value: String): KeyValue[String, String] = + new KeyValue(s"$key-transformed", s"$value-transformed") + override def close(): Unit = {} + } + val builder = new StreamsBuilder() + val sourceTopic = "source" + val sinkTopic = "sink" + + val stream = builder.stream[String, String](sourceTopic) + stream + .transform(new TransformerSupplier[String, String, KeyValue[String, String]] { + def get(): Transformer[String, String, KeyValue[String, String]] = + new TestTransformer + }) + .to(sinkTopic) + + val now = Instant.now() + val testDriver = createTestDriver(builder, now) + val testInput = testDriver.createInput[String, String](sourceTopic) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput.pipeInput("1", "value", now) + + val result = testOutput.readKeyValue() + result.value shouldBe "value-transformed" + result.key shouldBe "1-transformed" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } + + "flatTransform a KStream" should "flatTransform correctly records" in { + class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] { + override def init(context: ProcessorContext): Unit = {} + override def transform(key: String, value: String): Iterable[KeyValue[String, String]] = + Array(new KeyValue(s"$key-transformed", s"$value-transformed")) + override def close(): Unit = {} + } + val builder = new StreamsBuilder() + val sourceTopic = "source" + val sinkTopic = "sink" + + val stream = builder.stream[String, String](sourceTopic) + stream + .flatTransform(new TransformerSupplier[String, String, Iterable[KeyValue[String, String]]] { + def get(): Transformer[String, String, Iterable[KeyValue[String, String]]] = + new TestTransformer + }) + .to(sinkTopic) + + val now = Instant.now() + val testDriver = createTestDriver(builder, now) + val testInput = testDriver.createInput[String, String](sourceTopic) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput.pipeInput("1", "value", now) + + val result = testOutput.readKeyValue() + result.value shouldBe "value-transformed" + result.key shouldBe "1-transformed" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } + + "flatTransformValues a KStream" should "correctly flatTransform values in records" in { + class TestTransformer extends ValueTransformer[String, Iterable[String]] { + override def init(context: ProcessorContext): Unit = {} + override def transform(value: String): Iterable[String] = + Array(s"$value-transformed") + override def close(): Unit = {} + } + val builder = new StreamsBuilder() + val sourceTopic = "source" + val sinkTopic = "sink" + + val stream = builder.stream[String, String](sourceTopic) + stream + .flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] { + def get(): ValueTransformer[String, Iterable[String]] = + new TestTransformer + }) + .to(sinkTopic) + + val now = Instant.now() + val testDriver = createTestDriver(builder, now) + val testInput = testDriver.createInput[String, String](sourceTopic) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput.pipeInput("1", "value", now) + + testOutput.readValue shouldBe "value-transformed" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } + + "flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in { + class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] { + override def init(context: ProcessorContext): Unit = {} + override def transform(key: String, value: String): Iterable[String] = + Array(s"$value-transformed-$key") + override def close(): Unit = {} + } + val builder = new StreamsBuilder() + val sourceTopic = "source" + val sinkTopic = "sink" + + val stream = builder.stream[String, String](sourceTopic) + stream + .flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] { + def get(): ValueTransformerWithKey[String, String, Iterable[String]] = + new TestTransformer + }) + .to(sinkTopic) + + val now = Instant.now() + val testDriver = createTestDriver(builder, now) + val testInput = testDriver.createInput[String, String](sourceTopic) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput.pipeInput("1", "value", now) + + testOutput.readValue shouldBe "value-transformed-1" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } }