Browse Source

KAFKA-9011: Removed multiple calls to supplier.get() in order to avoid multiple transformer instances being created. (#7685)

This is a followup PR for #7520 to address issue of multiple calls to get() as it was pointed out by @bbejeck in #7520 (comment)

Reviewers: Bill Bejeck <bbejeck@gmail.com>
pull/7288/merge
Alex Kokachev 5 years ago committed by Bill Bejeck
parent
commit
7f5c380c34
  1. 30
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala

30
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala

@ -126,23 +126,27 @@ private[scala] object FunctionsCompatConversions { @@ -126,23 +126,27 @@ private[scala] object FunctionsCompatConversions {
implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]])
extends AnyVal {
def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] {
override def get(): Transformer[K, V, JIterable[VO]] =
override def get(): Transformer[K, V, JIterable[VO]] = {
val innerTransformer = supplier.get()
new Transformer[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
}
}
implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]])
extends AnyVal {
def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] {
override def get(): ValueTransformer[V, JIterable[VO]] =
override def get(): ValueTransformer[V, JIterable[VO]] = {
val innerTransformer = supplier.get()
new ValueTransformer[V, JIterable[VO]] {
override def transform(value: V): JIterable[VO] = supplier.get().transform(value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
override def transform(value: V): JIterable[VO] = innerTransformer.transform(value).asJava
override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
}
}
implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO](
@ -150,12 +154,14 @@ private[scala] object FunctionsCompatConversions { @@ -150,12 +154,14 @@ private[scala] object FunctionsCompatConversions {
) extends AnyVal {
def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] =
new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] {
override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] =
override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = {
val innerTransformer = supplier.get()
new ValueTransformerWithKey[K, V, JIterable[VO]] {
override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava
override def init(context: ProcessorContext): Unit = supplier.get().init(context)
override def close(): Unit = supplier.get().close()
override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava
override def init(context: ProcessorContext): Unit = innerTransformer.init(context)
override def close(): Unit = innerTransformer.close()
}
}
}
}
}

Loading…
Cancel
Save