The rebalance exception withholding is no longer necessary, as we now have a better mechanism for catching and wrapping these exceptions. Throwing them directly should be fine and simplifies our current error handling.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Upfront refactoring for KIP-447.
Introduces `StreamsProducer`, which allows sharing a producer over multiple tasks and tracking the transaction status.
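A minimal sketch of the idea, assuming illustrative field and method names rather than the actual `StreamsProducer` API: one producer instance is shared across tasks, and the transaction status is tracked so that a transaction is begun only once per commit cycle.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: not the actual StreamsProducer implementation.
public class StreamsProducer {
    private final Producer<byte[], byte[]> producer; // shared across tasks
    private boolean transactionInFlight = false;     // the tracked TX status

    public StreamsProducer(final Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void maybeBeginTransaction() {
        if (!transactionInFlight) {
            producer.beginTransaction();
            transactionInFlight = true;
        }
    }

    void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
                           final ConsumerGroupMetadata groupMetadata) {
        maybeBeginTransaction();
        producer.sendOffsetsToTransaction(offsets, groupMetadata);
        producer.commitTransaction();
        transactionInFlight = false;
    }
}
```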
Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This is a minor fix for a regression introduced in the refactoring PR: in current trunk, StandbyTask#commitNeeded always returns false, which would cause standby tasks to never be committed until closed. To restore the old behavior, we return true when new data has been applied and the offsets have been updated.
Reviewers: Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
1. Removed the task field from TaskMigrated; the only caller that encodes a task id from StreamTask actually does not throw, so we only log it. To handle it on the StreamThread we always enforce a rebalance (and call onPartitionsLost to remove all tasks as dirty).
2. Added TaskCorruptedException with a set of task ids. The first use of this is restoreConsumer.poll, which throws InvalidOffsetException, indicating that the logs are truncated / compacted. To handle it on the StreamThread we first close the corresponding tasks as dirty (if EOS is enabled we also wipe out their state stores), and then revive them into the CREATED state (see the sketch after this list).
3. Also fixed a bug while investigating KAFKA-9572: when suspending / closing a restoring task we should not commit the new offsets but only update the checkpoint file.
4. Re-enabled the unit test.
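A hedged sketch of the handling described in item 2; all names below are illustrative, not the exact StreamThread code:

```java
try {
    runOnce();
} catch (final TaskCorruptedException e) {
    // Close the corrupted tasks as dirty, wipe state under EOS, then revive.
    for (final TaskId taskId : e.corruptedTaskIds()) {    // assumed accessor
        final Task task = taskManager.taskForId(taskId);  // assumed lookup
        task.closeDirty();                                // no offsets committed
        if (eosEnabled) {
            task.wipeStateStores();                       // assumed helper
        }
        task.revive();                                    // back to CREATED
    }
}
```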
This fixes two issues which together caused the soak to crash / some tests to fail occasionally.
What happened was: in the main StreamThread loop we initialized a new task in TaskManager#checkForCompletedRestoration, which includes registering, but not initializing, its changelogs. We then completed the loop and called poll, which resulted in a rebalance that revoked the newly initialized task. In TaskManager#handleAssignment we then closed the task cleanly and went to remove the changelogs from the StoreChangelogReader, only to get an IllegalStateException because the changelog partitions were not in the restore consumer's assignment (due to being uninitialized).
This by itself should be a recoverable error, as we catch exceptions here and retry closing the task as unclean. Of course, the task actually was successfully closed (clean), so we then got an unexpected `Illegal state CLOSED while closing active task` exception.
The fix(es) I'd propose are:
1. Keep the restore consumer's assignment in sync with the registered changelogs, i.e. the set ChangelogReader#changelogs, but pause them until they are initialized. Edit: since the consumer still performs some actions (e.g. fetches) on paused partitions, we should avoid adding uninitialized changelogs to the restore consumer's assignment in the first place, and instead just skip them when removing.
2. Move the StoreChangelogReader#remove call to before task.closeClean, so that the task is only marked as closed if everything was successful. We should do so regardless, as we should (attempt to) remove the changelogs even if the clean close fails and we must close unclean (see the sketch below).
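A sketch of fix 2 with assumed method names: the removal now happens up front, so it is attempted no matter how the close ends.

```java
changelogReader.remove(task.changelogPartitions()); // before the close
try {
    task.closeClean();
} catch (final RuntimeException e) {
    task.closeDirty(); // the changelogs were already removed above
}
```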
Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Fixes a bug where KStream#transformValues would forward null values returned by the provided ValueTransformer#transform operation.
A test was added for verification: KStreamTransformValuesTest#shouldEmitNoRecordIfTransformReturnsNull. A parallel test for the non-key ValueTransformer was not added, as they share the same code path.
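A minimal sketch of the guard, assuming simplified processor internals:

```java
// Inside the processor's process(): drop the record instead of
// forwarding when the transformer returns null.
final V newValue = valueTransformer.transform(readOnlyKey, value);
if (newValue != null) {
    context.forward(key, newValue);
}
```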
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This PR fixes two bugs related to the streams refactoring:
1. The subscribed topics are not updated correctly when a topic gets removed from the broker.
2. The remainingPartitions computation doesn't account for the case where one task has a pattern subscription over multiple topics; an input partition change will then not be recognized by the containsAll check.
The bugs are exposed by the integration test testRegexMatchesTopicsAWhenDeleted, which can be used to verify that the fix works.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
1. Delay the transaction initialization (producer.initTxn) from construction to maybeInitialize; if it times out, we just swallow the exception and retry in the next iteration (see the sketch after this list).
2. If completeRestoration (consumer.committed) times out, just swallow and retry in the next iteration.
3. For other calls (producer.partitionsFor, producer.commitTxn, consumer.commit), treat the timeout exception as fatal.
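A hedged sketch of item 1 (field names assumed; TimeoutException is org.apache.kafka.common.errors.TimeoutException):

```java
void maybeInitialize() {
    if (transactionInitialized) {
        return; // already succeeded in an earlier iteration
    }
    try {
        producer.initTransactions();
        transactionInitialized = true;
    } catch (final TimeoutException swallowAndRetry) {
        // not fatal: we simply try again in the next loop iteration
    }
}
```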
Reviewers: Matthias J. Sax <matthias@confluent.io>
Correct the process-rate (and total) sensor to measure throughput (and total record processing count).
Reviewers: Guozhang Wang <guozhang@confluent.io>
- Added additional synchronization and increased timeouts to handle flakiness
- Added some precautionary retries when trying to obtain the lag map
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Corrects a flaw leading to an exception while building topologies that include both:
* A foreign-key join with the result not explicitly materialized
* An operation after the join that requires source materialization
Also corrects a flaw in TopologyTestDriver leading to output records being enqueued in the wrong order under some (presumably rare) circumstances.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Migrates TopologyTestDriver processing to be closer to the same processing/ordering
semantics as KafkaStreams. This corrects the output order for recursive topologies
as reported in KAFKA-9503, and also works similarly in the case of task idling.
* Added an init() method to RocksDBMetricsRecorder
* Added a call to RocksDBMetricsRecorder#init() in init() of the RocksDB store
* Added a call to RocksDBMetricsRecorder#init() in openExisting() of the segmented state stores
* Adapted unit tests
* Added integration test that reproduces the situation in which the bug occurred
Reviewers: Guozhang Wang <wangguoz@gmail.com>
During the discussion for KIP-213, we decided to pass "pseudo-topics"
to the internal serdes we use to construct the wrapper serdes for
CombinedKey and for hashing the left-hand-side value. However, during
the implementation, this strategy wasn't fully implemented, and we wound
up using the same topic name for a few different data types.
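To illustrate the idea (the pseudo-topic name below is made up for this example, not the actual naming scheme): each internal data type gets its own synthetic topic name when (de)serializing, so serdes that key off the topic name, e.g. schema-registry serdes, never see conflicting types for one topic.

```java
// Sketch: serialize against a per-type pseudo-topic instead of a shared,
// real topic name.
final String pseudoTopic = storeName + "-subscription-key"; // assumed naming
final byte[] raw = combinedKeySerde.serializer().serialize(pseudoTopic, combinedKey);
```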
Reviewers: Guozhang Wang <guozhang@confluent.io>
During the KIP-213 implementation and verification, we neglected to test the
code path for falling back to default serdes if none are given in the topology.
Reviewer: Bill Bejeck <bbejeck@gmail.com>
Relying on an integration test to catch an algorithm bug introduces more flakiness; this reduces the test to a unit test to cut the flakiness until we upgrade the Java/Scala libs.
Verified that the test fails with the older version of StreamsPartitionAssignor.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Found this bug through repeated flaky runs of system tests. It seems to have been lurking for a long time, but it would only happen when there are frequent rebalances / topic creations within a short time, which is exactly the case in some of our smoke system tests.
Also added a unit test.
Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Follows up on the original PR for KAFKA-9445 to address a final round of feedback
Reviewers: John Roesler <vvcephei@apache.org>, Matthias J. Sax <matthias@confluent.io>
Follow-up to the original PR #7985 for KIP-523 (adding the `KStream#toTable()` operator); a usage example follows the list below.
- improve JavaDocs
- add more unit tests
- fix bug for auto-repartitioning
- some code cleanup
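For reference, a usage example of the operator (topic and store names are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> table = builder
    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .toTable(Materialized.as("stream-to-table-store"));
```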
Reviewers: High Lee <yello1109@daum.net>, John Roesler <john@confluent.io>
1. StoreChangelogReaderTest.shouldRequestCommittedOffsetsAndHandleTimeoutException[1]: this fails due to stricter ternary-operator type casting in J11.
2. KStreamImplTest.shouldSupportTriggerMaterializedWithKTableFromKStream: this was added recently and uses String-typed values where <String, Integer> is expected; J8 allowed this but J11 does not.
Reviewers: John Roesler <john@confluent.io>
Follow-up to KAFKA-7317 and KAFKA-9113; there is some additional cleanup we can do in InternalTopologyBuilder. This mostly refactors the subscription code to make the initialization more explicit and to reduce some duplicated code in the update logic.
Also some minor cleanup of the build method.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Instead of always trying to update the committed offset limits whenever there are buffered records for standby tasks, we leverage the commit interval to reduce the frequency of consumer.committed calls (see the sketch below).
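A sketch of the idea with assumed bookkeeping names:

```java
// Only refresh the committed-offset limits once per commit interval,
// rather than on every iteration that has buffered standby records.
if (now >= lastOffsetLimitUpdateMs + commitIntervalMs) {
    final Map<TopicPartition, OffsetAndMetadata> committed =
        consumer.committed(standbyPartitionsWithBufferedData);
    updateOffsetLimits(committed); // assumed helper
    lastOffsetLimitUpdateMs = now;
}
```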
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, John Roesler <john@confluent.io>
Since ValueAndTimestampSerializer wraps an unknown Serializer, the output of that Serializer can be null, in which case the line
`.allocate(rawTimestamp.length + rawValue.length)`
would throw a NullPointerException. This pull request returns null instead (see the sketch below).
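A hedged sketch of the fix, simplified from the description (the real class wraps both a timestamp and a value serializer):

```java
import java.nio.ByteBuffer;

public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
    if (data == null) {
        return null;
    }
    final byte[] rawValue = valueSerializer.serialize(topic, data.value());
    if (rawValue == null) {
        return null; // the wrapped serializer returned null: propagate it, no NPE
    }
    final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
    return ByteBuffer
        .allocate(rawTimestamp.length + rawValue.length)
        .put(rawTimestamp)
        .put(rawValue)
        .array();
}
```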
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, broken down into the sub-tasks listed below:
1. Extract the embedded clients (producer and consumer) out of StreamTask into RecordCollector. (guozhangwang#2, guozhangwang#5)
2. Consolidate the standby updating and active restoring logic into ChangelogReader and extract it out of StreamThread. (guozhangwang#3, guozhangwang#4)
3. Introduce a Task state life cycle (created, restoring, running, suspended, closing) and refactor the task operations based on the current state; a sketch follows this list. (guozhangwang#6, guozhangwang#7)
4. Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since those were already moved in steps 2 and 3). (guozhangwang#8, guozhangwang#9)
5. Simplify the StreamThread logic a bit, as the embedded clients / changelog restoration logic was moved out in steps 1 and 2. (guozhangwang#10)
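A sketch of the task state life cycle from step 3 (the validation shown is illustrative, not the exact transition table):

```java
enum State {
    CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED
}

private State state = State.CREATED;

void transitionTo(final State newState) {
    if (!isValidTransition(state, newState)) { // assumed validation helper
        throw new IllegalStateException(
            "Invalid transition from " + state + " to " + newState);
    }
    state = newState;
}
```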
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>
This ticket improves two aspects of the retrieval of sensors:
https://issues.apache.org/jira/browse/KAFKA-9152
1. Currently, when a sensor is retrieved with *Metrics.*Sensor() (e.g. ThreadMetrics.createTaskSensor()) after it was created by the same method, the sensor is added again to the corresponding queue in Sensors (e.g. threadLevelSensors) in StreamsMetricsImpl. Those queues are used to remove the sensors when removeAllLevelSensors() is called. Having the same sensor in a queue multiple times is not a correctness issue, but storing each sensor only once would reduce the footprint.
2. When a sensor is retrieved, the current code attempts to create a new sensor and to add the corresponding metrics to it again. This could be avoided.
Both aspects can be improved by calling getSensor() on the Metrics object and checking the return value to determine whether the sensor already exists (see the sketch below).
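A sketch of the proposed retrieval (Metrics#getSensor is the real accessor; the helper and queue names below are assumptions for illustration):

```java
Sensor sensor = metrics.getSensor(fullSensorName);
if (sensor == null) {
    // First retrieval: create the sensor, attach its metrics, and register
    // it in the removal queue exactly once.
    sensor = metrics.sensor(fullSensorName, recordingLevel);
    addMetricsToSensor(sensor);             // assumed helper
    threadLevelSensors.add(fullSensorName); // registered once, not per call
}
return sensor;
```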
Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Also addresses KAFKA-8821
Note that we still have to fall back to using pattern subscription if the user has added any regex-based source nodes to the topology. Includes some minor cleanup on the side
Reviewers: Bill Bejeck <bbejeck@gmail.com>
Add a new method to KafkaStreams to return an estimate of the lags for
all partitions of all local stores.
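Example usage of the new method (`streams` is assumed to be a started KafkaStreams instance):

```java
import java.util.Map;
import org.apache.kafka.streams.LagInfo;

final Map<String, Map<Integer, LagInfo>> lags = streams.allLocalStorePartitionLags();
lags.forEach((store, partitionLags) ->
    partitionLags.forEach((partition, lag) ->
        System.out.printf("%s[%d]: offsetLag=%d%n", store, partition, lag.offsetLag())));
```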
Implements: KIP-535
Co-authored-by: Navinder Pal Singh Brar <navinder_brar@yahoo.com>
Reviewed-by: John Roesler <vvcephei@apache.org>
Add a new overload of KafkaStreams#store that allows users
to query standby and restoring stores in addition to active ones.
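A sketch of the new overload in use, assuming the boolean flag described by KIP-535 for including standby and restoring stores:

```java
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// true = also match standby and restoring stores, not only active ones
final ReadOnlyKeyValueStore<String, Long> store =
    streams.store("counts-store", QueryableStoreTypes.keyValueStore(), true);
final Long valueOrNull = store.get("some-key"); // may be slightly stale
```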
Closes: #7962
Implements: KIP-535
Co-authored-by: Navinder Pal Singh Brar <navinder_brar@yahoo.com>
Reviewed-by: John Roesler <vvcephei@apache.org>
Deprecate existing metadata query APIs in favor of new
ones that include standby hosts as well as partition
information.
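Example usage of the new key-metadata API (names per KIP-535; the store name is illustrative):

```java
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

final KeyQueryMetadata metadata =
    streams.queryMetadataForKey("counts-store", "some-key", Serdes.String().serializer());
final HostInfo activeHost = metadata.getActiveHost();          // fresh reads
final Set<HostInfo> standbyHosts = metadata.getStandbyHosts(); // possibly stale reads
final int partition = metadata.getPartition();
```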
Closes: #7960
Implements: KIP-535
Co-authored-by: Navinder Pal Singh Brar <navinder_brar@yahoo.com>
Reviewed-by: John Roesler <vvcephei@apache.org>
Do not wait until updateAssignmentMetadataIfNeeded returns true; instead, call it only once with a zero timeout. Also, do not return empty if we are in a rebalance.
Trim the pre-fetched records after the long polling, since the assignment may have changed.
We also need to update SubscriptionState to retain the existing state in assignFromSubscribed (similar to assignFromUser), so that we do not need the transition from INITIALIZING to FETCHING.
Unit test: this actually took me the most time :)
Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Sophie Blee-Goldman <sophie@confluent.io>, Jason Gustafson <jason@confluent.io>, Richard Yu <yohan.richard.yu@gmail.com>, dengziming <dengziming1993@gmail.com>