From 4662ed4aac79ed539f3a62140b8b154517837dbd Mon Sep 17 00:00:00 2001 From: Yuriy Badalyantc Date: Thu, 3 Sep 2020 00:49:47 +0700 Subject: [PATCH] MINOR: Fix build scala 2.12 build after KAFKA-10020 (#9245) Fixes a problem in which the Serdes class in the same package as the tests (the old one) overshadows the one we explicitly imported (the new one), but only in Scala 2.12. Since users (hopefully) don't put their classes in our packages, they won't face the same problem. Reviewers: Chia-Ping Tsai , David Arthur , John Roesler --- ...inScalaIntegrationTestImplicitSerdes.scala | 16 +++++----- .../kafka/streams/scala/TopologyTest.scala | 30 +++++++++---------- .../kafka/streams/scala/WordCountTest.scala | 8 ++--- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index d4a1aa70add..0c2445403ae 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -19,7 +19,7 @@ package org.apache.kafka.streams.scala import java.util.Properties import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} -import org.apache.kafka.streams.scala.serialization.Serdes +import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase @@ -133,16 +133,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val streamsConfiguration: Properties = getStreamsConfiguration() - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName) - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName) val builder: StreamsBuilderJ = new StreamsBuilderJ() val userClicksStream: KStreamJ[String, JLong] = - builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.javaLongSerde)) + builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde)) val userRegionsTable: KTableJ[String, String] = - builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)) + builder.table[String, String](userRegionsTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) // Join the stream against the table. val valueJoinerJ: ValueJoiner[JLong, String, (String, JLong)] = @@ -150,7 +150,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream.leftJoin( userRegionsTable, valueJoinerJ, - Joined.`with`[String, JLong, String](Serdes.stringSerde, Serdes.javaLongSerde, Serdes.stringSerde) + Joined.`with`[String, JLong, String](NewSerdes.stringSerde, NewSerdes.javaLongSerde, NewSerdes.stringSerde) ) // Change the stream from -> to -> @@ -160,11 +160,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion - .groupByKey(Grouped.`with`(Serdes.stringSerde, Serdes.javaLongSerde)) + .groupByKey(Grouped.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde)) .reduce((v1, v2) => v1 + v2) // Write the (continuously updating) results to the output topic. - clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde)) + clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde)) val streams = new KafkaStreamsJ(builder.build(), streamsConfiguration) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 1262431c320..eeed948eefb 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -42,7 +42,7 @@ import org.apache.kafka.streams.kstream.{ } import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier} import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.serialization.Serdes +import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} import org.apache.kafka.streams.scala.serialization.Serdes._ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ} @@ -355,7 +355,7 @@ class TopologyTest { val builder: StreamsBuilder = new StreamsBuilder val sourceStream: KStream[String, String] = - builder.stream(inputTopic)(Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)) + builder.stream(inputTopic)(Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) val mappedStream: KStream[String, String] = sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v)) @@ -365,30 +365,30 @@ class TopologyTest { .process(() => new SimpleProcessor(processorValueCollector)) val stream2 = mappedStream.groupByKey - .aggregate(0)(aggregator)(Materialized.`with`(Serdes.stringSerde, Serdes.intSerde)) + .aggregate(0)(aggregator)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.intSerde)) .toStream - stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.intSerde)) + stream2.to(AGGREGATION_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.intSerde)) // adding operators for case where the repartition node is further downstream val stream3 = mappedStream .filter((_: String, _: String) => true) .peek((k: String, v: String) => System.out.println(k + ":" + v)) .groupByKey - .reduce(reducer)(Materialized.`with`(Serdes.stringSerde, Serdes.stringSerde)) + .reduce(reducer)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) .toStream - stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.stringSerde)) + stream3.to(REDUCE_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) mappedStream .filter((k: String, _: String) => k == "A") .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))( - StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.intSerde) + StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde) ) .to(JOINED_TOPIC) mappedStream .filter((k: String, _: String) => k == "A") .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))( - StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde) + StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde) ) .to(JOINED_TOPIC) @@ -411,7 +411,7 @@ class TopologyTest { val builder = new StreamsBuilderJ - val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)) + val sourceStream = builder.stream(inputTopic, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) val mappedStream: KStreamJ[String, String] = sourceStream.map(keyValueMapper) @@ -421,25 +421,25 @@ class TopologyTest { .process(processorSupplier) val stream2: KStreamJ[String, Integer] = mappedStream.groupByKey - .aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.stringSerde, SerdesJ.Integer)) + .aggregate(initializer, aggregator, MaterializedJ.`with`(NewSerdes.stringSerde, SerdesJ.Integer)) .toStream - stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.stringSerde, SerdesJ.Integer)) + stream2.to(AGGREGATION_TOPIC, Produced.`with`(NewSerdes.stringSerde, SerdesJ.Integer)) // adding operators for case where the repartition node is further downstream val stream3 = mappedStream .filter((_, _) => true) .peek((k, v) => System.out.println(k + ":" + v)) .groupByKey - .reduce(reducer, MaterializedJ.`with`(Serdes.stringSerde, Serdes.stringSerde)) + .reduce(reducer, MaterializedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) .toStream - stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.stringSerde, Serdes.stringSerde)) + stream3.to(REDUCE_TOPIC, Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde)) mappedStream .filter((key, _) => key == "A") .join[Integer, String](stream2, valueJoiner2, JoinWindows.of(Duration.ofMillis(5000)), - StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.Integer)) + StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer)) .to(JOINED_TOPIC) mappedStream @@ -447,7 +447,7 @@ class TopologyTest { .join(stream3, valueJoiner3, JoinWindows.of(Duration.ofMillis(5000)), - StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.String)) + StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String)) .to(JOINED_TOPIC) builder diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala index bd1b279ba68..16b3493a675 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala @@ -25,7 +25,7 @@ import java.util.regex.Pattern import org.junit.Assert._ import org.junit._ import org.junit.rules.TemporaryFolder -import org.apache.kafka.streams.scala.serialization.Serdes +import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes} import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} @@ -144,8 +144,8 @@ class WordCountTest extends WordCountTestData { import scala.jdk.CollectionConverters._ val streamsConfiguration = getStreamsConfiguration() - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName) - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName) + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName) val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ) @@ -162,7 +162,7 @@ class WordCountTest extends WordCountTestData { val wordCounts: KTableJ[String, java.lang.Long] = grouped.count() - wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde)) + wordCounts.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde)) val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration) streams.start()