MINOR: Fix Scala 2.12 build after KAFKA-10020 (#9245)

Fixes a problem in which the Serdes class in the same package as
the tests (the old one) shadows the one we explicitly imported
(the new one), but only in Scala 2.12. Since users (hopefully) don't
put their classes in our packages, they won't face the same problem.
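
A minimal sketch of the shadowing and of the renamed-import workaround, assuming the package
layout described above (the enclosing object name ShadowingExample is hypothetical, added only
for illustration):

    // The tests live in org.apache.kafka.streams.scala, which also contains the old,
    // deprecated Serdes object.
    package org.apache.kafka.streams.scala

    // Under Scala 2.12, a plain `import org.apache.kafka.streams.scala.serialization.Serdes`
    // is shadowed by the Serdes object already defined in this package, so references still
    // resolve to the old object. Renaming the import makes the reference unambiguous.
    import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}

    object ShadowingExample {
      // Resolves to the new org.apache.kafka.streams.scala.serialization.Serdes.
      val keySerde = NewSerdes.stringSerde
    }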

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>, John Roesler <vvcephei@apache.org>
Yuriy Badalyantc, 4 years ago, committed by GitHub (commit 4662ed4aac)
  1. streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala (16 lines changed)
  2. streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala (30 lines changed)
  3. streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala (8 lines changed)

streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala (16 lines changed)

@@ -19,7 +19,7 @@ package org.apache.kafka.streams.scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.utils.StreamToTableJoinScalaIntegrationTestBase
@@ -133,16 +133,16 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val streamsConfiguration: Properties = getStreamsConfiguration()
-streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
-streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
val builder: StreamsBuilderJ = new StreamsBuilderJ()
val userClicksStream: KStreamJ[String, JLong] =
-builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+builder.stream[String, JLong](userClicksTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val userRegionsTable: KTableJ[String, String] =
-builder.table[String, String](userRegionsTopicJ, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+builder.table[String, String](userRegionsTopicJ, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
// Join the stream against the table.
val valueJoinerJ: ValueJoiner[JLong, String, (String, JLong)] =
@@ -150,7 +150,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val userClicksJoinRegion: KStreamJ[String, (String, JLong)] = userClicksStream.leftJoin(
userRegionsTable,
valueJoinerJ,
-Joined.`with`[String, JLong, String](Serdes.stringSerde, Serdes.javaLongSerde, Serdes.stringSerde)
+Joined.`with`[String, JLong, String](NewSerdes.stringSerde, NewSerdes.javaLongSerde, NewSerdes.stringSerde)
)
// Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
@@ -160,11 +160,11 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableJ[String, JLong] = clicksByRegion
-.groupByKey(Grouped.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+.groupByKey(Grouped.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
.reduce((v1, v2) => v1 + v2)
// Write the (continuously updating) results to the output topic.
-clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+clicksPerRegion.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val streams = new KafkaStreamsJ(builder.build(), streamsConfiguration)

streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala (30 lines changed)

@@ -42,7 +42,7 @@ import org.apache.kafka.streams.kstream.{
}
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, ProcessorSupplier}
import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KeyValue, StreamsConfig, TopologyDescription, StreamsBuilder => StreamsBuilderJ}
@@ -355,7 +355,7 @@ class TopologyTest {
val builder: StreamsBuilder = new StreamsBuilder
val sourceStream: KStream[String, String] =
-builder.stream(inputTopic)(Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+builder.stream(inputTopic)(Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
val mappedStream: KStream[String, String] =
sourceStream.map((k: String, v: String) => (k.toUpperCase(Locale.getDefault), v))
@@ -365,30 +365,30 @@ class TopologyTest {
.process(() => new SimpleProcessor(processorValueCollector))
val stream2 = mappedStream.groupByKey
-.aggregate(0)(aggregator)(Materialized.`with`(Serdes.stringSerde, Serdes.intSerde))
+.aggregate(0)(aggregator)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
.toStream
-stream2.to(AGGREGATION_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.intSerde))
+stream2.to(AGGREGATION_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.intSerde))
// adding operators for case where the repartition node is further downstream
val stream3 = mappedStream
.filter((_: String, _: String) => true)
.peek((k: String, v: String) => System.out.println(k + ":" + v))
.groupByKey
-.reduce(reducer)(Materialized.`with`(Serdes.stringSerde, Serdes.stringSerde))
+.reduce(reducer)(Materialized.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
.toStream
-stream3.to(REDUCE_TOPIC)(Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+stream3.to(REDUCE_TOPIC)(Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
-StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.intSerde)
+StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
mappedStream
.filter((k: String, _: String) => k == "A")
.join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, JoinWindows.of(Duration.ofMillis(5000)))(
-StreamJoined.`with`(Serdes.stringSerde, Serdes.stringSerde, Serdes.stringSerde)
+StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@@ -411,7 +411,7 @@ class TopologyTest {
val builder = new StreamsBuilderJ
-val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde))
+val sourceStream = builder.stream(inputTopic, Consumed.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
val mappedStream: KStreamJ[String, String] =
sourceStream.map(keyValueMapper)
@@ -421,25 +421,25 @@ class TopologyTest {
.process(processorSupplier)
val stream2: KStreamJ[String, Integer] = mappedStream.groupByKey
-.aggregate(initializer, aggregator, MaterializedJ.`with`(Serdes.stringSerde, SerdesJ.Integer))
+.aggregate(initializer, aggregator, MaterializedJ.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
.toStream
-stream2.to(AGGREGATION_TOPIC, Produced.`with`(Serdes.stringSerde, SerdesJ.Integer))
+stream2.to(AGGREGATION_TOPIC, Produced.`with`(NewSerdes.stringSerde, SerdesJ.Integer))
// adding operators for case where the repartition node is further downstream
val stream3 = mappedStream
.filter((_, _) => true)
.peek((k, v) => System.out.println(k + ":" + v))
.groupByKey
-.reduce(reducer, MaterializedJ.`with`(Serdes.stringSerde, Serdes.stringSerde))
+.reduce(reducer, MaterializedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
.toStream
-stream3.to(REDUCE_TOPIC, Produced.`with`(Serdes.stringSerde, Serdes.stringSerde))
+stream3.to(REDUCE_TOPIC, Produced.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde))
mappedStream
.filter((key, _) => key == "A")
.join[Integer, String](stream2,
valueJoiner2,
JoinWindows.of(Duration.ofMillis(5000)),
-StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.Integer))
+StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.Integer))
.to(JOINED_TOPIC)
mappedStream
@@ -447,7 +447,7 @@ class TopologyTest {
.join(stream3,
valueJoiner3,
JoinWindows.of(Duration.ofMillis(5000)),
-StreamJoinedJ.`with`(Serdes.stringSerde, Serdes.stringSerde, SerdesJ.String))
+StreamJoinedJ.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, SerdesJ.String))
.to(JOINED_TOPIC)
builder

streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala (8 lines changed)

@@ -25,7 +25,7 @@ import java.util.regex.Pattern
import org.junit.Assert._
import org.junit._
import org.junit.rules.TemporaryFolder
-import org.apache.kafka.streams.scala.serialization.Serdes
+import org.apache.kafka.streams.scala.serialization.{Serdes => NewSerdes}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
@@ -144,8 +144,8 @@ class WordCountTest extends WordCountTestData {
import scala.jdk.CollectionConverters._
val streamsConfiguration = getStreamsConfiguration()
-streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
-streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.stringSerde.getClass.getName)
+streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
+streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, NewSerdes.stringSerde.getClass.getName)
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)
@@ -162,7 +162,7 @@ class WordCountTest extends WordCountTestData {
val wordCounts: KTableJ[String, java.lang.Long] = grouped.count()
-wordCounts.toStream.to(outputTopicJ, Produced.`with`(Serdes.stringSerde, Serdes.javaLongSerde))
+wordCounts.toStream.to(outputTopicJ, Produced.`with`(NewSerdes.stringSerde, NewSerdes.javaLongSerde))
val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
streams.start()
