The RegexSourceIntegrationTest has some flakiness as it deletes and re-creates the same output topic before each test. This PR reduces the chance for errors by creating a unique output topic for each test.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Boyang Chen <boyang@confluent.io>
fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Users often use the RocksDBConfigSetter to modify parameters such as cache or block size, which must be set through the BlockBasedTableConfig object. Rather than creating a new object in the config setter, however, users should most likely retrieve a reference to the existing one so as to not lose the other defaults (eg the BloomFilter)
There have been notes from the community that it is not obvious this should be done, nor is it immediately clear how to do so. This PR updates the RocksDBConfigSetter docs to hopefully improve things.
I also piggybacked a few minor cleanups in the docs
Reviewers: Kamal Chandraprakash, Jim Galasyn <jim.galasyn@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This PR fixes a wrong input stream name in PipeDemo's javadoc.
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Jason Gustafson <jason@confluent.io>
KIP-91 was included in Kafka 2.1.0, so we should mention
`delivery.timeout.ms` in the hint as it's the config that
users would want to change in most cases.
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Scala 2.13 support was added to build via #5454. This PR adjusts the code so that
it compiles with 2.11, 2.12 and 2.13.
Changes:
* Add `scala-collection-compat` dependency.
* Import `scala.collection.Seq` in a number of places for consistent behavior between
Scala 2.11, 2.12 and 2.13.
* Remove wildcard imports that were causing the Java classes to have priority over the
Scala ones, related Scala issue: https://github.com/scala/scala/pull/6589.
* Replace parallel collection usage with `Future`. The former is no longer included by
default in the standard library.
* Replace val _: Unit workaround with one that is more concise and works with Scala 2.13
* Replace `filterKeys` with `filter` when we expect a `Map`. `filterKeys` returns a view
that doesn't implement the `Map` trait in Scala 2.13.
* Replace `mapValues` with `map` or add a `toMap` as an additional transformation
when we expect a `Map`. `mapValues` returns a view that doesn't implement the
`Map` trait in Scala 2.13.
* Replace `breakOut` with `iterator` and `to`, `breakOut` was removed in Scala
2.13.
* Replace to() with toMap, toIndexedSeq and toSet
* Replace `mutable.Buffer.--` with `filterNot`.
* ControlException is an abstract class in Scala 2.13.
* Variable arguments can only receive arrays or immutable.Seq in Scala 2.13.
* Use `Factory` instead of `CanBuildFrom` in DecodeJson. `CanBuildFrom` behaves
a bit differently in Scala 2.13 and it's been deprecated. `Factory` has the behavior
we need and it's available via the compat library.
* Fix failing tests due to behavior change in Scala 2.13,
"Map.values.map is not strict in Scala 2.13" (https://github.com/scala/bug/issues/11589).
* Use Java collections instead of Scala ones in StreamResetter (a Java class).
* Adjust CheckpointFile.write to take an `Iterable` instead of `Seq` to avoid
unnecessary collection copies.
* Fix DelayedElectLeader to use a Map instead of Set and avoid `to` call that
doesn't work in Scala 2.13.
* Use unordered map for mapping in SimpleAclAuthorizer, mapping of ordered
maps require an `Ordering` in Scala 2.13 for safety reasons.
* Adapt `ConsumerGroupCommand` to compile with Scala 2.13.
* CoreUtils.min takes an `Iterable` instead of `TraversableOnce`, the latter does
not exist in Scala 2.13.
* Replace `Unit` with `()` in a couple places. Scala 2.13 is stricter when it expects
a value instead of a type.
* Fix bug in CustomQuotaCallbackTest where we did not necessarily set `partitionRatio`
correctly, `forall` can terminate early.
* Add a couple of spotbugs exclusions that are needed by code generated by Scala 2.13
* Remove unused variables, simplify some code and remove procedure syntax in a few
places.
* Remove unused `CoreUtils.JSONEscapeString`.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
The purpose here is to leverage static membership information during round robin consumer assignment, because persistent member id could help make the assignment remain the same during rebalance.
The comparison logic is changed to:
1. If member A and member B both have group.instance.id, then compare their group.instance.id
2. If member A has group.instance.id, while member B doesn't, then A < B
3. If both member A and B don't have group.instance.id, compare their member.id
In round robin assignor, we use ephemeral member.id to sort the members in order for assignment. This semantic is not stable and could trigger unnecessary shuffle of tasks. By leveraging group.instance.id the static member assignment shall be persist when satisfying following conditions:
1. number of members remain the same across generation
2. static members' identities persist across generation
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307). This is the 4th PR for KIP-307.
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
De-duplicate the common case in which the prior value is the same as the old value.
Reviewers: Sophie Blee-Goldman <sophie@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
- Timeout occurred due to initial slow rebalancing.
- Added code to wait until `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics.
- I removed all other wait conditions, because they are not needed if `KafkaStreams` instance is in the right state.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Messages containing key and value were moved to the TRACE logging level, however the exception is still adding the key and value.
This commits remove the key and value from StreamsException.
Reviewers: Bill Bejeck <bbejeck@gmail.com>
The ResetIntegrationTest has experienced several failures and it seems the current timeout of 10 seconds may not be enough time
Reviewers: Matthias J. Sax <mjsax@apache.org>, Boyang Chen <boyang@confluent.io>
As title suggested, we boost 3 stream instances stream job with one minute session timeout, and once the group is stable, doing couple of rolling bounces for the entire cluster. Every rejoin based on restart should have no generation bump on the client side.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
In RocksDBTimestampedStore#openRocksDB we try to open a db with two column families. If this succeeds but the first column family is empty (db.newIterator.seekToFirst.isValid() == false) we never actually close its ColumnFamilyHandle
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an InternalTopicMetadata class which wraps an InternalTopicConfig object along with the number of partitions. But InternalTopicConfig already has a numPartitions field, so we should just use it directly instead.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Reviewers: Bill Bejeck <bill@confluent.io>, Boyang Chen <boyang@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confuent.io>
* StreamsMetricsImpl wraps the Kafka Streams' metrics registry and provides logic to create
and register sensors and their corresponding metrics. An example for such logic can be found in
threadLevelSensor(). Furthermore, StreamsMetricsmpl keeps track of the sensors on the
different levels of an application, i.e., thread, task, etc., and provides logic to remove sensors per
level, e.g., removeAllThreadLevelSensors(). There is one StreamsMetricsImpl object per
application instance.
* ThreadMetrics contains only static methods that specify all built-in thread-level sensors and
metrics and provide logic to register and retrieve those thread-level sensors, e.g., commitSensor().
* From anywhere inside the code base with access to StreamsMetricsImpl, thread-level sensors can be accessed by using ThreadMetrics.
* ThreadsMetrics does not inherit from StreamsMetricsImpl anymore.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
When the restored record value is null, we are in danger of NPE during restoration phase.
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Sub-task required to allow to define custom processor names with KStreams DSL(KIP-307) :
- overload methods for stateless operations to accept a Named parameter (filter, filterNot, map, mapValues, foreach, peek, branch, transform, transformValue, flatTransform)
- overload process method to accept a Named parameter
- overload join/leftJoin/outerJoin methods
Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>,
Bill Bejeck <bbejeck@gmail.com>
See also #6684
KTable processors must be supplied with a KTableProcessorSupplier, which in turn requires implementing a ValueGetter, for use with joins and groupings.
For suppression, a correct view only includes the previously emitted values (not the currently buffered ones), so this change also involves pushing the Change value type into the suppression buffer's interface, so that it can get the prior value upon first buffering (which is also the previously emitted value).
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Remove processedKeys / processedValues / processedWithTimestamps as they are covered with processed already.
Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>
Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks.
Do not let the thread to transit to RUNNING until all tasks (including standby tasks) are ready.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
A lot of confusion seems to have arisen from the StreamBuilder#addGlobalStore(...ProcessorSupplier) method. Users have assumed they can safely use this to transform records before populating their global state store; unfortunately this results in corrupted data as on restore the records are read directly from the source topic changelog, bypassing their custom processor.
We should probably provide a means to do this at some point but for the time being we should clarify the proper use of #addGlobalStore as it currently functions
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bruno Cadonna <bruno@confluent.io>
As title states. We plan to merge this to both trunk and 2.3 if it could fix the stream system tests globally.
Reference implementation: #6673
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
When we close a task and EOS is enabled we should always close the producer regardless if the task is in a zombie state (the broker fenced the producer) or not.
I've added tests that fail without this change.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Jason Gustafson <jason@confluent.io>
The implementation of KIP-258 broke the state store methods in KStreamTestDriver.
These methods were unused in this project, so the breakage was not detected.
Since this is an internal testing utility, and it was deprecated and partially removed in
favor of TopologyTestDriver, I opted to just complete the removal of the class.
Reviewers: A. Sophie Blee-Goldman <ableegoldman@gmail.com>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Basic idea:
KTable-KTable join: set max(left-ts,right-ts) for result
#agg(...) (stream/table windowed/non-windowed): set max(ts1, ts2, ts3,...) of all input records that contribute to the aggregation result
for all stateless transformation: input-ts -> output-ts
Reviewers: Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>, Andy Coates <andy@confluent.io>, Bill Bejeck <bbejeck@gmail.com
When users provide a name for operation via the Streams DSL, we need to increment the counter used for auto-generated names to make sure any operators downstream of a named operator still produce a compatible name.
This PR is a subset of #6411 by @fhussonnois. We need to merge this PR now because it covers cases when users name repartition topics or state stores.
Updated tests to reflect the counter produces expected number even when the user provides a name.
Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>
This upgrade exposes a number of new options, including the WriteBufferManager which -- along with existing TableConfig options -- allows users to limit the total memory used by RocksDB across instances. This can alleviate some cascading OOM potential when, for example, a large number of stateful tasks are suddenly migrated to the same host.
The RocksDB docs guarantee backwards format compatibility across versions
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>,
When choosing the next record to process, we should look at the head record's timestamp of each partition and choose the lowest rather than choosing the lowest of the partition's streamtime.
This change effectively makes RecordQueue return the timestamp of the head record rather than its streamtime. Streamtime is removed (replaced) from RecordQueue as it was only being tracked in order to choose the next partition to poll from.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>