Browse Source

chore: Fix scaladoc warnings (#13164)

Make sure no scaladoc warnings are emitted from the streams-scala project build.
We cannot fully fix all scaladoc warnings due to limitations of the scaladoc tool,
so this is a best-effort attempt at fixing as many warnings as possible. We also
disable one problematic class of scaladoc wornings (link errors) in the gradle build.

The causes of existing warnings are that we link to java members from scaladoc, which
is not possible, or we fail to disambiguate some members.

The broad rule applied in the changes is
 - For links to Java members such as [[StateStore]], we use the fully qualified name in a code tag
   to make manual link resolution via a search engine easy.
 - For some common terms that are also linked to Java members, like [[Serde]], we omit the link.
 - We disambiguate where possible.
 - In the special case of @throws declarations with Java Exceptions, we do not seem to be able
   to avoid the warning altogther.

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
pull/13169/head
Lucas Brutschy 2 years ago committed by GitHub
parent
commit
eb7f490159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build.gradle
  2. 4
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala
  3. 12
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
  4. 6
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
  5. 8
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
  6. 8
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
  7. 14
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
  8. 10
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
  9. 30
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
  10. 6
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
  11. 6
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
  12. 12
      streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala

4
build.gradle

@ -2071,6 +2071,10 @@ project(':streams:streams-scala') {
include "**/org/apache/kafka/streams/scala/**" include "**/org/apache/kafka/streams/scala/**"
} }
scaladoc {
scalaDocOptions.additionalParameters = ["-no-link-warnings"]
}
tasks.create(name: "copyDependantLibs", type: Copy) { tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtimeClasspath) { from (configurations.runtimeClasspath) {
exclude('kafka-streams*') exclude('kafka-streams*')

4
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala

@ -36,7 +36,7 @@ object Branched {
* *
* @param chain A function that will be applied to the branch. If the provided function returns * @param chain A function that will be applied to the branch. If the provided function returns
* `null`, its result is ignored, otherwise it is added to the Map returned * `null`, its result is ignored, otherwise it is added to the Map returned
* by [[BranchedKStream.defaultBranch]] or [[BranchedKStream.noDefaultBranch]] (see * by [[BranchedKStream.defaultBranch()*]] or [[BranchedKStream.noDefaultBranch]] (see
* [[BranchedKStream]] description for details). * [[BranchedKStream]] description for details).
* @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated * @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated
* (see [[BranchedKStream]] description for details) * (see [[BranchedKStream]] description for details)
@ -53,7 +53,7 @@ object Branched {
* *
* @param chain A consumer to which the branch will be sent. If a non-null consumer is provided here, * @param chain A consumer to which the branch will be sent. If a non-null consumer is provided here,
* the respective branch will not be added to the resulting Map returned * the respective branch will not be added to the resulting Map returned
* by [[BranchedKStream.defaultBranch]] or [[BranchedKStream.noDefaultBranch]] (see * by [[BranchedKStream.defaultBranch()*]] or [[BranchedKStream.noDefaultBranch]] (see
* [[BranchedKStream]] description for details). * [[BranchedKStream]] description for details).
* @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated * @param name the branch name suffix to be used. If `null`, a default branch name suffix will be generated
* (see [[BranchedKStream]] description for details) * (see [[BranchedKStream]] description for details)

12
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala

@ -27,7 +27,8 @@ import scala.jdk.CollectionConverters._
/** /**
* Branches the records in the original stream based on the predicates supplied for the branch definitions. * Branches the records in the original stream based on the predicates supplied for the branch definitions.
* <p> * <p>
* Branches are defined with [[branch]] or [[defaultBranch]] methods. Each record is evaluated against the predicates * Branches are defined with [[branch]] or [[defaultBranch()*]]
* methods. Each record is evaluated against the predicates
* supplied via [[Branched]] parameters, and is routed to the first branch for which its respective predicate * supplied via [[Branched]] parameters, and is routed to the first branch for which its respective predicate
* evaluates to `true`. If a record does not match any predicates, it will be routed to the default branch, * evaluates to `true`. If a record does not match any predicates, it will be routed to the default branch,
* or dropped if no default branch is created. * or dropped if no default branch is created.
@ -36,11 +37,12 @@ import scala.jdk.CollectionConverters._
* Each branch (which is a [[KStream]] instance) then can be processed either by * Each branch (which is a [[KStream]] instance) then can be processed either by
* a function or a consumer provided via a [[Branched]] * a function or a consumer provided via a [[Branched]]
* parameter. If certain conditions are met, it also can be accessed from the `Map` returned by * parameter. If certain conditions are met, it also can be accessed from the `Map` returned by
* an optional [[defaultBranch]] or [[noDefaultBranch]] method call. * an optional [[defaultBranch()*]] or [[noDefaultBranch]] method call.
* <p> * <p>
* The branching happens on a first match basis: A record in the original stream is assigned to the corresponding result * The branching happens on a first match basis: A record in the original stream is assigned to the corresponding result
* stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need * stream for the first predicate that evaluates to true, and is assigned to this stream only. If you need
* to route a record to multiple streams, you can apply multiple [[KStream.filter]] operators to the same [[KStream]] * to route a record to multiple streams, you can apply multiple
* [[KStream.filter]] operators to the same [[KStream]]
* instance, one for each predicate, instead of branching. * instance, one for each predicate, instead of branching.
* <p> * <p>
* The process of routing the records to different branches is a stateless record-by-record operation. * The process of routing the records to different branches is a stateless record-by-record operation.
@ -83,7 +85,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
/** /**
* Finalize the construction of branches and defines the default branch for the messages not intercepted * Finalize the construction of branches and defines the default branch for the messages not intercepted
* by other branches. Calling [[defaultBranch]] or [[noDefaultBranch]] is optional. * by other branches. Calling [[defaultBranch()*]] or [[noDefaultBranch]] is optional.
* *
* @return Map of named branches. For rules of forming the resulting map, see [[BranchedKStream]] * @return Map of named branches. For rules of forming the resulting map, see [[BranchedKStream]]
* description. * description.
@ -92,7 +94,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
/** /**
* Finalize the construction of branches and defines the default branch for the messages not intercepted * Finalize the construction of branches and defines the default branch for the messages not intercepted
* by other branches. Calling [[defaultBranch]] or [[noDefaultBranch]] is optional. * by other branches. Calling [[defaultBranch()*]] or [[noDefaultBranch]] is optional.
* *
* @param branched A [[Branched]] parameter, that allows to define a branch name, an in-place * @param branched A [[Branched]] parameter, that allows to define a branch name, an in-place
* branch consumer or branch mapper for [[BranchedKStream]]. * branch consumer or branch mapper for [[BranchedKStream]].

6
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala

@ -43,7 +43,7 @@ object Consumed {
ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy) ConsumedJ.`with`(keySerde, valueSerde, timestampExtractor, resetPolicy)
/** /**
* Create an instance of [[Consumed]] with key and value [[Serde]]s. * Create an instance of [[Consumed]] with key and value Serdes.
* *
* @tparam K key type * @tparam K key type
* @tparam V value type * @tparam V value type
@ -53,7 +53,7 @@ object Consumed {
ConsumedJ.`with`(keySerde, valueSerde) ConsumedJ.`with`(keySerde, valueSerde)
/** /**
* Create an instance of [[Consumed]] with a [[TimestampExtractor]]. * Create an instance of [[Consumed]] with a `org.apache.kafka.streams.processor.TimestampExtractor`.
* *
* @param timestampExtractor the timestamp extractor to used. If `null` the default timestamp extractor from * @param timestampExtractor the timestamp extractor to used. If `null` the default timestamp extractor from
* config will be used * config will be used
@ -67,7 +67,7 @@ object Consumed {
ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde) ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
/** /**
* Create an instance of [[Consumed]] with a [[Topology.AutoOffsetReset]]. * Create an instance of [[Consumed]] with a `org.apache.kafka.streams.Topology.AutoOffsetReset`.
* *
* @tparam K key type * @tparam K key type
* @tparam V value type * @tparam V value type

8
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala

@ -22,8 +22,8 @@ import org.apache.kafka.streams.kstream.{Grouped => GroupedJ}
object Grouped { object Grouped {
/** /**
* Construct a `Grouped` instance with the provided key and value [[Serde]]s. * Construct a `Grouped` instance with the provided key and value Serdes.
* If the [[Serde]] params are `null` the default serdes defined in the configs will be used. * If the Serde params are `null` the default serdes defined in the configs will be used.
* *
* @tparam K the key type * @tparam K the key type
* @tparam V the value type * @tparam V the value type
@ -35,8 +35,8 @@ object Grouped {
GroupedJ.`with`(keySerde, valueSerde) GroupedJ.`with`(keySerde, valueSerde)
/** /**
* Construct a `Grouped` instance with the provided key and value [[Serde]]s. * Construct a `Grouped` instance with the provided key and value Serdes.
* If the [[Serde]] params are `null` the default serdes defined in the configs will be used. * If the Serde params are `null` the default serdes defined in the configs will be used.
* *
* @tparam K the key type * @tparam K the key type
* @tparam V the value type * @tparam V the value type

8
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala

@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.{Joined => JoinedJ}
object Joined { object Joined {
/** /**
* Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]] * Create an instance of `org.apache.kafka.streams.kstream.Joined` with key, value, and otherValue Serde
* instances. * instances.
* `null` values are accepted and will be replaced by the default serdes as defined in config. * `null` values are accepted and will be replaced by the default serdes as defined in config.
* *
@ -32,7 +32,7 @@ object Joined {
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
* @param valueSerde the value serde to use. * @param valueSerde the value serde to use.
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
* @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes * @return new `org.apache.kafka.streams.kstream.Joined` instance with the provided serdes
*/ */
def `with`[K, V, VO](implicit def `with`[K, V, VO](implicit
keySerde: Serde[K], keySerde: Serde[K],
@ -42,7 +42,7 @@ object Joined {
JoinedJ.`with`(keySerde, valueSerde, otherValueSerde) JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
/** /**
* Create an instance of [[org.apache.kafka.streams.kstream.Joined]] with key, value, and otherValue [[Serde]] * Create an instance of `org.apache.kafka.streams.kstream.Joined` with key, value, and otherValue Serde
* instances. * instances.
* `null` values are accepted and will be replaced by the default serdes as defined in config. * `null` values are accepted and will be replaced by the default serdes as defined in config.
* *
@ -53,7 +53,7 @@ object Joined {
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
* @param valueSerde the value serde to use. * @param valueSerde the value serde to use.
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
* @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes * @return new `org.apache.kafka.streams.kstream.Joined` instance with the provided serdes
*/ */
// disable spotless scala, which wants to make a mess of the argument lists // disable spotless scala, which wants to make a mess of the argument lists
// format: off // format: off

14
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala

@ -46,7 +46,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
/** /**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KStream KStream]] and delegates method calls to the * Wraps the Java class `org.apache.kafka.streams.kstream.KStream` and delegates method calls to the
* underlying Java object. * underlying Java object.
* *
* @tparam K Type of keys * @tparam K Type of keys
@ -823,7 +823,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* It's not required to connect global state stores that are added via `addGlobalStore`; * It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default. * read-only access to global state stores is available by default.
* *
* @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]] * @param processorSupplier a function that generates a `org.apache.kafka.streams.processor.Processor`
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`
*/ */
@ -844,7 +844,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* It's not required to connect global state stores that are added via `addGlobalStore`; * It's not required to connect global state stores that are added via `addGlobalStore`;
* read-only access to global state stores is available by default. * read-only access to global state stores is available by default.
* *
* @param processorSupplier a function that generates a [[org.apache.kafka.streams.processor.Processor]] * @param processorSupplier a function that generates a `org.apache.kafka.streams.processor.Processor`
* @param named a [[Named]] config used to name the processor in the topology * @param named a [[Named]] config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`
@ -870,7 +870,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with * Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with
* the older (deprecated) overload. * the older (deprecated) overload.
* *
* @param processorSupplier a supplier for [[org.apache.kafka.streams.processor.api.Processor]] * @param processorSupplier a supplier for `org.apache.kafka.streams.processor.api.Processor`
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`
*/ */
@ -888,7 +888,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with * Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with
* the older (deprecated) overload. * the older (deprecated) overload.
* *
* @param processorSupplier a supplier for [[org.apache.kafka.streams.processor.api.Processor]] * @param processorSupplier a supplier for `org.apache.kafka.streams.processor.api.Processor`
* @param named a [[Named]] config used to name the processor in the topology * @param named a [[Named]] config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`
@ -911,7 +911,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Note that this overload takes a FixedKeyProcessorSupplier instead of a Function to avoid post-erasure ambiguity with * Note that this overload takes a FixedKeyProcessorSupplier instead of a Function to avoid post-erasure ambiguity with
* the older (deprecated) overload. * the older (deprecated) overload.
* *
* @param processorSupplier a supplier for [[org.apache.kafka.streams.processor.api.FixedKeyProcessor]] * @param processorSupplier a supplier for `org.apache.kafka.streams.processor.api.FixedKeyProcessor`
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`
*/ */
@ -932,7 +932,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with * Note that this overload takes a ProcessorSupplier instead of a Function to avoid post-erasure ambiguity with
* the older (deprecated) overload. * the older (deprecated) overload.
* *
* @param processorSupplier a supplier for [[org.apache.kafka.streams.processor.api.FixedKeyProcessor]] * @param processorSupplier a supplier for `org.apache.kafka.streams.processor.api.FixedKeyProcessor`
* @param named a [[Named]] config used to name the processor in the topology * @param named a [[Named]] config used to name the processor in the topology
* @param stateStoreNames the names of the state store used by the processor * @param stateStoreNames the names of the state store used by the processor
* @see `org.apache.kafka.streams.kstream.KStream#process` * @see `org.apache.kafka.streams.kstream.KStream#process`

10
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala

@ -30,7 +30,7 @@ import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
import org.apache.kafka.streams.state.KeyValueStore import org.apache.kafka.streams.state.KeyValueStore
/** /**
* Wraps the Java class [[org.apache.kafka.streams.kstream.KTable]] and delegates method calls to the underlying Java object. * Wraps the Java class `org.apache.kafka.streams.kstream.KTable` and delegates method calls to the underlying Java object.
* *
* @tparam K Type of keys * @tparam K Type of keys
* @tparam V Type of values * @tparam V Type of values
@ -312,7 +312,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named)) new KStream(inner.toStream[KR](mapper.asKeyValueMapper, named))
/** /**
* Suppress some updates from this changelog stream, determined by the supplied [[org.apache.kafka.streams.kstream.Suppressed]] configuration. * Suppress some updates from this changelog stream, determined by the supplied `org.apache.kafka.streams.kstream.Suppressed` configuration.
* *
* This controls what updates downstream table and stream operations will receive. * This controls what updates downstream table and stream operations will receive.
* *
@ -673,7 +673,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor * @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value * @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records * @param joiner a function that computes the join result for a pair of matching records
* @param tableJoined a [[TableJoined]] used to configure partitioners and names of internal topics and stores * @param tableJoined a `org.apache.kafka.streams.kstream.TableJoined` used to configure
* partitioners and names of internal topics and stores
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized. * should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
@ -738,7 +739,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor * @param other the other [[KTable]] to be joined with this [[KTable]], keyed on the value obtained from keyExtractor
* @param keyExtractor a function that extracts the foreign key from this table's value * @param keyExtractor a function that extracts the foreign key from this table's value
* @param joiner a function that computes the join result for a pair of matching records * @param joiner a function that computes the join result for a pair of matching records
* @param tableJoined a [[TableJoined]] used to configure partitioners and names of internal topics and stores * @param tableJoined a `org.apache.kafka.streams.kstream.TableJoined` used to configure
* partitioners and names of internal topics and stores
* @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]]
* should be materialized. * should be materialized.
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,

30
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala

@ -25,25 +25,25 @@ import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier, SessionBytesS
object Materialized { object Materialized {
/** /**
* Materialize a [[StateStore]] with the provided key and value [[Serde]]s. * Materialize a `org.apache.kafka.streams.processor.StateStore` with the provided key and value Serdes.
* An internal name will be used for the store. * An internal name will be used for the store.
* *
* @tparam K key type * @tparam K key type
* @tparam V value type * @tparam V value type
* @tparam S store type * @tparam S store type
* @param keySerde the key [[Serde]] to use. * @param keySerde the key Serde to use.
* @param valueSerde the value [[Serde]] to use. * @param valueSerde the value Serde to use.
* @return a new [[Materialized]] instance with the given key and value serdes * @return a new [[Materialized]] instance with the given key and value serdes
*/ */
def `with`[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] = def `with`[K, V, S <: StateStore](implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] =
MaterializedJ.`with`(keySerde, valueSerde) MaterializedJ.`with`(keySerde, valueSerde)
/** /**
* Materialize a [[StateStore]] with the given name. * Materialize a `org.apache.kafka.streams.processor.StateStore` with the given name.
* *
* @tparam K key type of the store * @tparam K key type of the store
* @tparam V value type of the store * @tparam V value type of the store
* @tparam S type of the [[StateStore]] * @tparam S type of the `org.apache.kafka.streams.processor.StateStore`
* @param storeName the name of the underlying [[org.apache.kafka.streams.scala.kstream.KTable]] state store; * @param storeName the name of the underlying [[org.apache.kafka.streams.scala.kstream.KTable]] state store;
* valid characters are ASCII alphanumerics, '.', '_' and '-'. * valid characters are ASCII alphanumerics, '.', '_' and '-'.
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
@ -56,15 +56,16 @@ object Materialized {
MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde) MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
/** /**
* Materialize a [[org.apache.kafka.streams.state.WindowStore]] using the provided [[WindowBytesStoreSupplier]]. * Materialize a `org.apache.kafka.streams.state.WindowStore` using the provided
* `org.apache.kafka.streams.state.WindowBytesStoreSupplier`.
* *
* Important: Custom subclasses are allowed here, but they should respect the retention contract: * Important: Custom subclasses are allowed here, but they should respect the retention contract:
* Window stores are required to retain windows at least as long as (window size + window grace period). * Window stores are required to retain windows at least as long as (window size + window grace period).
* Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract. * Stores constructed via `org.apache.kafka.streams.state.Stores` already satisfy this contract.
* *
* @tparam K key type of the store * @tparam K key type of the store
* @tparam V value type of the store * @tparam V value type of the store
* @param supplier the [[WindowBytesStoreSupplier]] used to materialize the store * @param supplier the `org.apache.kafka.streams.state.WindowBytesStoreSupplier` used to materialize the store
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
* @param valueSerde the value serde to use. * @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given supplier * @return a new [[Materialized]] instance with the given supplier
@ -75,15 +76,16 @@ object Materialized {
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde) MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
/** /**
* Materialize a [[org.apache.kafka.streams.state.SessionStore]] using the provided [[SessionBytesStoreSupplier]]. * Materialize a `org.apache.kafka.streams.state.SessionStore` using the provided
* `org.apache.kafka.streams.state.SessionBytesStoreSupplier`.
* *
* Important: Custom subclasses are allowed here, but they should respect the retention contract: * Important: Custom subclasses are allowed here, but they should respect the retention contract:
* Session stores are required to retain windows at least as long as (session inactivity gap + session grace period). * Session stores are required to retain windows at least as long as (session inactivity gap + session grace period).
* Stores constructed via [[org.apache.kafka.streams.state.Stores]] already satisfy this contract. * Stores constructed via `org.apache.kafka.streams.state.Stores` already satisfy this contract.
* *
* @tparam K key type of the store * @tparam K key type of the store
* @tparam V value type of the store * @tparam V value type of the store
* @param supplier the [[SessionBytesStoreSupplier]] used to materialize the store * @param supplier the `org.apache.kafka.streams.state.SessionBytesStoreSupplier` used to materialize the store
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
* @param valueSerde the value serde to use. * @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given supplier * @return a new [[Materialized]] instance with the given supplier
@ -94,11 +96,13 @@ object Materialized {
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde) MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
/** /**
* Materialize a [[org.apache.kafka.streams.state.KeyValueStore]] using the provided [[KeyValueBytesStoreSupplier]]. * Materialize a `org.apache.kafka.streams.state.KeyValueStore` using the provided
* `org.apache.kafka.streams.state.KeyValueBytesStoreSupplier`.
* *
* @tparam K key type of the store * @tparam K key type of the store
* @tparam V value type of the store * @tparam V value type of the store
* @param supplier the [[KeyValueBytesStoreSupplier]] used to materialize the store * @param supplier the `org.apache.kafka.streams.state.KeyValueBytesStoreSupplier` used to
* materialize the store
* @param keySerde the key serde to use. * @param keySerde the key serde to use.
* @param valueSerde the value serde to use. * @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given supplier * @return a new [[Materialized]] instance with the given supplier

6
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala

@ -43,9 +43,9 @@ object Produced {
* @tparam V value type * @tparam V value type
* @param partitioner the function used to determine how records are distributed among partitions of the topic, * @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified and `keySerde` provides a * if not specified and `keySerde` provides a
* [[org.apache.kafka.streams.kstream.internals.WindowedSerializer]] for the key * `org.apache.kafka.streams.kstream.internals.WindowedSerializer` for the key
* [[org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner]] will be * `org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner` will be
* used&mdash;otherwise [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] * used&mdash;otherwise `org.apache.kafka.clients.producer.internals.DefaultPartitioner`
* will be used * will be used
* @param keySerde Serde to use for serializing the key * @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value * @param valueSerde Serde to use for serializing the value

6
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala

@ -56,9 +56,9 @@ object Repartitioned {
* @tparam V value type * @tparam V value type
* @param partitioner the function used to determine how records are distributed among partitions of the topic, * @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified and `keySerde` provides a * if not specified and `keySerde` provides a
* [[org.apache.kafka.streams.kstream.internals.WindowedSerializer]] for the key * `org.apache.kafka.streams.kstream.internals.WindowedSerializer` for the key
* [[org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner]] will be * `org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner` will be
* used&mdash;otherwise [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] * used&mdash;otherwise `org.apache.kafka.clients.producer.internals.DefaultPartitioner`
* will be used * will be used
* @param keySerde Serde to use for serializing the key * @param keySerde Serde to use for serializing the key
* @param valueSerde Serde to use for serializing the value * @param valueSerde Serde to use for serializing the value

12
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala

@ -23,8 +23,8 @@ import org.apache.kafka.streams.state.WindowBytesStoreSupplier
object StreamJoined { object StreamJoined {
/** /**
* Create an instance of [[StreamJoined]] with key, value, and otherValue [[Serde]] * Create an instance of [[StreamJoined]] with key, value, and otherValue
* instances. * `org.apache.kafka.common.serialization.Serde` instances.
* `null` values are accepted and will be replaced by the default serdes as defined in config. * `null` values are accepted and will be replaced by the default serdes as defined in config.
* *
* @tparam K key type * @tparam K key type
@ -44,8 +44,8 @@ object StreamJoined {
/** /**
* Create an instance of [[StreamJoined]] with store suppliers for the calling stream * Create an instance of [[StreamJoined]] with store suppliers for the calling stream
* and the other stream. Also adds the key, value, and otherValue [[Serde]] * and the other stream. Also adds the key, value, and otherValue
* instances. * `org.apache.kafka.common.serialization.Serde` instances.
* `null` values are accepted and will be replaced by the default serdes as defined in config. * `null` values are accepted and will be replaced by the default serdes as defined in config.
* *
* @tparam K key type * @tparam K key type
@ -70,8 +70,8 @@ object StreamJoined {
/** /**
* Create an instance of [[StreamJoined]] with the name used for naming * Create an instance of [[StreamJoined]] with the name used for naming
* the state stores involved in the join. Also adds the key, value, and otherValue [[Serde]] * the state stores involved in the join. Also adds the key, value, and otherValue
* instances. * `org.apache.kafka.common.serialization.Serde` instances.
* `null` values are accepted and will be replaced by the default serdes as defined in config. * `null` values are accepted and will be replaced by the default serdes as defined in config.
* *
* @tparam K key type * @tparam K key type

Loading…
Cancel
Save