Browse Source

MINOR: Fix streams Scala foreach recursive call (#5539)

Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
pull/5568/head
Joan Goyeau 6 years ago committed by Guozhang Wang
parent
commit
b8559de23d
  1. 6
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
  2. 12
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
  3. 16
      streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala

6
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala

@ -34,6 +34,12 @@ import java.lang.{Iterable => JIterable} @@ -34,6 +34,12 @@ import java.lang.{Iterable => JIterable}
*/
object FunctionConversions {
implicit private[scala] class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
override def apply(key: K, value: V): Unit = p(key, value)
}
}
implicit class PredicateFromFunction[K, V](val p: (K, V) => Boolean) extends AnyVal {
def asPredicate: Predicate[K, V] = new Predicate[K, V] {
override def test(key: K, value: V): Boolean = p(key, value)

12
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

@ -22,7 +22,7 @@ package kstream @@ -22,7 +22,7 @@ package kstream
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.processor.{Processor, ProcessorSupplier, TopicNameExtractor}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.FunctionConversions._
@ -84,10 +84,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { @@ -84,10 +84,8 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with new key and value (possibly both of different type)
* @see `org.apache.kafka.streams.kstream.KStream#map`
*/
def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] = {
val kvMapper = mapper.tupled andThen tuple2ToKeyValue
inner.map[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
}
def map[KR, VR](mapper: (K, V) => (KR, VR)): KStream[KR, VR] =
inner.map[KR, VR](mapper.asKeyValueMapper)
/**
* Transform the value of each input record into a new value (with possible new type) of the output record.
@ -124,7 +122,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { @@ -124,7 +122,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#flatMap`
*/
def flatMap[KR, VR](mapper: (K, V) => Iterable[(KR, VR)]): KStream[KR, VR] = {
val kvMapper = mapper.tupled andThen (iter => iter.map(tuple2ToKeyValue).asJava)
val kvMapper = mapper.tupled.andThen(_.map(tuple2ToKeyValue).asJava)
inner.flatMap[KR, VR](((k: K, v: V) => kvMapper(k, v)).asKeyValueMapper)
}
@ -173,7 +171,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { @@ -173,7 +171,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#foreach`
*/
def foreach(action: (K, V) => Unit): Unit =
inner.foreach((k: K, v: V) => action(k, v))
inner.foreach(action.asForeachAction)
/**
* Creates an array of `KStream` from this stream by branching the records in the original stream based on

16
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala

@ -75,6 +75,22 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { @@ -75,6 +75,22 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
"foreach a KStream" should "side effect records" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"
var acc = ""
builder.stream[String, String](sourceTopic).foreach((_, value) => acc += value)
val testDriver = createTestDriver(builder)
testDriver.pipeRecord(sourceTopic, ("1", "value1"))
testDriver.pipeRecord(sourceTopic, ("2", "value2"))
acc shouldBe "value1value2"
testDriver.close()
}
"selectKey a KStream" should "select a new key" in {
val builder = new StreamsBuilder()
val sourceTopic = "source"

Loading…
Cancel
Save