Browse Source

KAFKA-9011: Scala bindings for flatTransform and flatTransformValues in KStream (#7520)

Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
pull/7685/head
Alex Kokachev 5 years ago committed by Bill Bejeck
parent
commit
9a125a72a2
  1. 38
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala
  2. 57
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
  3. 148
      streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala

38
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala

@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream._ @@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream._
import scala.collection.JavaConverters._
import java.lang.{Iterable => JIterable}
import org.apache.kafka.streams.processor.ProcessorContext
/**
* Implicit classes that offer conversions of Scala function literals to
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
@ -120,4 +122,40 @@ private[scala] object FunctionsCompatConversions { @@ -120,4 +122,40 @@ private[scala] object FunctionsCompatConversions {
override def get(): Transformer[K, V, VO] = f()
}
}
implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]])
extends AnyVal {
def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] {
override def get(): Transformer[K, V, JIterable[VO]] =
new Transformer[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
}
}
}
implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]])
extends AnyVal {
def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] {
override def get(): ValueTransformer[V, JIterable[VO]] =
new ValueTransformer[V, JIterable[VO]] {
override def transform(value: V): JIterable[VO] = supplier.get().transform(value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
}
}
}
implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
val supplier: ValueTransformerWithKeySupplier[K, V, Iterable[VO]]
) extends AnyVal {
def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] =
new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] {
override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] =
new ValueTransformerWithKey[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
}
}
}
}

57
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

@ -307,6 +307,63 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { @@ -307,6 +307,63 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
stateStoreNames: String*): KStream[K1, V1] =
inner.transform(transformerSupplier, stateStoreNames: _*)
/**
* Transform each record of the input stream into zero or more records in the output stream (both key and value type
* can be altered arbitrarily).
* A `Transformer` (provided by the given `TransformerSupplier`) is applied to each input record
* and computes zero or more output records.
* In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
* to the `Transformer`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param transformerSupplier the `TransformerSuplier` that generates `Transformer`
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
stateStoreNames: String*): KStream[K1, V1] =
inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*)
/**
* Transform the value of each input record into zero or more records (with possible new type) in the
* output stream.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes a new value for it.
* In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
* to the `ValueTransformer`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer`
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
stateStoreNames: String*): KStream[K, VR] =
inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
/**
* Transform the value of each input record into zero or more records (with possible new type) in the
* output stream.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input
* record value and computes a new value for it.
* In order to assign a state, the state must be created and added via `addStateStore` before they can be connected
* to the `ValueTransformer`.
* It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default.
*
* @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
* @param stateStoreNames the names of the state stores used by the processor
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
stateStoreNames: String*): KStream[K, VR] =
inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
* A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input

148
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala

@ -21,7 +21,17 @@ package org.apache.kafka.streams.scala.kstream @@ -21,7 +21,17 @@ package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
import java.time.Instant
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
JoinWindows,
Transformer,
TransformerSupplier,
ValueTransformer,
ValueTransformerSupplier,
ValueTransformerWithKey,
ValueTransformerWithKeySupplier
}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
@ -173,4 +183,140 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { @@ -173,4 +183,140 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
"transform a KStream" should "transform correctly records" in {
class TestTransformer extends Transformer[String, String, KeyValue[String, String]] {
override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): KeyValue[String, String] =
new KeyValue(s"$key-transformed", s"$value-transformed")
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
def get(): Transformer[String, String, KeyValue[String, String]] =
new TestTransformer
})
.to(sinkTopic)
val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value", now)
val result = testOutput.readKeyValue()
result.value shouldBe "value-transformed"
result.key shouldBe "1-transformed"
testOutput.isEmpty shouldBe true
testDriver.close()
}
"flatTransform a KStream" should "flatTransform correctly records" in {
class TestTransformer extends Transformer[String, String, Iterable[KeyValue[String, String]]] {
override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): Iterable[KeyValue[String, String]] =
Array(new KeyValue(s"$key-transformed", s"$value-transformed"))
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransform(new TransformerSupplier[String, String, Iterable[KeyValue[String, String]]] {
def get(): Transformer[String, String, Iterable[KeyValue[String, String]]] =
new TestTransformer
})
.to(sinkTopic)
val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value", now)
val result = testOutput.readKeyValue()
result.value shouldBe "value-transformed"
result.key shouldBe "1-transformed"
testOutput.isEmpty shouldBe true
testDriver.close()
}
"flatTransformValues a KStream" should "correctly flatTransform values in records" in {
class TestTransformer extends ValueTransformer[String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
override def transform(value: String): Iterable[String] =
Array(s"$value-transformed")
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerSupplier[String, Iterable[String]] {
def get(): ValueTransformer[String, Iterable[String]] =
new TestTransformer
})
.to(sinkTopic)
val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value", now)
testOutput.readValue shouldBe "value-transformed"
testOutput.isEmpty shouldBe true
testDriver.close()
}
"flatTransformValues with key in a KStream" should "correctly flatTransformValues in records" in {
class TestTransformer extends ValueTransformerWithKey[String, String, Iterable[String]] {
override def init(context: ProcessorContext): Unit = {}
override def transform(key: String, value: String): Iterable[String] =
Array(s"$value-transformed-$key")
override def close(): Unit = {}
}
val builder = new StreamsBuilder()
val sourceTopic = "source"
val sinkTopic = "sink"
val stream = builder.stream[String, String](sourceTopic)
stream
.flatTransformValues(new ValueTransformerWithKeySupplier[String, String, Iterable[String]] {
def get(): ValueTransformerWithKey[String, String, Iterable[String]] =
new TestTransformer
})
.to(sinkTopic)
val now = Instant.now()
val testDriver = createTestDriver(builder, now)
val testInput = testDriver.createInput[String, String](sourceTopic)
val testOutput = testDriver.createOutput[String, String](sinkTopic)
testInput.pipeInput("1", "value", now)
testOutput.readValue shouldBe "value-transformed-1"
testOutput.isEmpty shouldBe true
testDriver.close()
}
}

Loading…
Cancel
Save