From 7f5c380c34967dfb57ba7f2c46fe28ba4ae24ad6 Mon Sep 17 00:00:00 2001 From: Alex Kokachev Date: Fri, 15 Nov 2019 08:12:08 +1100 Subject: [PATCH] KAFKA-9011: Removed multiple calls to supplier.get() in order to avoid multiple transformer instances being created. (#7685) This is a followup PR for #7520 to address issue of multiple calls to get() as it was pointed out by @bbejeck in #7520 (comment) Reviewers: Bill Bejeck --- .../scala/FunctionsCompatConversions.scala | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala index 26756e076b9..e8420f4b3ca 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionsCompatConversions.scala @@ -126,23 +126,27 @@ private[scala] object FunctionsCompatConversions { implicit class TransformerSupplierAsJava[K, V, VO](val supplier: TransformerSupplier[K, V, Iterable[VO]]) extends AnyVal { def asJava: TransformerSupplier[K, V, JIterable[VO]] = new TransformerSupplier[K, V, JIterable[VO]] { - override def get(): Transformer[K, V, JIterable[VO]] = + override def get(): Transformer[K, V, JIterable[VO]] = { + val innerTransformer = supplier.get() new Transformer[K, V, JIterable[VO]] { - override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava - override def init(context: ProcessorContext): Unit = supplier.get().init(context) - override def close(): Unit = supplier.get().close() + override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava + override def init(context: ProcessorContext): Unit = innerTransformer.init(context) + override def close(): Unit = innerTransformer.close() } + } } } implicit class ValueTransformerSupplierAsJava[V, VO](val supplier: ValueTransformerSupplier[V, Iterable[VO]]) extends AnyVal { def asJava: ValueTransformerSupplier[V, JIterable[VO]] = new ValueTransformerSupplier[V, JIterable[VO]] { - override def get(): ValueTransformer[V, JIterable[VO]] = + override def get(): ValueTransformer[V, JIterable[VO]] = { + val innerTransformer = supplier.get() new ValueTransformer[V, JIterable[VO]] { - override def transform(value: V): JIterable[VO] = supplier.get().transform(value).asJava - override def init(context: ProcessorContext): Unit = supplier.get().init(context) - override def close(): Unit = supplier.get().close() + override def transform(value: V): JIterable[VO] = innerTransformer.transform(value).asJava + override def init(context: ProcessorContext): Unit = innerTransformer.init(context) + override def close(): Unit = innerTransformer.close() } + } } } implicit class ValueTransformerSupplierWithKeyAsJava[K, V, VO]( @@ -150,12 +154,14 @@ private[scala] object FunctionsCompatConversions { ) extends AnyVal { def asJava: ValueTransformerWithKeySupplier[K, V, JIterable[VO]] = new ValueTransformerWithKeySupplier[K, V, JIterable[VO]] { - override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = + override def get(): ValueTransformerWithKey[K, V, JIterable[VO]] = { + val innerTransformer = supplier.get() new ValueTransformerWithKey[K, V, JIterable[VO]] { - override def transform(key: K, value: V): JIterable[VO] = supplier.get().transform(key, value).asJava - override def init(context: ProcessorContext): Unit = supplier.get().init(context) - override def close(): Unit = supplier.get().close() + override def transform(key: K, value: V): JIterable[VO] = innerTransformer.transform(key, value).asJava + override def init(context: ProcessorContext): Unit = innerTransformer.init(context) + override def close(): Unit = innerTransformer.close() } + } } } }